From 195880078aef1e1884542f8731492387a9477fe8 Mon Sep 17 00:00:00 2001 From: space Date: Thu, 21 May 2026 21:34:58 +0200 Subject: [PATCH] Add WLED plugin and backend tests --- plugins/wled.py | 416 ++++++++++++++++++++++++++++++++++++++++++ tests/test_backend.py | 146 +++++++++++++++ 2 files changed, 562 insertions(+) create mode 100644 plugins/wled.py diff --git a/plugins/wled.py b/plugins/wled.py new file mode 100644 index 0000000..46f9f66 --- /dev/null +++ b/plugins/wled.py @@ -0,0 +1,416 @@ +from __future__ import annotations + +import json +import re +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + + +HEX_COLOR_RE = re.compile(r"^#?([0-9a-fA-F]{3}|[0-9a-fA-F]{6}|[0-9a-fA-F]{8})$") + + +def _device_field() -> dict[str, Any]: + return { + "id": "device_ip", + "label": "Device IP", + "type": "text", + "required": True, + "placeholder": "192.168.1.50", + } + + +class WLEDPlugin: + name = "WLED" + desc = "Control WLED devices by IP using the JSON API." + version = "0.1.0" + actions = [ + { + "id": "power", + "name": "Power", + "desc": "Turn a WLED device on, off, or toggle it.", + "fields": [ + _device_field(), + { + "id": "state", + "label": "State", + "type": "select", + "default": "toggle", + "options": [ + {"label": "Toggle", "value": "toggle"}, + {"label": "On", "value": "on"}, + {"label": "Off", "value": "off"}, + ], + }, + ], + }, + { + "id": "brightness", + "name": "Brightness", + "desc": "Set device brightness from 0 to 100 percent.", + "fields": [ + _device_field(), + {"id": "brightness", "label": "Brightness 0-100", "type": "number", "default": 100}, + ], + }, + { + "id": "color", + "name": "Color", + "desc": "Set a segment color from a hex value.", + "fields": [ + _device_field(), + {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, + {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"}, + {"id": "brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"}, + ], + }, + { + "id": "pixel_range", + "name": "Pixel Range", + "desc": "Set color and brightness for a specific pixel range.", + "fields": [ + _device_field(), + {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, + {"id": "start_pixel", "label": "Start Pixel", "type": "number", "default": 0}, + {"id": "stop_pixel", "label": "Stop Pixel (exclusive)", "type": "number", "default": 1}, + {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"}, + {"id": "brightness", "label": "Range Brightness 0-100", "type": "number", "default": 100}, + ], + }, + { + "id": "effect", + "name": "Effect", + "desc": "Set an effect, speed, intensity, and palette on a segment.", + "fields": [ + _device_field(), + {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, + {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "required": True, "placeholder": "0, Rainbow, r, ~"}, + {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"}, + {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"}, + {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"}, + ], + }, + { + "id": "mega_update", + "name": "Mega Update", + "desc": "Apply power, brightness, color, effect, pixel range, and extra WLED JSON together.", + "fields": [ + _device_field(), + { + "id": "power", + "label": "Power", + "type": "select", + "default": "keep", + "options": [ + {"label": "Keep", "value": "keep"}, + {"label": "On", "value": "on"}, + {"label": "Off", "value": "off"}, + {"label": "Toggle", "value": "toggle"}, + ], + }, + {"id": "global_brightness", "label": "Global Brightness 0-100", "type": "number", "placeholder": "Optional"}, + {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, + {"id": "color", "label": "Segment Hex Color", "type": "text", "placeholder": "Optional"}, + {"id": "segment_brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"}, + {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "placeholder": "Optional"}, + {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"}, + {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"}, + {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"}, + {"id": "pixel_start", "label": "Pixel Start", "type": "number", "placeholder": "Optional"}, + {"id": "pixel_stop", "label": "Pixel Stop (exclusive)", "type": "number", "placeholder": "Optional"}, + {"id": "pixel_color", "label": "Pixel Range Hex Color", "type": "text", "placeholder": "Optional"}, + {"id": "pixel_brightness", "label": "Pixel Range Brightness 0-100", "type": "number", "placeholder": "Optional"}, + { + "id": "extra_json", + "label": "Extra WLED JSON", + "type": "json", + "placeholder": "{\n \"transition\": 4\n}", + }, + ], + }, + ] + + def on_load(self, ctx): + ctx.db.add_event("plugin.loaded", {"plugin": self.name}) + + def execute_action(self, ctx, action_id, config, event): + if action_id == "power": + payload = {"on": self._power_value(config.get("state", "toggle"))} + self._post_state(config, payload, ctx, action_id) + return + + if action_id == "brightness": + payload = {"bri": self._brightness(config.get("brightness", 100))} + self._post_state(config, payload, ctx, action_id) + return + + if action_id == "color": + segment = self._segment(config) + segment["col"] = [self._hex_color(config.get("color", "#ff8800"))] + if self._has_value(config, "brightness"): + segment["bri"] = self._brightness(config.get("brightness")) + self._post_state(config, {"seg": [segment]}, ctx, action_id) + return + + if action_id == "pixel_range": + segment = self._pixel_range_segment( + config, + "start_pixel", + "stop_pixel", + "color", + "brightness", + ) + self._post_state(config, {"seg": [segment]}, ctx, action_id) + return + + if action_id == "effect": + segment = self._effect_segment(config, require_effect=True) + self._post_state(config, {"seg": [segment]}, ctx, action_id) + return + + if action_id == "mega_update": + payload = self._mega_payload(config) + preflight_payload = self._pixel_range_preflight_payload(payload) + if preflight_payload: + self._post_state(config, preflight_payload, ctx, f"{action_id}.prepare") + self._post_state(config, payload, ctx, action_id) + return + + raise ValueError(f"Unknown WLED action '{action_id}'.") + + def _mega_payload(self, config: dict[str, Any]) -> dict[str, Any]: + payload: dict[str, Any] = {} + power = str(config.get("power", "keep") or "keep").lower() + if power != "keep": + payload["on"] = self._power_value(power) + if self._has_value(config, "global_brightness"): + payload["bri"] = self._brightness(config.get("global_brightness")) + + segment = self._segment(config) + if self._has_value(config, "color"): + segment["col"] = [self._hex_color(config.get("color"))] + if self._has_value(config, "segment_brightness"): + segment["bri"] = self._brightness(config.get("segment_brightness")) + segment.update(self._effect_segment(config, require_effect=False)) + + has_pixel_range = any(self._has_value(config, key) for key in ("pixel_start", "pixel_stop", "pixel_color", "pixel_brightness")) + if has_pixel_range: + segment.update( + self._pixel_range_segment( + config, + "pixel_start", + "pixel_stop", + "pixel_color", + "pixel_brightness", + ) + ) + + if len(segment) > 1: + payload["seg"] = [segment] + + extra = self._extra_json(config.get("extra_json")) + self._deep_merge(payload, extra) + + if not payload: + raise ValueError("Mega Update needs at least one value to update.") + return payload + + def _post_state(self, config: dict[str, Any], payload: dict[str, Any], ctx, action_id: str) -> None: + url = f"{self._base_url(config)}/json/state" + data = json.dumps(payload, separators=(",", ":")).encode("utf-8") + request = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + timeout = max(1.0, min(30.0, float(config.get("timeout", 5) or 5))) + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + body = response.read(2048).decode("utf-8", errors="replace") + status = response.status + except urllib.error.HTTPError as exc: + body = exc.read(2048).decode("utf-8", errors="replace") + raise RuntimeError(f"WLED request failed with {exc.code}: {body[:300]}") from exc + except urllib.error.URLError as exc: + raise RuntimeError(f"WLED request failed: {exc.reason}") from exc + + ctx.db.add_event( + "plugin.wled_request", + { + "plugin": self.name, + "action_id": action_id, + "url": url, + "status": status, + "payload": payload, + "response": body[:1000], + }, + ) + + def _pixel_range_preflight_payload(self, payload: dict[str, Any]) -> dict[str, Any]: + segments = payload.get("seg") + if not isinstance(segments, list) or not any(isinstance(segment, dict) and "i" in segment for segment in segments): + return {} + preflight = {} + if payload.get("on") is True: + preflight["on"] = True + if "bri" in payload: + preflight["bri"] = payload["bri"] + return preflight + + def _base_url(self, config: dict[str, Any]) -> str: + device_ip = str(config.get("device_ip", "") or "").strip() + if not device_ip: + raise ValueError("WLED Device IP is required.") + if "://" not in device_ip: + device_ip = f"http://{device_ip}" + parsed = urllib.parse.urlparse(device_ip) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("WLED Device IP must be an IP/host, optionally with http:// or https://.") + return urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip("/"), "", "", "")).rstrip("/") + + def _segment(self, config: dict[str, Any]) -> dict[str, Any]: + return {"id": self._int_range(config.get("segment_id", 0), 0, 999, "Segment ID")} + + def _effect_segment(self, config: dict[str, Any], require_effect: bool) -> dict[str, Any]: + segment: dict[str, Any] = {} + if self._has_value(config, "effect"): + segment["fx"] = self._wled_effect(config.get("effect"), config) + elif require_effect: + raise ValueError("Effect is required.") + + if self._has_value(config, "speed"): + segment["sx"] = self._int_range(config.get("speed"), 0, 255, "Effect Speed") + if self._has_value(config, "intensity"): + segment["ix"] = self._int_range(config.get("intensity"), 0, 255, "Effect Intensity") + if self._has_value(config, "palette"): + segment["pal"] = self._wled_id_or_command(config.get("palette"), "Palette") + return segment + + def _pixel_range_segment( + self, + config: dict[str, Any], + start_key: str, + stop_key: str, + color_key: str, + brightness_key: str, + ) -> dict[str, Any]: + start = self._int_range(config.get(start_key, 0), 0, 99999, "Start Pixel") + stop = self._int_range(config.get(stop_key, start + 1), start + 1, 100000, "Stop Pixel") + color = self._hex_color(config.get(color_key, "#ff8800")) + brightness = self._percent(config.get(brightness_key, 100), "Range Brightness") + return self._segment(config) | {"i": [start, stop, self._scale_hex(color, brightness)]} + + def _power_value(self, value: Any) -> bool | str: + normalized = str(value or "toggle").strip().lower() + if normalized == "on": + return True + if normalized == "off": + return False + if normalized == "toggle": + return "t" + raise ValueError("Power state must be on, off, or toggle.") + + def _brightness(self, value: Any) -> int: + percent = self._percent(value, "Brightness") + return int((percent * 255 / 100) + 0.5) + + def _percent(self, value: Any, label: str) -> int: + return self._int_range(value, 0, 100, f"{label} 0-100") + + def _int_range(self, value: Any, minimum: int, maximum: int, label: str) -> int: + try: + parsed = int(float(str(value).strip())) + except (TypeError, ValueError) as exc: + raise ValueError(f"{label} must be a number.") from exc + if parsed < minimum or parsed > maximum: + raise ValueError(f"{label} must be between {minimum} and {maximum}.") + return parsed + + def _hex_color(self, value: Any) -> str: + raw = str(value or "").strip() + match = HEX_COLOR_RE.match(raw) + if not match: + raise ValueError("Hex color must be #RGB, #RRGGBB, or #RRGGBBAA.") + color = match.group(1).upper() + if len(color) == 3: + color = "".join(channel * 2 for channel in color) + return color + + def _scale_hex(self, color: str, brightness: int) -> str: + if len(color) not in {6, 8}: + raise ValueError("Pixel range colors must be RGB or RGBW hex.") + scaled = [] + for index in range(0, len(color), 2): + channel = int(color[index : index + 2], 16) + scaled.append(f"{int((channel * brightness / 100) + 0.5):02X}") + return "".join(scaled) + + def _wled_effect(self, value: Any, config: dict[str, Any]) -> int | str: + raw = str(value or "").strip() + parsed = self._wled_id_or_command(raw, "Effect") + if isinstance(parsed, int) or parsed in {"r", "~", "~-"} or re.fullmatch(r"~?-?\d+(r)?", str(parsed)): + return parsed + + effects = self._get_json_list(config, "/json/eff", "effects") + for index, name in enumerate(effects): + if str(name).strip().lower() == raw.lower(): + return index + raise ValueError(f"Effect '{raw}' was not found on the WLED device.") + + def _wled_id_or_command(self, value: Any, label: str) -> int | str: + raw = str(value or "").strip() + if not raw: + raise ValueError(f"{label} is required.") + if raw.lower() in {"r", "~", "~-"}: + return raw.lower() + try: + parsed = int(raw) + except ValueError: + if re.fullmatch(r"~?-?\d+(r)?", raw): + return raw + return raw + if parsed < 0: + raise ValueError(f"{label} must not be negative.") + return parsed + + def _get_json_list(self, config: dict[str, Any], path: str, label: str) -> list[Any]: + url = f"{self._base_url(config)}{path}" + request = urllib.request.Request(url, method="GET") + timeout = max(1.0, min(30.0, float(config.get("timeout", 5) or 5))) + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + parsed = json.loads(response.read().decode("utf-8", errors="replace")) + except urllib.error.URLError as exc: + raise RuntimeError(f"Could not read WLED {label}: {exc.reason}") from exc + except json.JSONDecodeError as exc: + raise RuntimeError(f"WLED {label} response was not valid JSON.") from exc + if not isinstance(parsed, list): + raise RuntimeError(f"WLED {label} response was not a list.") + return parsed + + def _extra_json(self, value: Any) -> dict[str, Any]: + raw = str(value or "").strip() + if not raw: + return {} + try: + parsed = json.loads(raw) + except json.JSONDecodeError as exc: + raise ValueError(f"Extra WLED JSON is invalid: {exc.msg}") from exc + if not isinstance(parsed, dict): + raise ValueError("Extra WLED JSON must be an object.") + return parsed + + def _deep_merge(self, base: dict[str, Any], extra: dict[str, Any]) -> None: + for key, value in extra.items(): + if isinstance(base.get(key), dict) and isinstance(value, dict): + self._deep_merge(base[key], value) + else: + base[key] = value + + def _has_value(self, config: dict[str, Any], key: str) -> bool: + return key in config and str(config.get(key, "")).strip() != "" + + +PLUGIN = WLEDPlugin() diff --git a/tests/test_backend.py b/tests/test_backend.py index 7de9a46..4876869 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -12,6 +12,7 @@ from backend.services.pico import parse_pico_line from backend.services.plugins import PluginContext, PluginManager from plugins.clipboard_tools import ClipboardToolsPlugin from plugins.http_requests import HTTPRequestsPlugin +from plugins.wled import WLEDPlugin class DummyWs: @@ -225,6 +226,151 @@ def test_http_request_plugin_posts_templated_json(tmp_path: Path): assert json.loads(received["body"]) == {"button": "4", "event": "down"} +def test_wled_plugin_sends_power_brightness_and_color_payloads(tmp_path: Path): + received = [] + + class Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + received.append({"path": self.path, "body": json.loads(self.rfile.read(length).decode("utf-8"))}) + self.send_response(200) + self.end_headers() + self.wfile.write(b'{"success":true}') + + def log_message(self, *_args): + return None + + server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + db = Database(tmp_path / "streamdeck.sqlite") + plugin = WLEDPlugin() + base_config = {"device_ip": f"127.0.0.1:{server.server_port}"} + + plugin.execute_action(PluginContext(DummyApp(db)), "power", base_config | {"state": "toggle"}, None) + plugin.execute_action(PluginContext(DummyApp(db)), "brightness", base_config | {"brightness": 50}, None) + plugin.execute_action( + PluginContext(DummyApp(db)), + "color", + base_config | {"segment_id": 2, "color": "#0f8", "brightness": 25}, + None, + ) + finally: + server.shutdown() + thread.join(timeout=5) + + assert [item["path"] for item in received] == ["/json/state", "/json/state", "/json/state"] + assert received[0]["body"] == {"on": "t"} + assert received[1]["body"] == {"bri": 128} + assert received[2]["body"] == {"seg": [{"id": 2, "col": ["00FF88"], "bri": 64}]} + + +def test_wled_plugin_pixel_range_scales_color_for_range_brightness(tmp_path: Path): + received = {} + + class Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + received["body"] = json.loads(self.rfile.read(length).decode("utf-8")) + self.send_response(200) + self.end_headers() + self.wfile.write(b"{}") + + def log_message(self, *_args): + return None + + server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + db = Database(tmp_path / "streamdeck.sqlite") + WLEDPlugin().execute_action( + PluginContext(DummyApp(db)), + "pixel_range", + { + "device_ip": f"127.0.0.1:{server.server_port}", + "segment_id": 1, + "start_pixel": 10, + "stop_pixel": 18, + "color": "#336699", + "brightness": 50, + }, + None, + ) + finally: + server.shutdown() + thread.join(timeout=5) + + assert received["body"] == {"seg": [{"id": 1, "i": [10, 18, "1A334D"]}]} + + +def test_wled_mega_update_combines_effect_color_range_and_extra_json(tmp_path: Path): + received = [] + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/json/eff": + body = json.dumps(["Solid", "Blink", "Rainbow"]).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + return + self.send_response(404) + self.end_headers() + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + received.append(json.loads(self.rfile.read(length).decode("utf-8"))) + self.send_response(200) + self.end_headers() + self.wfile.write(b"{}") + + def log_message(self, *_args): + return None + + server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + db = Database(tmp_path / "streamdeck.sqlite") + WLEDPlugin().execute_action( + PluginContext(DummyApp(db)), + "mega_update", + { + "device_ip": f"http://127.0.0.1:{server.server_port}", + "power": "on", + "global_brightness": 80, + "segment_id": 0, + "color": "#112233", + "segment_brightness": 40, + "effect": "Rainbow", + "speed": 120, + "intensity": 200, + "pixel_start": 0, + "pixel_stop": 8, + "pixel_color": "#ff0000", + "pixel_brightness": 25, + "extra_json": '{"transition":4}', + }, + None, + ) + finally: + server.shutdown() + thread.join(timeout=5) + + assert received == [ + {"on": True, "bri": 204}, + { + "on": True, + "bri": 204, + "seg": [{"id": 0, "col": ["112233"], "bri": 102, "fx": 2, "sx": 120, "ix": 200, "i": [0, 8, "400000"]}], + "transition": 4, + } + ] + + def test_clipboard_plugin_copy_renders_event_tokens(tmp_path: Path): db = Database(tmp_path / "streamdeck.sqlite") plugin = ClipboardToolsPlugin()