# File-viewer metadata captured with this listing: 287 lines, 10 KiB, Python.
from __future__ import annotations

import asyncio
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path

from backend.database import Database
from backend.services.actions import ActionEngine
from backend.services.pico import parse_pico_line
from backend.services.plugins import PluginContext, PluginManager
from plugins.clipboard_tools import ClipboardToolsPlugin
from plugins.http_requests import HTTPRequestsPlugin


class DummyWs:
    """Stand-in for the app's ``ws`` attribute whose ``broadcast`` does nothing."""

    async def broadcast(self, *_args, **_kwargs):
        """Accept any positional/keyword arguments, ignore them, and return ``None``."""
        return None


class DummyApp:
    """Minimal application stub handed to the action engine and plugin contexts.

    Exposes ``db``, ``ws``, ``actions.keys.press_combo`` and ``public_state()``.
    """

    def __init__(self, db):
        """Store *db* and attach no-op ``ws`` and ``actions`` stand-ins."""
        self.db = db
        self.ws = DummyWs()
        # Throwaway classes built with type() so that `app.actions.keys.press_combo`
        # resolves to an awaitable no-op. `_noop_press_combo` is defined further down
        # the module; the name is only looked up when DummyApp is instantiated.
        keys = type("Keys", (), {"press_combo": staticmethod(_noop_press_combo)})()
        self.actions = type("Actions", (), {"keys": keys})()

    def public_state(self):
        """Return the database's current ``state()``."""
        return self.db.state()


async def _noop_press_combo(_combo):
    """Awaitable placeholder for a key-combo press: ignore *_combo*, return ``None``."""
    return None


def test_parse_pico_event_accepts_current_firmware_json() -> None:
    """A JSON line with button/pin/event/pressed parses into an event with those fields."""
    event = parse_pico_line('{"button":2,"pin":27,"event":"down","pressed":true}')

    assert event is not None
    assert event.button == 2
    assert event.pin == 27
    assert event.event == "down"
    assert event.pressed is True


def test_parse_pico_event_ignores_startup_text() -> None:
    """Non-JSON banner lines yield ``None`` instead of an event."""
    assert parse_pico_line("streamdeck-pico ready") is None
    assert parse_pico_line("pins=28,27,26") is None


def test_database_seeds_profiles_folders_and_buttons(tmp_path: Path) -> None:
    """A freshly created database reports one profile, one folder, and ten buttons."""
    db = Database(tmp_path / "streamdeck.sqlite")

    state = db.state()

    assert len(state["profiles"]) == 1
    assert len(state["folders"]) == 1
    assert len(state["buttons"]) == 10


def test_move_physical_button_swaps_mapping(tmp_path: Path) -> None:
    """Assigning physical button 2 to position 1 swaps it with position 2's mapping."""
    db = Database(tmp_path / "streamdeck.sqlite")
    folder_id = db.get_setting("active_folder_id")

    db.move_physical_button(folder_id, position=1, physical_button=2)

    # position -> physical button, restricted to the folder that was edited.
    by_pos = {
        button["position"]: button["physical_button"]
        for button in db.state()["buttons"]
        if button["folder_id"] == folder_id
    }
    assert by_pos[1] == 2
    assert by_pos[2] == 1


def test_physical_layout_syncs_to_all_profiles_and_folders_without_actions(tmp_path: Path) -> None:
    """A physical-layout change in the canonical folder shows up in every folder.

    The action configured on a button in another profile must be left untouched.
    """
    db = Database(tmp_path / "streamdeck.sqlite")
    first_profile_id = db.get_setting("active_profile_id")
    canonical_folder_id = db.get_setting("active_folder_id")
    extra_folder_id = db.create_folder(first_profile_id, canonical_folder_id, "Extra")

    # Only the side effect is needed; the returned profile id is not used.
    db.create_profile("Second")
    # NOTE(review): presumably create_profile makes the new profile's root folder
    # the active one, so this reads the second profile's root — confirm.
    second_root_id = db.get_setting("active_folder_id")

    second_button = next(
        button
        for button in db.state()["buttons"]
        if button["folder_id"] == second_root_id and button["position"] == 1
    )
    db.update_button(
        second_button["id"],
        {"action_type": "key_combo", "action_config": {"combo": "ctrl+s"}},
    )

    db.move_physical_button(canonical_folder_id, position=1, physical_button=2)

    state = db.state()
    # Tuple (not set) so folders are checked in a stable order.
    for folder_id in (canonical_folder_id, extra_folder_id, second_root_id):
        by_pos = {
            button["position"]: button["physical_button"]
            for button in state["buttons"]
            if button["folder_id"] == folder_id
        }
        assert by_pos[1] == 2
        assert by_pos[2] == 1

    updated_second_button = db.get_button(second_button["id"])
    assert updated_second_button["action_type"] == "key_combo"
    assert updated_second_button["action_config"] == {"combo": "ctrl+s"}


def test_physical_layout_cannot_be_changed_outside_canonical_folder(tmp_path: Path) -> None:
    """Physical-button edits in a non-root folder raise ``ValueError``.

    Both entry points are checked: ``move_physical_button`` and ``update_button``
    with a ``physical_button`` field.
    """
    db = Database(tmp_path / "streamdeck.sqlite")
    profile_id = db.get_setting("active_profile_id")
    root_id = db.get_setting("active_folder_id")
    folder_id = db.create_folder(profile_id, root_id, "Locked")
    button = next(
        button
        for button in db.state()["buttons"]
        if button["folder_id"] == folder_id and button["position"] == 1
    )

    # try/except/else is used because this module does not import pytest.
    try:
        db.move_physical_button(folder_id, position=1, physical_button=2)
    except ValueError as exc:
        assert "first profile's root folder" in str(exc)
    else:
        raise AssertionError("Expected non-canonical folder move to fail.")

    try:
        db.update_button(button["id"], {"physical_button": 2})
    except ValueError as exc:
        assert "first profile's root folder" in str(exc)
    else:
        raise AssertionError("Expected non-canonical physical update to fail.")


def test_folder_rotation_moves_to_next_folder_and_wraps(tmp_path: Path) -> None:
    """``folder_rotation`` with direction ``next`` steps root -> First -> Second -> root."""
    db = Database(tmp_path / "streamdeck.sqlite")
    profile_id = db.get_setting("active_profile_id")
    root_id = db.get_setting("active_folder_id")
    first_id = db.create_folder(profile_id, root_id, "First")
    second_id = db.create_folder(profile_id, root_id, "Second")
    # Start from the root regardless of what folder creation left active.
    db.set_setting("active_folder_id", root_id)

    engine = ActionEngine(DummyApp(db))

    # Each rotation is run in its own event loop; the third one wraps to the root.
    for expected_folder_id in (first_id, second_id, root_id):
        asyncio.run(engine.execute("folder_rotation", {"direction": "next"}))
        assert db.get_setting("active_folder_id") == expected_folder_id


def test_profiles_and_folders_can_be_renamed_and_deleted(tmp_path: Path) -> None:
    """Renames are reflected in ``state()`` and a deleted folder disappears from it."""
    db = Database(tmp_path / "streamdeck.sqlite")
    profile_id = db.get_setting("active_profile_id")
    root_id = db.get_setting("active_folder_id")

    db.update_profile(profile_id, name="Renamed Profile")
    db.update_folder(root_id, name="Renamed Root")
    folder_id = db.create_folder(profile_id, root_id, "Temporary")

    state = db.state()
    assert state["profiles"][0]["name"] == "Renamed Profile"
    root_folder = next(folder for folder in state["folders"] if folder["id"] == root_id)
    assert root_folder["name"] == "Renamed Root"

    db.delete_folder(folder_id)
    assert all(folder["id"] != folder_id for folder in db.state()["folders"])


def test_first_profile_and_root_folder_are_protected(tmp_path: Path) -> None:
    """Deleting the first profile or its root folder raises ``ValueError``."""
    db = Database(tmp_path / "streamdeck.sqlite")
    first_profile_id = db.get_setting("active_profile_id")
    root_id = db.get_setting("active_folder_id")

    # try/except/else is used because this module does not import pytest.
    try:
        db.delete_profile(first_profile_id)
    except ValueError as exc:
        assert "first profile" in str(exc)
    else:
        raise AssertionError("Expected first profile delete to fail.")

    try:
        db.delete_folder(root_id)
    except ValueError as exc:
        assert "root folder" in str(exc)
    else:
        raise AssertionError("Expected root folder delete to fail.")


def test_plugin_failure_is_isolated(tmp_path: Path) -> None:
    """A plugin file that raises on import is reported as disabled with its error text."""
    plugin_dir = tmp_path / "plugins"
    plugin_dir.mkdir()
    # The module body raises as soon as it is executed.
    (plugin_dir / "bad.py").write_text("raise RuntimeError('boom')", encoding="utf-8")

    manager = PluginManager(plugin_dir)
    db = Database(tmp_path / "streamdeck.sqlite")
    manager.load_all(PluginContext(DummyApp(db)))

    plugins = manager.public_plugins()
    assert plugins[0]["enabled"] is False
    assert "boom" in plugins[0]["error"]


def test_http_request_plugin_posts_templated_json(tmp_path: Path) -> None:
    """``send_request`` renders ``{{event.*}}`` tokens in the URL, headers, and body.

    A throwaway HTTP server bound to an ephemeral loopback port records the path,
    the ``X-Test-Token`` header, and the body of the POST it receives.
    """
    received = {}

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", "0"))
            received["path"] = self.path
            received["token"] = self.headers.get("X-Test-Token")
            received["body"] = self.rfile.read(length).decode("utf-8")
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b'{"ok":true}')

        def log_message(self, *_args):
            # Silence the default per-request logging.
            return None

    # Port 0 lets the OS pick a free port; it is read back via server.server_port.
    server = ThreadingHTTPServer(("127.0.0.1", 0), Handler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        db = Database(tmp_path / "streamdeck.sqlite")
        plugin = HTTPRequestsPlugin()
        plugin.execute_action(
            PluginContext(DummyApp(db)),
            "send_request",
            {
                "method": "POST",
                # Quadrupled braces in the f-string emit a literal "{{event.button}}".
                "url": f"http://127.0.0.1:{server.server_port}/webhook/{{{{event.button}}}}",
                "headers": [{"key": "X-Test-Token", "value": "button-{{event.button}}"}],
                "body": '{"button":"{{event.button}}","event":"{{event.event}}"}',
                "timeout": 3,
                "log_response": True,
            },
            {"button": 4, "event": "down"},
        )
    finally:
        server.shutdown()
        thread.join(timeout=5)
        # shutdown() only stops serve_forever(); server_close() releases the socket.
        server.server_close()

    assert received["path"] == "/webhook/4"
    assert received["token"] == "button-4"
    assert json.loads(received["body"]) == {"button": "4", "event": "down"}


def test_clipboard_plugin_copy_renders_event_tokens(tmp_path: Path) -> None:
    """``copy_text`` writes the template with ``{{event.*}}`` tokens filled in."""
    db = Database(tmp_path / "streamdeck.sqlite")
    plugin = ClipboardToolsPlugin()
    writes = []
    # Capture clipboard writes in a list instead of touching the real clipboard.
    plugin._set_clipboard_text = writes.append

    asyncio.run(
        plugin.execute_action(
            PluginContext(DummyApp(db)),
            "copy_text",
            {"text": "Button {{event.button}} went {{event.event}}"},
            {"button": 7, "event": "up"},
        )
    )

    assert writes == ["Button 7 went up"]


def test_clipboard_plugin_paste_restores_previous_value(tmp_path: Path) -> None:
    """``paste_snippet`` writes the snippet, presses ctrl+v, then restores the old text."""
    db = Database(tmp_path / "streamdeck.sqlite")
    plugin = ClipboardToolsPlugin()
    writes = []
    presses = []
    # Replace clipboard access with in-memory fakes.
    plugin._get_clipboard_text = lambda: "original clipboard"
    plugin._set_clipboard_text = writes.append

    app = DummyApp(db)
    # Swap the no-op key presser for one that records each combo; the lambda
    # returns the coroutine from _record_combo so the call stays awaitable.
    keys = type(
        "Keys",
        (),
        {"press_combo": staticmethod(lambda combo: _record_combo(presses, combo))},
    )()
    app.actions = type("Actions", (), {"keys": keys})()

    asyncio.run(
        plugin.execute_action(
            PluginContext(app),
            "paste_snippet",
            {"text": "Snippet {{event.button}}", "restore_clipboard": True, "restore_delay_ms": 0},
            {"button": 2},
        )
    )

    assert writes == ["Snippet 2", "original clipboard"]
    assert presses == ["ctrl+v"]


def test_clipboard_plugin_transform_modes_cover_common_cases() -> None:
    """``_transform_text`` returns the expected string for each transform mode below."""
    plugin = ClipboardToolsPlugin()

    # (input text, transform config, expected output)
    cases = [
        ("Mixed Value", {"transform": "uppercase"}, "MIXED VALUE"),
        # NOTE(review): the listing this was recovered from may have collapsed runs
        # of spaces inside this input literal — confirm against the repository.
        (" Mixed Value ", {"transform": "collapse_whitespace"}, "Mixed Value"),
        ("Mixed Value", {"transform": "snake_case"}, "mixed_value"),
        ("mixed value", {"transform": "camel_case"}, "mixedValue"),
        ("hello world", {"transform": "replace", "find": "world", "replace": "deck"}, "hello deck"),
        ("world", {"transform": "prefix", "extra_text": "hello "}, "hello world"),
    ]

    for text, config, expected in cases:
        # The transform name is the assertion message so a failure names its mode.
        assert plugin._transform_text(text, config) == expected, config["transform"]


async def _record_combo(store, combo):
    """Awaitable key-press fake: append *combo* to *store* and return ``None``."""
    store.append(combo)