Files
clickthrough/tests/test_streamer.py
Luna 1b0b9cfdef
Some checks failed
CI / test (push) Failing after 45s
Add planner previews and streaming
2026-04-05 19:33:24 +02:00

42 lines
1.0 KiB
Python

import asyncio
from server.streamer import ScreenshotStreamer
class DummyWebSocket:
def __init__(self):
self.sent = []
self.accepted = False
async def accept(self) -> None:
self.accepted = True
async def send_json(self, payload):
self.sent.append(payload)
def test_streamer_broadcasts_to_grid():
streamer = ScreenshotStreamer()
socket = DummyWebSocket()
async def scenario():
key = await streamer.connect(socket, "grid-123")
await streamer.broadcast("grid-123", {"frame": 1})
streamer.disconnect(socket, key)
asyncio.run(scenario())
assert socket.sent == [{"frame": 1}]
def test_streamer_wildcard_listener_receives_updates():
streamer = ScreenshotStreamer()
socket = DummyWebSocket()
async def scenario():
key = await streamer.connect(socket, None)
await streamer.broadcast("grid-456", {"frame": 2})
streamer.disconnect(socket, key)
asyncio.run(scenario())
assert socket.sent == [{"frame": 2}]