1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
# NOTE(review): takes ``self``, so this is presumably a method of the gateway
# class; the enclosing class is not visible in this excerpt -- re-indent as
# needed when placing it back inside the class body.
async def serial_wr_loop(self) -> None:
    """Run the serial read and write loops until ``self.stopped`` is set.

    Opens ``gw.serial_port`` at 115200 baud, then concurrently:

    * reads chunks from the port, wraps them in a JSON payload whose shape
      depends on ``gw.serial_device``, passes the payload through
      ``decodeBLE`` and publishes the result on ``gw.presence_topic``;
    * writes ``b'off\\n'`` to the port every two seconds.

    On any failure ``self.running`` is cleared and ``self.stopped`` is set so
    both loops wind down. Cancellation is propagated to the caller.
    """

    async def read_from_serial(reader):
        """Read, decode and publish serial data until stopped or EOF."""
        count = 0
        self.running = True
        while not self.stopped:
            data = await reader.read(1000)
            if not data:
                # StreamReader.read() returns b"" at EOF and keeps returning
                # it immediately, so without this check the loop would spin.
                logger.error("serial port returned EOF, stopping serial loops")
                self.stopped = True
                break
            count += 1

            logger.info(
                "Received at `%s` from Serial: `%s`",
                count,
                data,
            )

            data_json = {}
            if 'None' in gw.serial_device:
                logger.error("gw.serial_device is None, please check")
                self.stopped = True
                # Nothing meaningful to decode or publish for this chunk.
                break
            elif 'YE600X' in gw.serial_device:
                try:
                    data_str = data.decode(encoding='utf8')
                except UnicodeDecodeError:
                    # Payload is not text; it is still forwarded as hex below.
                    logger.info("serial read ascii data")
                else:
                    # Slice the decoded text (not the raw bytes) so the comma
                    # offset stays valid for multi-byte UTF-8 content. When no
                    # comma is present find() yields -1 and the whole message
                    # is kept, as before.
                    mac = data_str[data_str.find(",") + 1:]
                    # Test DISCONN first: "DISCONNECTED" also contains
                    # "CONNECTED", which would otherwise log a bogus connect.
                    if "DISCONN" in data_str:
                        self.serial_mac = mac
                        self.serial_connect_status = "offline"
                        logger.info(
                            "serial device disconnected, mac: %s",
                            self.serial_mac,
                        )
                    elif "CONNECTED" in data_str:
                        self.serial_mac = mac
                        self.serial_connect_status = "online"
                        logger.info(
                            "serial device connected, mac: %s",
                            self.serial_mac,
                        )
                # NOTE(review): assumes self.serial_mac and
                # self.serial_connect_status are initialised elsewhere before
                # the first status message arrives -- TODO confirm.
                data_json["name"] = "YE600X"
                data_json["mac"] = self.serial_mac
                data_json["connect_status"] = self.serial_connect_status
                data_json["manufacturerdata"] = data.hex().upper()
            elif 'sps' in gw.serial_device:
                data_json["manufacturerdata"] = data.hex().upper()
                data_json["id"] = "88:33:22:FF:44:77"
                data_json["name"] = "sps"
                data_json["rssi"] = -15
            else:
                logger.error("gw.serial_device is not defined, please check")
                self.stopped = True
                break

            decoded_json = decodeBLE(json.dumps(data_json))
            if decoded_json is None:
                logger.info("serial decodeBLE return None")
            else:
                gw.publish(
                    decoded_json,
                    gw.presence_topic,
                )
        logger.error("serial read loop stopped")
        self.running = False

    async def write_to_serial(writer):
        """Send ``b'off\\n'`` to the serial port every two seconds."""
        self.running = True
        while not self.stopped:
            writer.write(b'off\n')
            await writer.drain()
            await asyncio.sleep(2)
        logger.error("serial write loop stopped")
        self.running = False

    writer = None
    try:
        reader, writer = await serial_asyncio.open_serial_connection(
            url=gw.serial_port,
            baudrate=115200,
        )
        # gather() surfaces a failure in either loop as soon as it happens,
        # instead of only after the read loop has finished.
        await asyncio.gather(
            read_from_serial(reader),
            write_to_serial(writer),
        )
    except asyncio.CancelledError:
        # Reset the flags but let the cancellation reach the caller.
        self.running = False
        self.stopped = True
        raise
    except Exception:
        self.running = False
        self.stopped = True
        # The message is kept as-is; the traceback tells an open failure
        # apart from an error raised inside one of the loops.
        logger.exception(
            "serial %s port open failed, please check", gw.serial_port
        )
    finally:
        if writer is not None:
            # Release the serial port once the loops are done.
            writer.close()


def run(arg: str) -> None:
    """Start the event loop in a daemon thread and schedule the serial loop.

    ``arg`` is not used by the statements visible here. ``loop`` and ``gw``
    are names defined elsewhere in the file.
    """
    thread = Thread(target=loop.run_forever, daemon=True)
    thread.start()
    asyncio.run_coroutine_threadsafe(gw.serial_wr_loop(), loop)
|