__init__.py (2388B)
1 from datetime import datetime 2 from typing import Any, Callable, Dict, Optional 3 from serial import Serial 4 from serial.tools.list_ports import comports 5 from .exception import RemoteException 6 7 import json 8 import traceback 9 10 11 CommandHandler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]] 12 13 14 class Remote: 15 commands: Dict[str, CommandHandler] = dict() 16 running = True 17 18 @staticmethod 19 def list_ports(): 20 return comports() 21 22 def __init__(self, baud: int): 23 self.baud = baud 24 25 def command(self, name: str): 26 def inner(func: CommandHandler): 27 self.commands[name] = func 28 return func 29 30 return inner 31 32 def run(self, serial_path, timeout=1): 33 serial = Serial(port=serial_path, baudrate=self.baud, timeout=timeout) 34 self.running = True 35 while self.running: 36 command = '' 37 status = '' 38 params = None 39 response = None 40 try: 41 line = serial.readline().decode(errors='ignore') 42 try: 43 if line == '': 44 continue 45 if ' ' in line: 46 command, params_raw = line.split(' ', 1) 47 params = json.loads(params_raw) 48 else: 49 command = line 50 params = dict() 51 except json.JSONDecodeError: 52 raise RemoteException('bad-request') 53 54 if command not in self.commands: 55 raise RemoteException('bad-command') 56 57 res = self.commands[command](params) or dict() 58 59 status = 'ok' 60 response = json.dumps(res) 61 except RemoteException as err: 62 status = err.name 63 except KeyboardInterrupt: 64 break 65 except Exception as err: 66 print(f'Error handling {command} ({params}):') 67 traceback.print_exc() 68 status = 'unknown' 69 70 print( 71 f'{datetime.now().strftime("%d/%m/%y %H:%M:%S")} | {command} -> { status }') 72 serial.write(status.encode()) 73 if response is not None: 74 serial.write(b' ' + response.encode()) 75 serial.write(b'\n') 76 serial.close() 77 78 def stop(self): 79 self.running = False