from __future__ import annotations import asyncio import json import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from backend.database import Database from backend.services.actions import ActionEngine from backend.services.pico import parse_pico_line from backend.services.plugins import PluginContext, PluginManager from plugins.clipboard_tools import ClipboardToolsPlugin from plugins.http_requests import HTTPRequestsPlugin from plugins.wled import WLEDPlugin class DummyWs: async def broadcast(self, *_args, **_kwargs): return None class DummyApp: def __init__(self, db): self.db = db self.ws = DummyWs() self.actions = type("Actions", (), {"keys": type("Keys", (), {"press_combo": staticmethod(_noop_press_combo)})()})() def public_state(self): return self.db.state() async def _noop_press_combo(_combo): return None def test_parse_pico_event_accepts_current_firmware_json(): event = parse_pico_line('{"button":2,"pin":27,"event":"down","pressed":true}') assert event is not None assert event.button == 2 assert event.pin == 27 assert event.event == "down" assert event.pressed is True def test_parse_pico_event_ignores_startup_text(): assert parse_pico_line("streamdeck-pico ready") is None assert parse_pico_line("pins=28,27,26") is None def test_database_seeds_profiles_folders_and_buttons(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") state = db.state() assert len(state["profiles"]) == 1 assert len(state["folders"]) == 1 assert len(state["buttons"]) == 10 def test_move_physical_button_swaps_mapping(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") folder_id = db.get_setting("active_folder_id") db.move_physical_button(folder_id, position=1, physical_button=2) buttons = [button for button in db.state()["buttons"] if button["folder_id"] == folder_id] by_pos = {button["position"]: button["physical_button"] for button in buttons} assert by_pos[1] == 2 assert by_pos[2] == 1 def test_physical_layout_syncs_to_all_profiles_and_folders_without_actions(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") first_profile_id = db.get_setting("active_profile_id") canonical_folder_id = db.get_setting("active_folder_id") extra_folder_id = db.create_folder(first_profile_id, canonical_folder_id, "Extra") second_profile_id = db.create_profile("Second") second_root_id = db.get_setting("active_folder_id") second_button = next( button for button in db.state()["buttons"] if button["folder_id"] == second_root_id and button["position"] == 1 ) db.update_button(second_button["id"], {"action_type": "key_combo", "action_config": {"combo": "ctrl+s"}}) db.move_physical_button(canonical_folder_id, position=1, physical_button=2) state = db.state() for folder_id in {canonical_folder_id, extra_folder_id, second_root_id}: buttons = [button for button in state["buttons"] if button["folder_id"] == folder_id] by_pos = {button["position"]: button["physical_button"] for button in buttons} assert by_pos[1] == 2 assert by_pos[2] == 1 updated_second_button = db.get_button(second_button["id"]) assert updated_second_button["action_type"] == "key_combo" assert updated_second_button["action_config"] == {"combo": "ctrl+s"} def test_physical_layout_cannot_be_changed_outside_canonical_folder(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") profile_id = db.get_setting("active_profile_id") root_id = db.get_setting("active_folder_id") folder_id = db.create_folder(profile_id, root_id, "Locked") button = next(button for button in db.state()["buttons"] if button["folder_id"] == folder_id and button["position"] == 1) try: db.move_physical_button(folder_id, position=1, physical_button=2) except ValueError as exc: assert "first profile's root folder" in str(exc) else: raise AssertionError("Expected non-canonical folder move to fail.") try: db.update_button(button["id"], {"physical_button": 2}) except ValueError as exc: assert "first profile's root folder" in str(exc) else: raise AssertionError("Expected non-canonical physical update to fail.") def test_folder_rotation_moves_to_next_folder_and_wraps(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") profile_id = db.get_setting("active_profile_id") root_id = db.get_setting("active_folder_id") first_id = db.create_folder(profile_id, root_id, "First") second_id = db.create_folder(profile_id, root_id, "Second") db.set_setting("active_folder_id", root_id) engine = ActionEngine(DummyApp(db)) asyncio.run(engine.execute("folder_rotation", {"direction": "next"})) assert db.get_setting("active_folder_id") == first_id asyncio.run(engine.execute("folder_rotation", {"direction": "next"})) assert db.get_setting("active_folder_id") == second_id asyncio.run(engine.execute("folder_rotation", {"direction": "next"})) assert db.get_setting("active_folder_id") == root_id def test_profiles_and_folders_can_be_renamed_and_deleted(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") profile_id = db.get_setting("active_profile_id") root_id = db.get_setting("active_folder_id") db.update_profile(profile_id, name="Renamed Profile") db.update_folder(root_id, name="Renamed Root") folder_id = db.create_folder(profile_id, root_id, "Temporary") state = db.state() assert state["profiles"][0]["name"] == "Renamed Profile" assert next(folder for folder in state["folders"] if folder["id"] == root_id)["name"] == "Renamed Root" db.delete_folder(folder_id) assert all(folder["id"] != folder_id for folder in db.state()["folders"]) def test_first_profile_and_root_folder_are_protected(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") first_profile_id = db.get_setting("active_profile_id") root_id = db.get_setting("active_folder_id") try: db.delete_profile(first_profile_id) except ValueError as exc: assert "first profile" in str(exc) else: raise AssertionError("Expected first profile delete to fail.") try: db.delete_folder(root_id) except ValueError as exc: assert "root folder" in str(exc) else: raise AssertionError("Expected root folder delete to fail.") def test_plugin_failure_is_isolated(tmp_path: Path): plugin_dir = tmp_path / "plugins" plugin_dir.mkdir() (plugin_dir / "bad.py").write_text("raise RuntimeError('boom')", encoding="utf-8") manager = PluginManager(plugin_dir) db = Database(tmp_path / "streamdeck.sqlite") manager.load_all(PluginContext(DummyApp(db))) plugins = manager.public_plugins() assert plugins[0]["enabled"] is False assert "boom" in plugins[0]["error"] def test_http_request_plugin_posts_templated_json(tmp_path: Path): received = {} class Handler(BaseHTTPRequestHandler): def do_POST(self): length = int(self.headers.get("Content-Length", "0")) received["path"] = self.path received["token"] = self.headers.get("X-Test-Token") received["body"] = self.rfile.read(length).decode("utf-8") self.send_response(200) self.end_headers() self.wfile.write(b'{"ok":true}') def log_message(self, *_args): return None server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() try: db = Database(tmp_path / "streamdeck.sqlite") plugin = HTTPRequestsPlugin() plugin.execute_action( PluginContext(DummyApp(db)), "send_request", { "method": "POST", "url": f"http://127.0.0.1:{server.server_port}/webhook/{{{{event.button}}}}", "headers": [{"key": "X-Test-Token", "value": "button-{{event.button}}"}], "body": '{"button":"{{event.button}}","event":"{{event.event}}"}', "timeout": 3, "log_response": True, }, {"button": 4, "event": "down"}, ) finally: server.shutdown() thread.join(timeout=5) assert received["path"] == "/webhook/4" assert received["token"] == "button-4" assert json.loads(received["body"]) == {"button": "4", "event": "down"} def test_wled_plugin_sends_power_brightness_and_color_payloads(tmp_path: Path): received = [] class Handler(BaseHTTPRequestHandler): def do_POST(self): length = int(self.headers.get("Content-Length", "0")) received.append({"path": self.path, "body": json.loads(self.rfile.read(length).decode("utf-8"))}) self.send_response(200) self.end_headers() self.wfile.write(b'{"success":true}') def log_message(self, *_args): return None server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() try: db = Database(tmp_path / "streamdeck.sqlite") plugin = WLEDPlugin() base_config = {"device_ip": f"127.0.0.1:{server.server_port}"} plugin.execute_action(PluginContext(DummyApp(db)), "power", base_config | {"state": "toggle"}, None) plugin.execute_action(PluginContext(DummyApp(db)), "brightness", base_config | {"brightness": 50}, None) plugin.execute_action( PluginContext(DummyApp(db)), "color", base_config | {"segment_id": 2, "color": "#0f8", "brightness": 25}, None, ) finally: server.shutdown() thread.join(timeout=5) assert [item["path"] for item in received] == ["/json/state", "/json/state", "/json/state"] assert received[0]["body"] == {"on": "t"} assert received[1]["body"] == {"bri": 128} assert received[2]["body"] == {"seg": [{"id": 2, "col": ["00FF88"], "bri": 64}]} def test_wled_plugin_pixel_range_scales_color_for_range_brightness(tmp_path: Path): received = {} class Handler(BaseHTTPRequestHandler): def do_POST(self): length = int(self.headers.get("Content-Length", "0")) received["body"] = json.loads(self.rfile.read(length).decode("utf-8")) self.send_response(200) self.end_headers() self.wfile.write(b"{}") def log_message(self, *_args): return None server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() try: db = Database(tmp_path / "streamdeck.sqlite") WLEDPlugin().execute_action( PluginContext(DummyApp(db)), "pixel_range", { "device_ip": f"127.0.0.1:{server.server_port}", "segment_id": 1, "start_pixel": 10, "stop_pixel": 18, "color": "#336699", "brightness": 50, }, None, ) finally: server.shutdown() thread.join(timeout=5) assert received["body"] == {"seg": [{"id": 1, "i": [10, 18, "1A334D"]}]} def test_wled_mega_update_combines_effect_color_range_and_extra_json(tmp_path: Path): received = [] class Handler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/json/eff": body = json.dumps(["Solid", "Blink", "Rainbow"]).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(body) return self.send_response(404) self.end_headers() def do_POST(self): length = int(self.headers.get("Content-Length", "0")) received.append(json.loads(self.rfile.read(length).decode("utf-8"))) self.send_response(200) self.end_headers() self.wfile.write(b"{}") def log_message(self, *_args): return None server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() try: db = Database(tmp_path / "streamdeck.sqlite") WLEDPlugin().execute_action( PluginContext(DummyApp(db)), "mega_update", { "device_ip": f"http://127.0.0.1:{server.server_port}", "power": "on", "global_brightness": 80, "segment_id": 0, "color": "#112233", "segment_brightness": 40, "effect": "Rainbow", "speed": 120, "intensity": 200, "pixel_start": 0, "pixel_stop": 8, "pixel_color": "#ff0000", "pixel_brightness": 25, "extra_json": '{"transition":4}', }, None, ) finally: server.shutdown() thread.join(timeout=5) assert received == [ {"on": True, "bri": 204}, { "on": True, "bri": 204, "seg": [{"id": 0, "col": ["112233"], "bri": 102, "fx": 2, "sx": 120, "ix": 200, "i": [0, 8, "400000"]}], "transition": 4, } ] def test_clipboard_plugin_copy_renders_event_tokens(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") plugin = ClipboardToolsPlugin() writes = [] plugin._set_clipboard_text = writes.append asyncio.run( plugin.execute_action( PluginContext(DummyApp(db)), "copy_text", {"text": "Button {{event.button}} went {{event.event}}"}, {"button": 7, "event": "up"}, ) ) assert writes == ["Button 7 went up"] def test_clipboard_plugin_paste_restores_previous_value(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") plugin = ClipboardToolsPlugin() writes = [] presses = [] plugin._get_clipboard_text = lambda: "original clipboard" plugin._set_clipboard_text = writes.append app = DummyApp(db) app.actions = type( "Actions", (), {"keys": type("Keys", (), {"press_combo": staticmethod(lambda combo: _record_combo(presses, combo))})()}, )() asyncio.run( plugin.execute_action( PluginContext(app), "paste_snippet", {"text": "Snippet {{event.button}}", "restore_clipboard": True, "restore_delay_ms": 0}, {"button": 2}, ) ) assert writes == ["Snippet 2", "original clipboard"] assert presses == ["ctrl+v"] def test_clipboard_plugin_transform_modes_cover_common_cases(): plugin = ClipboardToolsPlugin() assert plugin._transform_text("Mixed Value", {"transform": "uppercase"}) == "MIXED VALUE" assert plugin._transform_text(" Mixed Value ", {"transform": "collapse_whitespace"}) == "Mixed Value" assert plugin._transform_text("Mixed Value", {"transform": "snake_case"}) == "mixed_value" assert plugin._transform_text("mixed value", {"transform": "camel_case"}) == "mixedValue" assert plugin._transform_text("hello world", {"transform": "replace", "find": "world", "replace": "deck"}) == "hello deck" assert plugin._transform_text("world", {"transform": "prefix", "extra_text": "hello "}) == "hello world" async def _record_combo(store, combo): store.append(combo)