remote.py (3977B)
1 from threading import Thread 2 from time import sleep 3 import tkinter as tk 4 from tkinter import Button, OptionMenu, StringVar, Tk, Label 5 from http.client import HTTPConnection 6 from typing import Optional 7 8 from remote import Remote 9 10 import json 11 import sys 12 import asyncio 13 import websockets 14 15 16 WEBSOCKET_PORT = 1612 17 host, port = 'localhost', 5000 18 19 remote = Remote(115200) 20 token: Optional[str] = None 21 22 23 @remote.command("set_token") 24 def set_token(req): 25 global token 26 token = req['token'] 27 28 29 @remote.command("send") 30 def send_http(params): 31 method, endpoint, body = params["method"], params["endpoint"], params["body"] 32 33 print(body) 34 35 client = HTTPConnection(host, port) 36 client.request(method, endpoint, json.dumps(body)) 37 res = client.getresponse() 38 response = json.load(res) 39 40 print(response) 41 42 return dict(code=res.status, body=response) 43 44 45 token = 'abcdefghijklmnoq' 46 47 48 async def websocket_handler(ws, _): 49 if await ws.recv() == 'token': 50 if token: 51 await ws.send(token) 52 await ws.close() 53 54 55 class RemoteWindow(Tk): 56 running = False 57 closed = False 58 disconnecting = False 59 60 def __init__(self): 61 super().__init__() 62 63 self.title('Team Benni - Remote') 64 self.geometry('500x100') 65 self.protocol("WM_DELETE_WINDOW", self.on_close) 66 67 self.columnconfigure(0, weight=1) 68 self.columnconfigure(1, weight=3) 69 # self.columnconfigure(2, weight=3) 70 71 self.devices = Remote.list_ports() 72 self.device_names = [ 73 f'{p.name} ({p.description})' for p in self.devices] 74 75 self.dev_var = StringVar(self, self.device_names[0]) 76 77 self.label = Label(self, text='Not connected') 78 self.label['anchor'] = tk.CENTER 79 self.label.grid(column=0, row=0, sticky=tk.W, 80 padx=5, pady=5, columnspan=2) 81 82 self.dev_label = Label(self, text='Device:') 83 self.dev_label.grid(column=0, row=1, sticky=tk.E, padx=5, pady=5) 84 85 self.dev_menu = OptionMenu(self, self.dev_var, *self.device_names) 86 self.dev_menu.grid(column=1, row=1, sticky=tk.E, padx=5, pady=5) 87 88 self.connect_button = Button( 89 self, text="Connect", command=self.on_connect) 90 self.connect_button.grid(column=1, row=3, sticky=tk.E, padx=5, pady=5) 91 92 async def run_websocket(self): 93 async with websockets.serve(websocket_handler, '0.0.0.0', # type: ignore 94 WEBSOCKET_PORT): 95 while self.running: 96 await asyncio.sleep(1) 97 98 def on_connect(self): 99 if self.disconnecting: 100 return 101 self.running = not self.running 102 if self.running: 103 port = self.devices[self.device_names.index(self.dev_var.get())] 104 105 self.websocket_thread = Thread( 106 target=lambda: asyncio.run(self.run_websocket())) 107 self.remote_thread = Thread( 108 target=lambda: remote.run(port.device)) 109 110 self.websocket_thread.start() 111 self.remote_thread.start() 112 113 self.label['text'] = f'Connected to {port.name}' 114 if port.description != 'n/a': 115 self.label['text'] += f' ({port.description})' 116 self.connect_button['text'] = 'Disconnect' 117 else: 118 remote.stop() 119 self.disconnecting = True 120 121 self.connect_button['text'] = 'Disconnecting...' 122 123 def on_close(self): 124 if self.running: 125 self.on_connect() 126 127 self.closed = True 128 129 def run(self): 130 while not self.closed or self.disconnecting: 131 if self.disconnecting and not self.remote_thread.is_alive() and not self.websocket_thread.is_alive(): 132 self.label['text'] = 'Not connected' 133 self.connect_button['text'] = 'Connect' 134 self.disconnecting = False 135 136 sleep(0.1) 137 self.update() 138 139 140 if __name__ == "__main__": 141 win = RemoteWindow() 142 win.run()