muizenval

Observe mouse traps remotely
Log | Files | Refs

remote.py (3977B)


      1 from threading import Thread
      2 from time import sleep
      3 import tkinter as tk
      4 from tkinter import Button, OptionMenu, StringVar, Tk, Label
      5 from http.client import HTTPConnection
      6 from typing import Optional
      7 
      8 from remote import Remote
      9 
     10 import json
     11 import sys
     12 import asyncio
     13 import websockets
     14 
     15 
     16 WEBSOCKET_PORT = 1612
     17 host, port = 'localhost', 5000
     18 
     19 remote = Remote(115200)
     20 token: Optional[str] = None
     21 
     22 
     23 @remote.command("set_token")
     24 def set_token(req):
     25     global token
     26     token = req['token']
     27 
     28 
     29 @remote.command("send")
     30 def send_http(params):
     31     method, endpoint, body = params["method"], params["endpoint"], params["body"]
     32 
     33     print(body)
     34 
     35     client = HTTPConnection(host, port)
     36     client.request(method, endpoint, json.dumps(body))
     37     res = client.getresponse()
     38     response = json.load(res)
     39 
     40     print(response)
     41 
     42     return dict(code=res.status, body=response)
     43 
     44 
     45 token = 'abcdefghijklmnoq'
     46 
     47 
     48 async def websocket_handler(ws, _):
     49     if await ws.recv() == 'token':
     50         if token:
     51             await ws.send(token)
     52     await ws.close()
     53 
     54 
     55 class RemoteWindow(Tk):
     56     running = False
     57     closed = False
     58     disconnecting = False
     59 
     60     def __init__(self):
     61         super().__init__()
     62 
     63         self.title('Team Benni - Remote')
     64         self.geometry('500x100')
     65         self.protocol("WM_DELETE_WINDOW", self.on_close)
     66 
     67         self.columnconfigure(0, weight=1)
     68         self.columnconfigure(1, weight=3)
     69 #        self.columnconfigure(2, weight=3)
     70 
     71         self.devices = Remote.list_ports()
     72         self.device_names = [
     73             f'{p.name} ({p.description})' for p in self.devices]
     74 
     75         self.dev_var = StringVar(self, self.device_names[0])
     76 
     77         self.label = Label(self, text='Not connected')
     78         self.label['anchor'] = tk.CENTER
     79         self.label.grid(column=0, row=0, sticky=tk.W,
     80                         padx=5, pady=5, columnspan=2)
     81 
     82         self.dev_label = Label(self, text='Device:')
     83         self.dev_label.grid(column=0, row=1, sticky=tk.E, padx=5, pady=5)
     84 
     85         self.dev_menu = OptionMenu(self, self.dev_var, *self.device_names)
     86         self.dev_menu.grid(column=1, row=1, sticky=tk.E, padx=5, pady=5)
     87 
     88         self.connect_button = Button(
     89             self, text="Connect", command=self.on_connect)
     90         self.connect_button.grid(column=1, row=3, sticky=tk.E, padx=5, pady=5)
     91 
     92     async def run_websocket(self):
     93         async with websockets.serve(websocket_handler, '0.0.0.0',  # type: ignore
     94                                     WEBSOCKET_PORT):
     95             while self.running:
     96                 await asyncio.sleep(1)
     97 
     98     def on_connect(self):
     99         if self.disconnecting:
    100             return
    101         self.running = not self.running
    102         if self.running:
    103             port = self.devices[self.device_names.index(self.dev_var.get())]
    104 
    105             self.websocket_thread = Thread(
    106                 target=lambda: asyncio.run(self.run_websocket()))
    107             self.remote_thread = Thread(
    108                 target=lambda: remote.run(port.device))
    109 
    110             self.websocket_thread.start()
    111             self.remote_thread.start()
    112 
    113             self.label['text'] = f'Connected to {port.name}'
    114             if port.description != 'n/a':
    115                 self.label['text'] += f' ({port.description})'
    116             self.connect_button['text'] = 'Disconnect'
    117         else:
    118             remote.stop()
    119             self.disconnecting = True
    120 
    121             self.connect_button['text'] = 'Disconnecting...'
    122 
    123     def on_close(self):
    124         if self.running:
    125             self.on_connect()
    126 
    127         self.closed = True
    128 
    129     def run(self):
    130         while not self.closed or self.disconnecting:
    131             if self.disconnecting and not self.remote_thread.is_alive() and not self.websocket_thread.is_alive():
    132                 self.label['text'] = 'Not connected'
    133                 self.connect_button['text'] = 'Connect'
    134                 self.disconnecting = False
    135 
    136             sleep(0.1)
    137             self.update()
    138 
    139 
    140 if __name__ == "__main__":
    141     win = RemoteWindow()
    142     win.run()