Add planner previews and streaming
Some checks failed
CI / test (push) Failing after 45s

This commit is contained in:
2026-04-05 19:33:24 +02:00
parent b1d2b6b321
commit 1b0b9cfdef
12 changed files with 332 additions and 31 deletions

41
tests/test_streamer.py Normal file
View File

@@ -0,0 +1,41 @@
import asyncio
from server.streamer import ScreenshotStreamer
class DummyWebSocket:
def __init__(self):
self.sent = []
self.accepted = False
async def accept(self) -> None:
self.accepted = True
async def send_json(self, payload):
self.sent.append(payload)
def test_streamer_broadcasts_to_grid():
streamer = ScreenshotStreamer()
socket = DummyWebSocket()
async def scenario():
key = await streamer.connect(socket, "grid-123")
await streamer.broadcast("grid-123", {"frame": 1})
streamer.disconnect(socket, key)
asyncio.run(scenario())
assert socket.sent == [{"frame": 1}]
def test_streamer_wildcard_listener_receives_updates():
streamer = ScreenshotStreamer()
socket = DummyWebSocket()
async def scenario():
key = await streamer.connect(socket, None)
await streamer.broadcast("grid-456", {"frame": 2})
streamer.disconnect(socket, key)
asyncio.run(scenario())
assert socket.sent == [{"frame": 2}]