Example4ΒΆ

This example is similar to example 3, operating a thermostat. However in this case the driver delegates the operation to a subclass of a Device object.

It may seem an unecessary complication, however the method could be useful if the driver runs multiple devices, and it keeps the logic of each device separate.

A simple subclass of the driver is created, sending events to devices, and running the device devhardware methods.

The device is subclassed to handle these events, rather than the driver handling them:

# Simulated thermostat with settable target.
# This example illustrates delegation to a Device class

import asyncio

import indipydriver as ipd

from indipyserver import IPyServer

class ThermalControl:
    """This is a simulation containing variables only, normally it
       would control a real heater, and take temperature measurements
       from a sensor."""

    def __init__(self, devicename, target=15):
        """Set start up values"""
        # It is useful to give this controlling object the devicename
        # reference, so it can be identified throughout the code
        self.devicename = devicename
        self.target = target
        self.temperature = 20
        self.heater = "Off"


    async def run_thermostat(self):
        """This simulates temperature increasing/decreasing, and turns
           on/off a heater if moving too far from the target."""
        while True:
            await asyncio.sleep(2)
            if self.heater == "On":
                # increasing temperature if the heater is on
                self.temperature += 0.1
            else:
                # decreasing temperature if the heater is off
                self.temperature -= 0.1

            if self.temperature > self.target+0.5:
                # too hot
                self.heater = "Off"

            if self.temperature < self.target-0.5:
                # too cold
                self.heater = "On"


# Note the driver class name below is not specific to the thermostat
# since this is general purpose

class DelegateDriver(ipd.IPyDriver):

    """IPyDriver is subclassed here, with methods
       to delegate control to any included devices"""

    async def rxevent(self, event):
        """On receiving data, this is called, it sends any received events
           to the appropriate device"""

        if event.devicename in self:
           await self[event.devicename].devrxevent(event)


    async def hardware(self):
        """This coroutine starts when the driver starts, and starts any
           devices devhardware() tasks"""

        # Note no await - the background tasks will automatically be added to a taskgroup

        for device in self.values():
            self.add_background(device.devhardware())



class ThermoDevice(ipd.Device):

    """Device is subclassed here, with a method
       to run the thermalcontrol.run_thermostat() method,
       accept a target temperature,
       and to transmit the temperature to the client"""


    async def devrxevent(self, event):
        "On receiving data, this is called by the drivers rxevent method"

        thermalcontrol = self.devicedata["thermalcontrol"]

        if isinstance(event, ipd.newNumberVector):
            if event.vectorname == "targetvector" and 'target' in event:
                # Set the received value as the thermostat target

                # The self.indi_number_to_float method converts the received string,
                # which may be in a number of formats to a Python float value. This
                # can then be set into thermalcontrol
                try:
                    target = self.indi_number_to_float( event['target'] )
                except TypeError:
                    # ignore an incoming invalid number
                    return
                # set new target
                thermalcontrol.target = target
                # and set the new target value into the vector,
                # then transmit the vector back to client.
                event.vector['target'] = target
                await event.vector.send_setVector()


    async def devhardware(self):
        """This coroutine is added as a background task by the driver's
           hardware method."""

        # get the ThermalControl object which actually runs the
        # instrument, and which is available in the named
        # arguments dictionary 'self.devicedata'.
        thermalcontrol = self.devicedata["thermalcontrol"]

        # set the thermalcontrol instrument running
        self.add_background(thermalcontrol.run_thermostat())

        vector = self['temperaturevector']
        while not self.stop:
            await asyncio.sleep(10)
            # Send the temperature every 10 seconds
            vector['temperature'] = thermalcontrol.temperature
            # and transmit it to the client
            await vector.send_setVector()


def make_driver(devicename, target):
    "Returns an instance of the driver"

    # Make an instance of the object controlling the instrument
    thermalcontrol = ThermalControl(devicename, target)

    # Make a NumberMember holding the temperature value
    temperature = ipd.NumberMember( name="temperature",
                                    format='%3.1f',
                                    membervalue=thermalcontrol.temperature )
    # Make a NumberVector instance, containing the member.
    temperaturevector = ipd.NumberVector( name="temperaturevector",
                                          label="Temperature",
                                          group="Values",
                                          perm="ro",
                                          state="Ok",
                                          numbermembers=[temperature] )

    # create a NumberMember holding the target value
    target = ipd.NumberMember( name="target",
                               format='%3.1f', min=-50, max=99,
                               membervalue=thermalcontrol.target )
    targetvector = ipd.NumberVector( name="targetvector",
                                     label="Target",
                                     group="Values",
                                     perm="rw",
                                     state="Ok",
                                     numbermembers=[target] )

    # note the targetvector has permission rw so the client can set it

    # create an instance of your subclass device with the two vectors
    # and since we are delegating operation to the ThermoDevice,
    # include the thermalcontrol object which will appear in self.devicedata
    thermostat = ThermoDevice( devicename=devicename,
                               properties=[temperaturevector, targetvector],
                               thermalcontrol=thermalcontrol )

    # Create the Driver which will contain this Device, note
    # the driver only needs the device object
    driver = DelegateDriver( thermostat )

    # and return the driver
    return driver



if __name__ == "__main__":

    # create and serve the driver
    # the devicename has to be unique in a network of devices,
    # so rather than statically setting it, the name and
    # initial target temperature could come from script arguments

    # in this case we'll set the devicename as "Thermostat",
    # and the target as 15

    # make a driver for the instrument
    thermodriver = make_driver("Thermostat", 15)
    # and a server, which serves this driver
    server = IPyServer(thermodriver)
    print(f"Running {__file__} with indipydriver version {ipd.version}")
    asyncio.run(server.asyncrun())

This pattern of delegation to a device may be used to break up a complex driver across several modules. So each device, with its vectors, members and its contolling code could be separated.