import threading


class Controller:
    """Line-oriented controller client on top of a serial-like object.

    A background reader thread pulls newline-terminated lines from ``ser``
    and dispatches each one to every callback whose registered byte prefix
    the line starts with.  All callbacks therefore run on the reader thread.

    Hooks (append callables to these lists):
        on_status:       called with the parsed status dict for ``s:`` lines.
        on_fan_toggle:   called with an int for ``f:<int>`` lines.
        on_humd_toggle:  called with an int for ``h:<int>`` lines.

    Other public attributes:
        callbacks:   prefix (bytes) -> callable(line: bytes).
        properties:  property name (str) -> converter, filled by enumerate().
        last_status: most recent successfully parsed status dict, or None.
        ser_lock:    a threading.Lock; it is not acquired anywhere in this
                     class.  NOTE(review): presumably intended for callers
                     that share ``ser`` -- confirm before relying on it.
    """

    # Buffered input longer than this many bytes (newline included) is dropped.
    _MAX_LINE_LEN = 80
    # Number of request/response attempts made by getProp() / setProp().
    _ATTEMPTS = 5

    def __init__(self, ser, reply_timeout=1.0):
        """Attach to ``ser`` and start the reader thread.

        Args:
            ser: object exposing ``timeout``, ``read_until(bytes)`` and
                ``write(bytes)`` (presumably a pyserial ``Serial`` -- confirm).
            reply_timeout: seconds to wait for the device's answer to a
                single read / write / enumerate request.

        Raises:
            ValueError: if ``ser.timeout`` is None.
        """
        # if there isn't a timeout we can get deadlocked in some situations
        # (explicit raise instead of assert so the check survives ``python -O``)
        if ser.timeout is None:
            raise ValueError("ser.timeout must not be None")
        self.ser = ser
        self.ser_lock = threading.Lock()
        self.reply_timeout = reply_timeout
        self.callbacks = dict()
        self.properties = dict()
        self.last_status = None

        # add callbacks to these to capture these events
        self.on_status = []
        self.on_fan_toggle = []
        self.on_humd_toggle = []

        self.registerCallback(b"s:", self._read_status)
        self.registerCallback(b"h:", self._toggle_cb)
        self.registerCallback(b"f:", self._toggle_cb)

        self._start_reader()

    def _start_reader(self):
        """Mark processing as active and launch a fresh reader thread."""
        self.processing = True
        self.process_thr = threading.Thread(target=self._process_loop)
        self.process_thr.start()

    def stop(self):
        """Ask the reader thread to exit and wait for it to finish."""
        self.processing = False
        # Take a local reference: the reader thread sets self.process_thr to
        # None when it exits, which could happen between a check and join().
        reader = self.process_thr
        if reader is not None:
            reader.join()

    def restart(self):
        """Stop the current reader thread (if any) and start a new one."""
        self.stop()
        self._start_reader()

    def _read_status(self, line: bytes):
        """Parse ``s:<humd>:<temp>:<v>:<v2>`` and notify ``on_status`` hooks.

        Malformed lines are reported with print() and otherwise ignored;
        ``last_status`` is only replaced by a fully parsed status.
        """
        parts = line.split(b":")
        if len(parts) < 5:
            print("w: invalid status ", line)
            return
        try:
            status = {
                "humd": float(parts[1]),
                "temp": float(parts[2]),
                "v": float(parts[3]),
                "v2": float(parts[4]),
            }
        except ValueError as e:
            # A garbled number must not take down the reader thread.
            print("e: bad status update ", line, repr(e))
            return
        self.last_status = status
        for cb in self.on_status:
            cb(status)

    def _toggle_cb(self, line: bytes):
        """Parse ``f:<int>`` / ``h:<int>`` and notify the matching hook list."""
        parts = line.split(b":")
        if len(parts) < 2:
            print("w: invalid status ", line)
            return
        try:
            ty = parts[0]
            toggle_map = {
                b"f": self.on_fan_toggle,
                b"h": self.on_humd_toggle,
            }
            if ty not in toggle_map:
                print("e: invalid toggle update ", line)
                return
            status = int(parts[1])
            for cb in toggle_map[ty]:
                cb(status)
        except Exception as e:
            # Deliberately broad: covers both a bad integer and failing hooks.
            print("e: bad toggle update ", line, repr(e))

    def registerCallback(self, prefix, cb):
        """Register ``cb(line)`` for lines starting with ``prefix`` (bytes).

        Replaces any callback previously registered under the same prefix.
        """
        self.callbacks[prefix] = cb

    def unregisterCallback(self, prefix):
        """Remove the callback for ``prefix``.

        Raises:
            KeyError: if nothing is registered under ``prefix``.
        """
        del self.callbacks[prefix]

    def _process_loop(self):
        """Reader-thread body: assemble lines from ``ser`` and dispatch them.

        Runs until ``self.processing`` becomes false (or ``ser`` raises) and
        clears ``self.process_thr`` on the way out.
        """
        buf = b""
        # True while the remainder of an over-long line is being thrown away.
        discarding = False
        try:
            while self.processing:
                buf += self.ser.read_until(b"\n")
                if not buf:
                    continue
                terminated = buf.endswith(b"\n")
                if discarding:
                    # Still inside the over-long line; its newline ends it.
                    buf = b""
                    discarding = not terminated
                    continue
                if len(buf) > self._MAX_LINE_LEN:
                    # Checked before waiting for the newline so that a
                    # stream without terminators cannot grow the buffer
                    # without bound.
                    print("w: ignoring long unterminated line ", buf)
                    buf = b""
                    discarding = not terminated
                    continue
                if not terminated:
                    continue  # partial line (read timed out); keep buffering
                self._process_line(buf.rstrip())  # strip off the newlines
                buf = b""
        finally:
            self.process_thr = None

    def _process_line(self, line):
        """Dispatch ``line`` to every callback whose prefix it starts with.

        A failing callback is reported with print() and does not prevent the
        remaining callbacks from running.
        """
        # Snapshot first: callbacks are registered/removed from other threads
        # and by callbacks themselves, and mutating a dict while iterating it
        # raises RuntimeError.
        handlers = [
            (prefix, cb)
            for prefix, cb in list(self.callbacks.items())
            if line.startswith(prefix)
        ]
        if not handlers:
            print("w: uncaught line ", line)
        for prefix, cb in handlers:
            try:
                cb(line)
            except Exception as e:
                # Boundary of the reader thread: report and keep reading.
                print("e: callback failed ", prefix, line, repr(e))

    def _read(self, prop):
        """Send ``r:<prop>`` and wait for a line starting with that prefix.

        Returns:
            The bytes following ``r:<prop>`` plus one separator byte, or
            None if no matching line arrived within ``reply_timeout``.
        """
        prefix = bytes("r:" + prop, "utf8")
        result = None
        ev = threading.Event()

        def set_result(v):
            nonlocal result
            if len(v) >= len(prefix):
                result = v[len(prefix) + 1:]
                ev.set()
            else:
                print("e: invalid line found ", v)

        self.registerCallback(prefix, set_result)
        try:
            self.ser.write(prefix + b"\n")
            ev.wait(timeout=self.reply_timeout)
        finally:
            # Always drop the one-shot callback, also after a timeout.
            self.callbacks.pop(prefix, None)
        return result

    def _write(self, prop, val):
        """Send ``w:<prop>:<val>`` and wait for a line starting ``w:<prop>``.

        Returns:
            True if such a line arrived within ``reply_timeout``, else False.
        """
        prefix = bytes("w:" + prop, "utf8")
        full_msg = prefix + bytes(":{}\n".format(val), "utf8")
        ev = threading.Event()

        def set_result(_):
            ev.set()

        self.registerCallback(prefix, set_result)
        try:
            self.ser.write(full_msg)
            return ev.wait(timeout=self.reply_timeout)
        finally:
            # Always drop the one-shot callback, also after a timeout.
            self.callbacks.pop(prefix, None)

    def enumerate(self):
        """Send ``d`` and record the properties the device describes.

        Each ``d:<name>:<type>`` line with type ``i``, ``f`` or ``b`` stores
        an ``int`` / ``float`` / bool converter in ``self.properties``.
        Waits up to ``reply_timeout`` for a ``d:done`` line.  The ``d:``
        callback stays registered afterwards, so descriptor lines arriving
        after the wait are still recorded.
        """
        done = threading.Event()
        ty_map = {b"i": int, b"f": float, b"b": lambda x: bool(int(x))}

        def add_property(line: bytes):
            parts = line.split(b":")
            if len(parts) >= 3:
                converter = ty_map.get(parts[2])
                if converter is not None:
                    self.properties[parts[1].decode("utf8")] = converter
                else:
                    print("w: unknown property {} found: {}".format(parts[1], line))
            elif line.startswith(b"d:done"):
                done.set()

        self.registerCallback(b"d:", add_property)
        self.ser.write(b"d\n")
        done.wait(timeout=self.reply_timeout)

    def queryStatus(self):
        """Send ``s``; the answer is delivered through the ``s:`` callback."""
        self.ser.write(b"s\n")
        # callbacks should handle everything else

    def getProp(self, prop: str):
        """Read ``prop`` from the device and convert it with its converter.

        Makes up to five attempts; an attempt fails when no answer arrives
        or the answer cannot be converted.

        Raises:
            ValueError: if ``prop`` is not in ``self.properties``.
            RuntimeError: if every attempt failed.
        """
        if prop not in self.properties:
            raise ValueError("invalid property {}".format(prop))

        for _ in range(self._ATTEMPTS):
            result = self._read(prop)
            if result is None:
                continue
            try:
                return self.properties[prop](result)
            except Exception as e:
                # Converters may be user supplied, so stay broad and retry.
                print("e: unable to convert result ", result, repr(e))

        raise RuntimeError("unable to read prop {}".format(prop))

    def setProp(self, prop: str, val):
        """Write ``val`` to ``prop``, retrying until the device acknowledges.

        Makes up to five attempts.

        Returns:
            True once a write was acknowledged.

        Raises:
            ValueError: if ``prop`` is not in ``self.properties``.
            RuntimeError: if no attempt was acknowledged.
        """
        if prop not in self.properties:
            raise ValueError("invalid property {}".format(prop))

        for _ in range(self._ATTEMPTS):
            # _write() reports success as a bool, never as None.
            if self._write(prop, val):
                return True

        raise RuntimeError("unable to write prop {} {}".format(prop, val))

    def humidifier(self, enable):
        """Send ``H`` when ``enable`` is truthy, otherwise ``h``."""
        self.ser.write(b"H\n" if enable else b"h\n")

    def fan(self, enable):
        """Send ``F`` when ``enable`` is truthy, otherwise ``f``."""
        self.ser.write(b"F\n" if enable else b"f\n")

    def manualMode(self, enable):
        """Send ``M`` when ``enable`` is truthy, otherwise ``m``."""
        self.ser.write(b"M\n" if enable else b"m\n")