Example3ΒΆ
This example simulates a driver which snoops on the thermostat of the previous example, and if the temperature becomes too hot, it opens a window, and closes it when the temperature drops:
import asyncio, time
from indipydriver import (IPyServer, IPyDriver, Device,
TextVector, TextMember,
getProperties, setNumberVector
)
# Other vectors, members and events are available, this example only imports those used.
class WindowControl:
"This is a simulation containing variables only"
def __init__(self):
"Set initial value of window"
self.window = "Open"
# window should be "Open" or 'Closed'
self.update_time = time.time()
# Set whenever a temperature update is requested
def set_window(self, temperature):
"""Gets new temperature, sets window accordingly"""
if temperature > 21:
self.window = "Open"
if temperature < 18:
self.window = "Closed"
self.update_time = time.time()
class WindowDriver(IPyDriver):
"""IPyDriver is subclassed here"""
async def clientevent(self, event):
"""On receiving data from the client, this is called,
Only a 'getProperties' is expected."""
match event:
case getProperties():
await event.vector.send_defVector()
async def hardware(self):
"Update client with window status"
# Send an initial getProperties to snoop on Thermostat
# This is necessary to inform IPyServer that this driver
# wants copies of data sent from the thermostat
await self.send_getProperties(devicename="Thermostat",
vectorname="temperaturevector")
windowcontrol = self.driverdata["windowcontrol"]
statusvector = self['Window']['windowstatus']
while True:
# every ten seconds send an update on window position
await asyncio.sleep(10)
now_time = time.time()
if now_time - windowcontrol.update_time > 20.0:
# No new temperature has been received for longer than 20 seconds
# the thermostat could have been disconnected. Send another
# getProperties just in case it is reconnected
await self.send_getProperties(devicename="Thermostat",
vectorname="temperaturevector")
statusvector['status'] = windowcontrol.window
# and transmit it to the client
await statusvector.send_setVector(allvalues=False)
# allvalues=False means that not all values will be sent, only
# values that have changed, so this avoids unnecessary data
# being transmitted
async def snoopevent(self, event):
"""Handle receipt of an event from the Thermostat."""
windowcontrol = self.driverdata["windowcontrol"]
match event:
case setNumberVector(devicename="Thermostat",
vectorname="temperaturevector") if "temperature" in event:
# A setNumberVector has been sent from the thermostat to the client
# and this driver has received a copy, and so can read the temperature
try:
temperature = self.indi_number_to_float(event["temperature"])
except TypeError:
# ignore an incoming invalid number
return
# this updates windowcontrol which opens or closes the widow
# and also updates its update_time
windowcontrol.set_window(temperature)
def make_driver():
"Creates the driver"
# create hardware object
windowcontrol = WindowControl()
status = TextMember( name="status",
label="Window position",
membervalue=windowcontrol.window )
windowstatus = TextVector( name="windowstatus",
label="Window Status",
group="Values",
perm="ro",
state="Ok",
textmembers=[status] )
# create a Device with these vectors
window = Device( devicename="Window",
properties=[windowstatus] )
# Create the WindowDriver (inherited from IPyDriver) containing this device
windowdriver = WindowDriver( devices=[window],
windowcontrol=windowcontrol )
# and return the driver
return windowdriver
# Assuming the thermostat example is example2.py, these would be run with
if __name__ == "__main__":
import example2
thermodriver = example2.make_driver()
windowdriver = make_driver()
server = IPyServer([thermodriver, windowdriver])
asyncio.run(server.asyncrun())