# Source listing pasted from a repository file viewer (reported there as: 417 lines, 18 KiB, Python).

from __future__ import annotations
import json
import re
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# Matches a 3-, 6-, or 8-digit hexadecimal color with an optional leading "#".
# Group 1 captures the hex digits without the "#".
HEX_COLOR_RE = re.compile(r"^#?([0-9a-fA-F]{3}|[0-9a-fA-F]{6}|[0-9a-fA-F]{8})$")
def _device_field() -> dict[str, Any]:
    """Return a new field descriptor for the required "Device IP" text input.

    A fresh dict is created on every call, so callers never share one object.
    """
    descriptor: dict[str, Any] = dict(
        id="device_ip",
        label="Device IP",
        type="text",
        required=True,
        placeholder="192.168.1.50",
    )
    return descriptor
class WLEDPlugin:
    """Control WLED devices over HTTP using their JSON API.

    Each action turns its ``config`` mapping (field ``id`` -> value) into a
    JSON payload and POSTs it to ``<device>/json/state``. Invalid input is
    reported as ``ValueError``; transport and HTTP failures are reported as
    ``RuntimeError``.
    """

    name = "WLED"
    desc = "Control WLED devices by IP using the JSON API."
    version = "0.1.0"

    # Declarative description of every action and its input fields. An
    # action's ``id`` is the ``action_id`` handled by execute_action(); each
    # field ``id`` is the key that is looked up in ``config``.
    actions = [
        {
            "id": "power",
            "name": "Power",
            "desc": "Turn a WLED device on, off, or toggle it.",
            "fields": [
                _device_field(),
                {
                    "id": "state",
                    "label": "State",
                    "type": "select",
                    "default": "toggle",
                    "options": [
                        {"label": "Toggle", "value": "toggle"},
                        {"label": "On", "value": "on"},
                        {"label": "Off", "value": "off"},
                    ],
                },
            ],
        },
        {
            "id": "brightness",
            "name": "Brightness",
            "desc": "Set device brightness from 0 to 100 percent.",
            "fields": [
                _device_field(),
                {"id": "brightness", "label": "Brightness 0-100", "type": "number", "default": 100},
            ],
        },
        {
            "id": "color",
            "name": "Color",
            "desc": "Set a segment color from a hex value.",
            "fields": [
                _device_field(),
                {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0},
                {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"},
                {"id": "brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"},
            ],
        },
        {
            "id": "pixel_range",
            "name": "Pixel Range",
            "desc": "Set color and brightness for a specific pixel range.",
            "fields": [
                _device_field(),
                {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0},
                {"id": "start_pixel", "label": "Start Pixel", "type": "number", "default": 0},
                {"id": "stop_pixel", "label": "Stop Pixel (exclusive)", "type": "number", "default": 1},
                {"id": "color", "label": "Hex Color", "type": "text", "default": "#ff8800", "placeholder": "#ff8800"},
                {"id": "brightness", "label": "Range Brightness 0-100", "type": "number", "default": 100},
            ],
        },
        {
            "id": "effect",
            "name": "Effect",
            "desc": "Set an effect, speed, intensity, and palette on a segment.",
            "fields": [
                _device_field(),
                {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0},
                {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "required": True, "placeholder": "0, Rainbow, r, ~"},
                {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"},
                {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"},
                {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"},
            ],
        },
        {
            "id": "mega_update",
            "name": "Mega Update",
            "desc": "Apply power, brightness, color, effect, pixel range, and extra WLED JSON together.",
            "fields": [
                _device_field(),
                {
                    "id": "power",
                    "label": "Power",
                    "type": "select",
                    "default": "keep",
                    "options": [
                        {"label": "Keep", "value": "keep"},
                        {"label": "On", "value": "on"},
                        {"label": "Off", "value": "off"},
                        {"label": "Toggle", "value": "toggle"},
                    ],
                },
                {"id": "global_brightness", "label": "Global Brightness 0-100", "type": "number", "placeholder": "Optional"},
                {"id": "segment_id", "label": "Segment ID", "type": "number", "default": 0},
                {"id": "color", "label": "Segment Hex Color", "type": "text", "placeholder": "Optional"},
                {"id": "segment_brightness", "label": "Segment Brightness 0-100", "type": "number", "placeholder": "Optional"},
                {"id": "effect", "label": "Effect ID / Name / r / ~", "type": "text", "placeholder": "Optional"},
                {"id": "speed", "label": "Effect Speed 0-255", "type": "number", "placeholder": "Optional"},
                {"id": "intensity", "label": "Effect Intensity 0-255", "type": "number", "placeholder": "Optional"},
                {"id": "palette", "label": "Palette ID / r / ~", "type": "text", "placeholder": "Optional"},
                {"id": "pixel_start", "label": "Pixel Start", "type": "number", "placeholder": "Optional"},
                {"id": "pixel_stop", "label": "Pixel Stop (exclusive)", "type": "number", "placeholder": "Optional"},
                {"id": "pixel_color", "label": "Pixel Range Hex Color", "type": "text", "placeholder": "Optional"},
                {"id": "pixel_brightness", "label": "Pixel Range Brightness 0-100", "type": "number", "placeholder": "Optional"},
                {
                    "id": "extra_json",
                    "label": "Extra WLED JSON",
                    "type": "json",
                    "placeholder": "{\n \"transition\": 4\n}",
                },
            ],
        },
    ]

    def on_load(self, ctx):
        """Record a ``plugin.loaded`` event through ``ctx.db`` when the plugin loads."""
        ctx.db.add_event("plugin.loaded", {"plugin": self.name})

    def execute_action(self, ctx, action_id, config, event):
        """Build the payload for ``action_id`` and send it to the device.

        Args:
            ctx: Host context; only ``ctx.db.add_event`` is used (request log).
            action_id: One of the ``id`` values declared in ``actions``.
            config: Field values for the action, keyed by field ``id``.
            event: Triggering event; not used by this plugin.

        Raises:
            ValueError: If ``action_id`` is unknown or a field value is invalid.
            RuntimeError: If the request to the device fails.
        """
        if action_id == "power":
            payload = {"on": self._power_value(config.get("state", "toggle"))}
            self._post_state(config, payload, ctx, action_id)
            return
        if action_id == "brightness":
            payload = {"bri": self._brightness(config.get("brightness", 100))}
            self._post_state(config, payload, ctx, action_id)
            return
        if action_id == "color":
            segment = self._segment(config)
            segment["col"] = [self._hex_color(config.get("color", "#ff8800"))]
            if self._has_value(config, "brightness"):
                segment["bri"] = self._brightness(config.get("brightness"))
            self._post_state(config, {"seg": [segment]}, ctx, action_id)
            return
        if action_id == "pixel_range":
            segment = self._pixel_range_segment(
                config,
                "start_pixel",
                "stop_pixel",
                "color",
                "brightness",
            )
            self._post_state(config, {"seg": [segment]}, ctx, action_id)
            return
        if action_id == "effect":
            segment = self._effect_segment(config, require_effect=True)
            self._post_state(config, {"seg": [segment]}, ctx, action_id)
            return
        if action_id == "mega_update":
            payload = self._mega_payload(config)
            # When pixel data is included, power/brightness are sent in a
            # separate request first (see _pixel_range_preflight_payload).
            preflight_payload = self._pixel_range_preflight_payload(payload)
            if preflight_payload:
                self._post_state(config, preflight_payload, ctx, f"{action_id}.prepare")
            self._post_state(config, payload, ctx, action_id)
            return
        raise ValueError(f"Unknown WLED action '{action_id}'.")

    def _mega_payload(self, config: dict[str, Any]) -> dict[str, Any]:
        """Assemble the combined payload for the "mega_update" action.

        Only fields that carry a value contribute. ``extra_json`` is merged in
        last, so it can override anything built from the individual fields.

        Raises:
            ValueError: If a field is invalid or nothing at all would be sent.
        """
        payload: dict[str, Any] = {}

        # Whitespace-only or empty input means "keep", like a missing key.
        power = str(config.get("power", "keep") or "keep").strip().lower() or "keep"
        if power != "keep":
            payload["on"] = self._power_value(power)
        if self._has_value(config, "global_brightness"):
            payload["bri"] = self._brightness(config.get("global_brightness"))

        segment = self._segment(config)
        if self._has_value(config, "color"):
            segment["col"] = [self._hex_color(config.get("color"))]
        if self._has_value(config, "segment_brightness"):
            segment["bri"] = self._brightness(config.get("segment_brightness"))
        segment.update(self._effect_segment(config, require_effect=False))

        pixel_keys = ("pixel_start", "pixel_stop", "pixel_color", "pixel_brightness")
        if any(self._has_value(config, key) for key in pixel_keys):
            segment.update(self._pixel_range_segment(config, *pixel_keys))

        # ``segment`` always holds "id"; include it only if something else was set.
        if len(segment) > 1:
            payload["seg"] = [segment]

        self._deep_merge(payload, self._extra_json(config.get("extra_json")))
        if not payload:
            raise ValueError("Mega Update needs at least one value to update.")
        return payload

    def _post_state(self, config: dict[str, Any], payload: dict[str, Any], ctx, action_id: str) -> None:
        """POST ``payload`` as JSON to ``/json/state`` and log the exchange.

        On success a ``plugin.wled_request`` event with the URL, status,
        payload and (truncated) response body is recorded through ``ctx.db``.

        Raises:
            ValueError: If the device address or timeout in ``config`` is invalid.
            RuntimeError: If the device answers with an HTTP error or the
                request fails at the transport level.
        """
        url = f"{self._base_url(config)}/json/state"
        data = json.dumps(payload, separators=(",", ":")).encode("utf-8")
        request = urllib.request.Request(
            url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        timeout = self._timeout(config)
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                body = response.read(2048).decode("utf-8", errors="replace")
                status = response.status
        except urllib.error.HTTPError as exc:
            # HTTPError wraps the response; close it once the body is read.
            try:
                body = exc.read(2048).decode("utf-8", errors="replace")
            finally:
                exc.close()
            raise RuntimeError(f"WLED request failed with {exc.code}: {body[:300]}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"WLED request failed: {exc.reason}") from exc
        except OSError as exc:
            # Timeouts and resets while reading the body are not wrapped in
            # URLError by urllib, so they are caught separately here.
            raise RuntimeError(f"WLED request failed: {exc}") from exc
        ctx.db.add_event(
            "plugin.wled_request",
            {
                "plugin": self.name,
                "action_id": action_id,
                "url": url,
                "status": status,
                "payload": payload,
                "response": body[:1000],
            },
        )

    def _pixel_range_preflight_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return the power/brightness part of ``payload`` to send beforehand.

        The result is non-empty only when ``payload`` contains a segment with
        an ``"i"`` (individual pixel) entry and also sets ``"on": True`` and/or
        ``"bri"``. A toggle (``"t"``) is never copied, so it is not applied
        twice.
        NOTE(review): presumably this makes sure the device is on at the
        requested brightness before pixel data arrives - confirm.
        """
        segments = payload.get("seg")
        if not isinstance(segments, list):
            return {}
        if not any(isinstance(segment, dict) and "i" in segment for segment in segments):
            return {}
        preflight: dict[str, Any] = {}
        if payload.get("on") is True:
            preflight["on"] = True
        if "bri" in payload:
            preflight["bri"] = payload["bri"]
        return preflight

    def _base_url(self, config: dict[str, Any]) -> str:
        """Return the device base URL (scheme, host and optional path).

        ``http://`` is assumed when ``device_ip`` carries no scheme. Query
        string, fragment and trailing slashes are dropped.

        Raises:
            ValueError: If ``device_ip`` is missing or not an http(s) address.
        """
        device_ip = str(config.get("device_ip", "") or "").strip()
        if not device_ip:
            raise ValueError("WLED Device IP is required.")
        if "://" not in device_ip:
            device_ip = f"http://{device_ip}"
        parsed = urllib.parse.urlparse(device_ip)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            raise ValueError("WLED Device IP must be an IP/host, optionally with http:// or https://.")
        return urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip("/"), "", "", "")).rstrip("/")

    def _timeout(self, config: dict[str, Any]) -> float:
        """Return the request timeout in seconds, clamped to 1-30 (default 5).

        Raises:
            ValueError: If ``config["timeout"]`` is not numeric.
        """
        raw = config.get("timeout", 5) or 5
        try:
            seconds = float(raw)
        except (TypeError, ValueError) as exc:
            raise ValueError("Timeout must be a number.") from exc
        return max(1.0, min(30.0, seconds))

    def _segment(self, config: dict[str, Any]) -> dict[str, Any]:
        """Return ``{"id": <segment id>}``; a blank or missing ``segment_id`` means 0."""
        segment_id = self._value_or_default(config, "segment_id", 0)
        return {"id": self._int_range(segment_id, 0, 999, "Segment ID")}

    def _effect_segment(self, config: dict[str, Any], require_effect: bool) -> dict[str, Any]:
        """Return the effect-related segment keys (``fx``, ``sx``, ``ix``, ``pal``).

        Only fields that carry a value are included.

        Raises:
            ValueError: If ``require_effect`` is true and no effect was given,
                or if any supplied value is invalid.
        """
        segment: dict[str, Any] = {}
        if self._has_value(config, "effect"):
            segment["fx"] = self._wled_effect(config.get("effect"), config)
        elif require_effect:
            raise ValueError("Effect is required.")
        if self._has_value(config, "speed"):
            segment["sx"] = self._int_range(config.get("speed"), 0, 255, "Effect Speed")
        if self._has_value(config, "intensity"):
            segment["ix"] = self._int_range(config.get("intensity"), 0, 255, "Effect Intensity")
        if self._has_value(config, "palette"):
            segment["pal"] = self._wled_id_or_command(config.get("palette"), "Palette")
        return segment

    def _pixel_range_segment(
        self,
        config: dict[str, Any],
        start_key: str,
        stop_key: str,
        color_key: str,
        brightness_key: str,
    ) -> dict[str, Any]:
        """Return a segment dict that colors the pixels ``[start, stop)``.

        The four ``*_key`` arguments name the ``config`` entries to read. A
        missing or blank entry falls back to its default: start 0, stop
        ``start + 1``, color ``#ff8800``, brightness 100. The brightness
        percentage is applied by scaling the color channels.

        Raises:
            ValueError: If any supplied value is invalid or ``stop <= start``.
        """
        start = self._int_range(self._value_or_default(config, start_key, 0), 0, 99999, "Start Pixel")
        stop = self._int_range(
            self._value_or_default(config, stop_key, start + 1),
            start + 1,
            100000,
            "Stop Pixel",
        )
        color = self._hex_color(self._value_or_default(config, color_key, "#ff8800"))
        brightness = self._percent(self._value_or_default(config, brightness_key, 100), "Range Brightness")
        return self._segment(config) | {"i": [start, stop, self._scale_hex(color, brightness)]}

    def _power_value(self, value: Any) -> bool | str:
        """Map ``on``/``off``/``toggle`` to ``True``/``False``/``"t"``.

        A falsy ``value`` is treated as ``toggle``.

        Raises:
            ValueError: For any other value.
        """
        normalized = str(value or "toggle").strip().lower()
        if normalized == "on":
            return True
        if normalized == "off":
            return False
        if normalized == "toggle":
            return "t"
        raise ValueError("Power state must be on, off, or toggle.")

    def _brightness(self, value: Any) -> int:
        """Convert a 0-100 percentage to the 0-255 scale, rounding half up."""
        percent = self._percent(value, "Brightness")
        return int((percent * 255 / 100) + 0.5)

    def _percent(self, value: Any, label: str) -> int:
        """Validate ``value`` as an integer percentage between 0 and 100."""
        return self._int_range(value, 0, 100, f"{label} 0-100")

    def _int_range(self, value: Any, minimum: int, maximum: int, label: str) -> int:
        """Parse ``value`` as a number, truncate it to int and range-check it.

        Raises:
            ValueError: If ``value`` is not a finite number or lies outside
                ``[minimum, maximum]``.
        """
        try:
            parsed = int(float(str(value).strip()))
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError: int(float("inf")); report it like other bad input.
            raise ValueError(f"{label} must be a number.") from exc
        if parsed < minimum or parsed > maximum:
            raise ValueError(f"{label} must be between {minimum} and {maximum}.")
        return parsed

    def _hex_color(self, value: Any) -> str:
        """Return ``value`` as upper-case hex digits without ``#``.

        Three-digit input is expanded to six digits by doubling each digit.

        Raises:
            ValueError: If ``value`` is not a 3-, 6- or 8-digit hex color.
        """
        raw = str(value or "").strip()
        match = HEX_COLOR_RE.match(raw)
        if not match:
            raise ValueError("Hex color must be #RGB, #RRGGBB, or #RRGGBBAA.")
        color = match.group(1).upper()
        if len(color) == 3:
            color = "".join(channel * 2 for channel in color)
        return color

    def _scale_hex(self, color: str, brightness: int) -> str:
        """Scale every two-digit channel of ``color`` by ``brightness`` percent.

        Raises:
            ValueError: If ``color`` is not 6 or 8 hex digits long.
        """
        if len(color) not in {6, 8}:
            raise ValueError("Pixel range colors must be RGB or RGBW hex.")
        scaled = []
        for index in range(0, len(color), 2):
            channel = int(color[index : index + 2], 16)
            scaled.append(f"{int((channel * brightness / 100) + 0.5):02X}")
        return "".join(scaled)

    def _wled_effect(self, value: Any, config: dict[str, Any]) -> int | str:
        """Resolve an effect given as ID, command string, or name.

        IDs and command strings (``r``, ``~``, ``~-``, ``~5``, ...) are passed
        through. Any other text is looked up, case-insensitively, in the list
        fetched from the device's ``/json/eff`` endpoint, and its index is
        returned.

        Raises:
            ValueError: If the value is empty, negative, or an unknown name.
            RuntimeError: If the effect list cannot be fetched.
        """
        # ``value or ""`` would turn the valid integer ID 0 into "".
        raw = "" if value is None else str(value).strip()
        parsed = self._wled_id_or_command(raw, "Effect")
        if isinstance(parsed, int) or parsed in {"r", "~", "~-"} or re.fullmatch(r"~?-?\d+(r)?", str(parsed)):
            return parsed
        effects = self._get_json_list(config, "/json/eff", "effects")
        for index, name in enumerate(effects):
            if str(name).strip().lower() == raw.lower():
                return index
        raise ValueError(f"Effect '{raw}' was not found on the WLED device.")

    def _wled_id_or_command(self, value: Any, label: str) -> int | str:
        """Return ``value`` as a non-negative int ID, or as a string otherwise.

        The commands ``r``, ``~`` and ``~-`` are returned lower-cased; any
        other non-integer text is returned unchanged for the caller to
        interpret.

        Raises:
            ValueError: If ``value`` is empty or a negative integer.
        """
        # ``value or ""`` would turn the valid integer ID 0 into "".
        raw = "" if value is None else str(value).strip()
        if not raw:
            raise ValueError(f"{label} is required.")
        lowered = raw.lower()
        if lowered in {"r", "~", "~-"}:
            return lowered
        try:
            parsed = int(raw)
        except ValueError:
            return raw
        if parsed < 0:
            raise ValueError(f"{label} must not be negative.")
        return parsed

    def _get_json_list(self, config: dict[str, Any], path: str, label: str) -> list[Any]:
        """GET ``path`` from the device and return the decoded JSON list.

        ``label`` is only used in error messages.

        Raises:
            ValueError: If the device address or timeout in ``config`` is invalid.
            RuntimeError: If the request fails or the response is not a JSON list.
        """
        url = f"{self._base_url(config)}{path}"
        request = urllib.request.Request(url, method="GET")
        timeout = self._timeout(config)
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                parsed = json.loads(response.read().decode("utf-8", errors="replace"))
        except urllib.error.URLError as exc:
            raise RuntimeError(f"Could not read WLED {label}: {exc.reason}") from exc
        except OSError as exc:
            # Read timeouts / connection resets are not wrapped in URLError.
            raise RuntimeError(f"Could not read WLED {label}: {exc}") from exc
        except json.JSONDecodeError as exc:
            raise RuntimeError(f"WLED {label} response was not valid JSON.") from exc
        if not isinstance(parsed, list):
            raise RuntimeError(f"WLED {label} response was not a list.")
        return parsed

    def _extra_json(self, value: Any) -> dict[str, Any]:
        """Return the extra JSON as a dict (empty when nothing was supplied).

        Accepts JSON text or a dict that has already been parsed.

        Raises:
            ValueError: If the text is not valid JSON or is not a JSON object.
        """
        if isinstance(value, dict):
            # Already parsed; str() would produce a Python repr, not JSON.
            return value
        raw = str(value or "").strip()
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Extra WLED JSON is invalid: {exc.msg}") from exc
        if not isinstance(parsed, dict):
            raise ValueError("Extra WLED JSON must be an object.")
        return parsed

    def _deep_merge(self, base: dict[str, Any], extra: dict[str, Any]) -> None:
        """Merge ``extra`` into ``base`` in place.

        Nested dicts are merged recursively; every other value, lists
        included, replaces the existing entry.
        """
        for key, value in extra.items():
            if isinstance(base.get(key), dict) and isinstance(value, dict):
                self._deep_merge(base[key], value)
            else:
                base[key] = value

    def _has_value(self, config: dict[str, Any], key: str) -> bool:
        """Return True if ``config[key]`` exists and is neither None nor blank."""
        value = config.get(key)
        return value is not None and str(value).strip() != ""

    def _value_or_default(self, config: dict[str, Any], key: str, default: Any) -> Any:
        """Return ``config[key]`` if it carries a value, otherwise ``default``."""
        return config[key] if self._has_value(config, key) else default
# Module-level plugin instance.
# NOTE(review): presumably the host application discovers the plugin through this name - confirm.
PLUGIN = WLEDPlugin()