muizenval

Observe mouse traps remotely
Log | Files | Refs

__init__.py (2388B)


      1 from datetime import datetime
      2 from typing import Any, Callable, Dict, Optional
      3 from serial import Serial
      4 from serial.tools.list_ports import comports
      5 from .exception import RemoteException
      6 
      7 import json
      8 import traceback
      9 
     10 
     11 CommandHandler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]
     12 
     13 
     14 class Remote:
     15     commands: Dict[str, CommandHandler] = dict()
     16     running = True
     17 
     18     @staticmethod
     19     def list_ports():
     20         return comports()
     21 
     22     def __init__(self, baud: int):
     23         self.baud = baud
     24 
     25     def command(self, name: str):
     26         def inner(func: CommandHandler):
     27             self.commands[name] = func
     28             return func
     29 
     30         return inner
     31 
     32     def run(self, serial_path, timeout=1):
     33         serial = Serial(port=serial_path, baudrate=self.baud, timeout=timeout)
     34         self.running = True
     35         while self.running:
     36             command = ''
     37             status = ''
     38             params = None
     39             response = None
     40             try:
     41                 line = serial.readline().decode(errors='ignore')
     42                 try:
     43                     if line == '':
     44                         continue
     45                     if ' ' in line:
     46                         command, params_raw = line.split(' ', 1)
     47                         params = json.loads(params_raw)
     48                     else:
     49                         command = line
     50                         params = dict()
     51                 except json.JSONDecodeError:
     52                     raise RemoteException('bad-request')
     53 
     54                 if command not in self.commands:
     55                     raise RemoteException('bad-command')
     56 
     57                 res = self.commands[command](params) or dict()
     58 
     59                 status = 'ok'
     60                 response = json.dumps(res)
     61             except RemoteException as err:
     62                 status = err.name
     63             except KeyboardInterrupt:
     64                 break
     65             except Exception as err:
     66                 print(f'Error handling {command} ({params}):')
     67                 traceback.print_exc()
     68                 status = 'unknown'
     69 
     70             print(
     71                 f'{datetime.now().strftime("%d/%m/%y %H:%M:%S")} | {command} -> { status }')
     72             serial.write(status.encode())
     73             if response is not None:
     74                 serial.write(b' ' + response.encode())
     75             serial.write(b'\n')
     76         serial.close()
     77 
     78     def stop(self):
     79         self.running = False