79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
|
|
from backend.services.pico import parse_pico_line
|
|
|
|
|
|
DEFAULT_BAUD = 115200
|
|
|
|
|
|
def find_pico_port() -> str | None:
|
|
for port in serial.tools.list_ports.comports():
|
|
vid = port.vid
|
|
manufacturer = (port.manufacturer or "").lower()
|
|
description = (port.description or "").lower()
|
|
if vid == 0x2E8A or "pico" in description or "raspberry" in manufacturer:
|
|
return port.device
|
|
return None
|
|
|
|
|
|
class SerialService:
|
|
def __init__(self, app: Any):
|
|
self.app = app
|
|
self.task: asyncio.Task[None] | None = None
|
|
self.stop_event = asyncio.Event()
|
|
self.connected_port: str | None = None
|
|
|
|
def start(self) -> None:
|
|
if self.task is None or self.task.done():
|
|
self.stop_event.clear()
|
|
self.task = asyncio.create_task(self._run())
|
|
|
|
async def stop(self) -> None:
|
|
self.stop_event.set()
|
|
if self.task:
|
|
await asyncio.wait([self.task], timeout=2)
|
|
|
|
async def restart(self) -> None:
|
|
await self.stop()
|
|
self.task = None
|
|
self.start()
|
|
|
|
async def _run(self) -> None:
|
|
while not self.stop_event.is_set():
|
|
port = self.app.db.get_setting("serial_port") or find_pico_port()
|
|
if not port:
|
|
self.connected_port = None
|
|
await self.app.ws.broadcast("serial.disconnected", {"reason": "Pico not found"})
|
|
await asyncio.sleep(2)
|
|
continue
|
|
try:
|
|
await self._read_port(port)
|
|
except Exception as exc:
|
|
self.connected_port = None
|
|
await self.app.ws.broadcast("serial.disconnected", {"port": port, "error": str(exc)})
|
|
await asyncio.sleep(2)
|
|
|
|
async def _read_port(self, port: str) -> None:
|
|
with serial.Serial(port, DEFAULT_BAUD, timeout=1) as ser:
|
|
self.connected_port = port
|
|
await self.app.ws.broadcast("serial.connected", {"port": port, "baud": DEFAULT_BAUD})
|
|
while not self.stop_event.is_set():
|
|
raw = await asyncio.to_thread(ser.readline)
|
|
line = raw.decode("utf-8", errors="replace").strip()
|
|
if not line:
|
|
continue
|
|
event = parse_pico_line(line)
|
|
if event is None:
|
|
await self.app.ws.broadcast("serial.diagnostic", {"line": line})
|
|
continue
|
|
payload = event.to_dict()
|
|
self.app.db.add_event(f"button.{payload['event']}", payload)
|
|
await self.app.actions.handle_button_event(payload)
|
|
|