from __future__ import annotations import json import re import urllib.error import urllib.parse import urllib.request from typing import Any HEX_COLOR_RE = re.compile(r"^#?([0-9a-fA-F]{3}|[0-9a-fA-F]{6}|[0-9a-fA-F]{8})$") def _device_field() -> dict[str, Any]: return { "id": "device_ip", "label": "Device IP", "type": "text", "required": True, "placeholder": "192.168.1.50", } class WLEDPlugin: name = "WLED" desc = "Control WLED devices by IP using the JSON API." version = "0.1.0" actions = [ { "id": "power", "name": "Power", "desc": "Turn a WLED device on, off, or toggle it.", "fields": [ _device_field(), { "id": "state", "label": "State", "type": "select", "default": "toggle", "options": [ {"label": "Toggle", "value": "toggle"}, {"label": "On", "value": "on"}, {"label": "Off", "value": "off"}, ], }, ], }, { "id": "brightness", "name": "Brightness", "desc": "Set device brightness from 0 to 100 percent.", "fields": [ _device_field(), {"id": "brightness", "label": "Brightness 0-100", "type": "number", "default": 100}, ], }, { "id": "color", "name": "Color", "desc": "Set a segment color from a hex value.", "fields": [ _device_field(), {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"}, {"id": "brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"}, ], }, { "id": "pixel_range", "name": "Pixel Range", "desc": "Set color and brightness for a specific pixel range.", "fields": [ _device_field(), {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, {"id": "start_pixel", "label": "Start Pixel", "type": "number", "default": 0}, {"id": "stop_pixel", "label": "Stop Pixel (exclusive)", "type": "number", "default": 1}, {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"}, {"id": "brightness", "label": "Range Brightness 0-100", "type": "number", "default": 100}, ], }, { "id": "effect", "name": "Effect", "desc": "Set an effect, speed, intensity, and palette on a segment.", "fields": [ _device_field(), {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "required": True, "placeholder": "0, Rainbow, r, ~"}, {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"}, {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"}, {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"}, ], }, { "id": "mega_update", "name": "Mega Update", "desc": "Apply power, brightness, color, effect, pixel range, and extra WLED JSON together.", "fields": [ _device_field(), { "id": "power", "label": "Power", "type": "select", "default": "keep", "options": [ {"label": "Keep", "value": "keep"}, {"label": "On", "value": "on"}, {"label": "Off", "value": "off"}, {"label": "Toggle", "value": "toggle"}, ], }, {"id": "global_brightness", "label": "Global Brightness 0-100", "type": "number", "placeholder": "Optional"}, {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0}, {"id": "color", "label": "Segment Hex Color", "type": "text", "placeholder": "Optional"}, {"id": "segment_brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"}, {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "placeholder": "Optional"}, {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"}, {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"}, {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"}, {"id": "pixel_start", "label": "Pixel Start", "type": "number", "placeholder": "Optional"}, {"id": "pixel_stop", "label": "Pixel Stop (exclusive)", "type": "number", "placeholder": "Optional"}, {"id": "pixel_color", "label": "Pixel Range Hex Color", "type": "text", "placeholder": "Optional"}, {"id": "pixel_brightness", "label": "Pixel Range Brightness 0-100", "type": "number", "placeholder": "Optional"}, { "id": "extra_json", "label": "Extra WLED JSON", "type": "json", "placeholder": "{\n \"transition\": 4\n}", }, ], }, ] def on_load(self, ctx): ctx.db.add_event("plugin.loaded", {"plugin": self.name}) def execute_action(self, ctx, action_id, config, event): if action_id == "power": payload = {"on": self._power_value(config.get("state", "toggle"))} self._post_state(config, payload, ctx, action_id) return if action_id == "brightness": payload = {"bri": self._brightness(config.get("brightness", 100))} self._post_state(config, payload, ctx, action_id) return if action_id == "color": segment = self._segment(config) segment["col"] = [self._hex_color(config.get("color", "#ff8800"))] if self._has_value(config, "brightness"): segment["bri"] = self._brightness(config.get("brightness")) self._post_state(config, {"seg": [segment]}, ctx, action_id) return if action_id == "pixel_range": segment = self._pixel_range_segment( config, "start_pixel", "stop_pixel", "color", "brightness", ) self._post_state(config, {"seg": [segment]}, ctx, action_id) return if action_id == "effect": segment = self._effect_segment(config, require_effect=True) self._post_state(config, {"seg": [segment]}, ctx, action_id) return if action_id == "mega_update": payload = self._mega_payload(config) preflight_payload = self._pixel_range_preflight_payload(payload) if preflight_payload: self._post_state(config, preflight_payload, ctx, f"{action_id}.prepare") self._post_state(config, payload, ctx, action_id) return raise ValueError(f"Unknown WLED action '{action_id}'.") def _mega_payload(self, config: dict[str, Any]) -> dict[str, Any]: payload: dict[str, Any] = {} power = str(config.get("power", "keep") or "keep").lower() if power != "keep": payload["on"] = self._power_value(power) if self._has_value(config, "global_brightness"): payload["bri"] = self._brightness(config.get("global_brightness")) segment = self._segment(config) if self._has_value(config, "color"): segment["col"] = [self._hex_color(config.get("color"))] if self._has_value(config, "segment_brightness"): segment["bri"] = self._brightness(config.get("segment_brightness")) segment.update(self._effect_segment(config, require_effect=False)) has_pixel_range = any(self._has_value(config, key) for key in ("pixel_start", "pixel_stop", "pixel_color", "pixel_brightness")) if has_pixel_range: segment.update( self._pixel_range_segment( config, "pixel_start", "pixel_stop", "pixel_color", "pixel_brightness", ) ) if len(segment) > 1: payload["seg"] = [segment] extra = self._extra_json(config.get("extra_json")) self._deep_merge(payload, extra) if not payload: raise ValueError("Mega Update needs at least one value to update.") return payload def _post_state(self, config: dict[str, Any], payload: dict[str, Any], ctx, action_id: str) -> None: url = f"{self._base_url(config)}/json/state" data = json.dumps(payload, separators=(",", ":")).encode("utf-8") request = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) timeout = max(1.0, min(30.0, float(config.get("timeout", 5) or 5))) try: with urllib.request.urlopen(request, timeout=timeout) as response: body = response.read(2048).decode("utf-8", errors="replace") status = response.status except urllib.error.HTTPError as exc: body = exc.read(2048).decode("utf-8", errors="replace") raise RuntimeError(f"WLED request failed with {exc.code}: {body[:300]}") from exc except urllib.error.URLError as exc: raise RuntimeError(f"WLED request failed: {exc.reason}") from exc ctx.db.add_event( "plugin.wled_request", { "plugin": self.name, "action_id": action_id, "url": url, "status": status, "payload": payload, "response": body[:1000], }, ) def _pixel_range_preflight_payload(self, payload: dict[str, Any]) -> dict[str, Any]: segments = payload.get("seg") if not isinstance(segments, list) or not any(isinstance(segment, dict) and "i" in segment for segment in segments): return {} preflight = {} if payload.get("on") is True: preflight["on"] = True if "bri" in payload: preflight["bri"] = payload["bri"] return preflight def _base_url(self, config: dict[str, Any]) -> str: device_ip = str(config.get("device_ip", "") or "").strip() if not device_ip: raise ValueError("WLED Device IP is required.") if "://" not in device_ip: device_ip = f"http://{device_ip}" parsed = urllib.parse.urlparse(device_ip) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError("WLED Device IP must be an IP/host, optionally with http:// or https://.") return urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip("/"), "", "", "")).rstrip("/") def _segment(self, config: dict[str, Any]) -> dict[str, Any]: return {"id": self._int_range(config.get("segment_id", 0), 0, 999, "Segment ID")} def _effect_segment(self, config: dict[str, Any], require_effect: bool) -> dict[str, Any]: segment: dict[str, Any] = {} if self._has_value(config, "effect"): segment["fx"] = self._wled_effect(config.get("effect"), config) elif require_effect: raise ValueError("Effect is required.") if self._has_value(config, "speed"): segment["sx"] = self._int_range(config.get("speed"), 0, 255, "Effect Speed") if self._has_value(config, "intensity"): segment["ix"] = self._int_range(config.get("intensity"), 0, 255, "Effect Intensity") if self._has_value(config, "palette"): segment["pal"] = self._wled_id_or_command(config.get("palette"), "Palette") return segment def _pixel_range_segment( self, config: dict[str, Any], start_key: str, stop_key: str, color_key: str, brightness_key: str, ) -> dict[str, Any]: start = self._int_range(config.get(start_key, 0), 0, 99999, "Start Pixel") stop = self._int_range(config.get(stop_key, start + 1), start + 1, 100000, "Stop Pixel") color = self._hex_color(config.get(color_key, "#ff8800")) brightness = self._percent(config.get(brightness_key, 100), "Range Brightness") return self._segment(config) | {"i": [start, stop, self._scale_hex(color, brightness)]} def _power_value(self, value: Any) -> bool | str: normalized = str(value or "toggle").strip().lower() if normalized == "on": return True if normalized == "off": return False if normalized == "toggle": return "t" raise ValueError("Power state must be on, off, or toggle.") def _brightness(self, value: Any) -> int: percent = self._percent(value, "Brightness") return int((percent * 255 / 100) + 0.5) def _percent(self, value: Any, label: str) -> int: return self._int_range(value, 0, 100, f"{label} 0-100") def _int_range(self, value: Any, minimum: int, maximum: int, label: str) -> int: try: parsed = int(float(str(value).strip())) except (TypeError, ValueError) as exc: raise ValueError(f"{label} must be a number.") from exc if parsed < minimum or parsed > maximum: raise ValueError(f"{label} must be between {minimum} and {maximum}.") return parsed def _hex_color(self, value: Any) -> str: raw = str(value or "").strip() match = HEX_COLOR_RE.match(raw) if not match: raise ValueError("Hex color must be #RGB, #RRGGBB, or #RRGGBBAA.") color = match.group(1).upper() if len(color) == 3: color = "".join(channel * 2 for channel in color) return color def _scale_hex(self, color: str, brightness: int) -> str: if len(color) not in {6, 8}: raise ValueError("Pixel range colors must be RGB or RGBW hex.") scaled = [] for index in range(0, len(color), 2): channel = int(color[index : index + 2], 16) scaled.append(f"{int((channel * brightness / 100) + 0.5):02X}") return "".join(scaled) def _wled_effect(self, value: Any, config: dict[str, Any]) -> int | str: raw = str(value or "").strip() parsed = self._wled_id_or_command(raw, "Effect") if isinstance(parsed, int) or parsed in {"r", "~", "~-"} or re.fullmatch(r"~?-?\d+(r)?", str(parsed)): return parsed effects = self._get_json_list(config, "/json/eff", "effects") for index, name in enumerate(effects): if str(name).strip().lower() == raw.lower(): return index raise ValueError(f"Effect '{raw}' was not found on the WLED device.") def _wled_id_or_command(self, value: Any, label: str) -> int | str: raw = str(value or "").strip() if not raw: raise ValueError(f"{label} is required.") if raw.lower() in {"r", "~", "~-"}: return raw.lower() try: parsed = int(raw) except ValueError: if re.fullmatch(r"~?-?\d+(r)?", raw): return raw return raw if parsed < 0: raise ValueError(f"{label} must not be negative.") return parsed def _get_json_list(self, config: dict[str, Any], path: str, label: str) -> list[Any]: url = f"{self._base_url(config)}{path}" request = urllib.request.Request(url, method="GET") timeout = max(1.0, min(30.0, float(config.get("timeout", 5) or 5))) try: with urllib.request.urlopen(request, timeout=timeout) as response: parsed = json.loads(response.read().decode("utf-8", errors="replace")) except urllib.error.URLError as exc: raise RuntimeError(f"Could not read WLED {label}: {exc.reason}") from exc except json.JSONDecodeError as exc: raise RuntimeError(f"WLED {label} response was not valid JSON.") from exc if not isinstance(parsed, list): raise RuntimeError(f"WLED {label} response was not a list.") return parsed def _extra_json(self, value: Any) -> dict[str, Any]: raw = str(value or "").strip() if not raw: return {} try: parsed = json.loads(raw) except json.JSONDecodeError as exc: raise ValueError(f"Extra WLED JSON is invalid: {exc.msg}") from exc if not isinstance(parsed, dict): raise ValueError("Extra WLED JSON must be an object.") return parsed def _deep_merge(self, base: dict[str, Any], extra: dict[str, Any]) -> None: for key, value in extra.items(): if isinstance(base.get(key), dict) and isinstance(value, dict): self._deep_merge(base[key], value) else: base[key] = value def _has_value(self, config: dict[str, Any], key: str) -> bool: return key in config and str(config.get(key, "")).strip() != "" PLUGIN = WLEDPlugin()