Files
uptime-heart/listen.py
2026-02-02 19:27:41 +01:00

401 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Uptime Kuma to ESP32 LED Status Monitor
Polls Uptime Kuma API and controls ESP32 LEDs based on monitor status.
"""
import asyncio
import logging
import os
import sys
from typing import Optional
import aiohttp
from pythonkuma import UptimeKuma
from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic
from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Configuration
ESP_IP = os.getenv("ESP_IP")
ESP_API_KEY = os.getenv("ESP_API_KEY", "")
KUMA_URL = os.getenv("KUMA_URL")
KUMA_KEY = os.getenv("KUMA_KEY")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))
KUMA_PUSH_URL = os.getenv("KUMA_PUSH_URL")
KUMA_PUSH_INTERVAL = int(os.getenv("KUMA_PUSH_INTERVAL", "60"))
class UptimeKumaMonitor:
"""Handles communication with Uptime Kuma API"""
def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.kuma = UptimeKuma(session, base_url, api_key)
async def get_all_monitors_status(self) -> bool:
"""
Check if all monitors are UP.
Returns True if all monitors are UP, False otherwise.
"""
try:
# Get all monitors using metrics() - returns dict of monitor_id: UptimeKumaMonitor
monitors = await self.kuma.metrics()
if not monitors:
logger.warning("No monitors found in Uptime Kuma")
return False
all_up = True
monitor_count = 0
down_monitors = []
for monitor_id, monitor in monitors.items():
monitor_count += 1
# MonitorStatus.UP has value 1
# Check monitor_status attribute (it's an enum)
if monitor.monitor_status.value != 1:
all_up = False
down_monitors.append(
monitor.monitor_name or f"Monitor {monitor_id}"
)
if down_monitors:
logger.warning(f"Monitors DOWN: {', '.join(down_monitors)}")
logger.info(f"Checked {monitor_count} monitors - All UP: {all_up}")
return all_up
except Exception as e:
logger.error(f"Failed to fetch Uptime Kuma status: {e}")
return False
class ESP32Controller:
"""Handles communication with ESP32 via ESPHome API"""
def __init__(self, host: str, encryption_key: str = ""):
self.host = host
self.encryption_key = encryption_key
self.client: Optional[APIClient] = None
self.reconnect_logic: Optional[ReconnectLogic] = None
self.current_state = None # Track current LED state to minimize traffic
self.light_keys = {} # Map light IDs to entity keys
self.switch_keys = {} # Map switch IDs to entity keys
self.switch_states = {} # Track switch states
async def connect(self):
"""Connect to ESP32"""
try:
self.client = APIClient(
address=self.host,
port=6053,
password="",
noise_psk=self.encryption_key if self.encryption_key else None,
)
await self.client.connect(login=True)
logger.info(f"Connected to ESP32 at {self.host}")
# Get entity list to map entity IDs to keys
entities, services = await self.client.list_entities_services()
switch_names = ['monitoring_enabled', 'force_red_led', 'force_green_led', 'quiet_hours_active']
for entity in entities:
if hasattr(entity, "object_id") and hasattr(entity, "key"):
# Separate switches from lights
if entity.object_id in switch_names:
self.switch_keys[entity.object_id] = entity.key
else:
self.light_keys[entity.object_id] = entity.key
logger.info(f"Found lights: {list(self.light_keys.keys())}")
logger.info(f"Found switches: {list(self.switch_keys.keys())}")
# Fetch initial states for switches
for entity in entities:
if hasattr(entity, "object_id") and entity.object_id in self.switch_keys:
if hasattr(entity, "state"):
self.switch_states[entity.object_id] = entity.state
logger.info(f"Initial state for '{entity.object_id}': {entity.state}")
# Subscribe to state changes for switches
def on_state_changed(state):
# Log all state changes for debugging
entity_name = None
entity_type = None
# Check if it's a switch
for switch_id, key in self.switch_keys.items():
if state.key == key:
self.switch_states[switch_id] = state.state
entity_name = switch_id
entity_type = "switch"
logger.info(
f"Switch '{switch_id}' changed to: {state.state}"
)
break
# Check if it's a light (for debugging)
if entity_name is None:
for light_id, key in self.light_keys.items():
if state.key == key:
entity_name = light_id
entity_type = "light"
logger.debug(
f"Light '{light_id}' state changed (key={state.key})"
)
break
# Log unknown entities
if entity_name is None:
logger.debug(f"Unknown entity state change (key={state.key})")
try:
self.client.subscribe_states(on_state_changed)
except Exception as e:
logger.error(f"Failed to subscribe to states: {e}")
# Set up reconnection logic
async def on_disconnect():
logger.warning("Disconnected from ESP32")
async def on_connect():
logger.info("Reconnected to ESP32")
self.reconnect_logic = ReconnectLogic(
client=self.client,
on_disconnect=on_disconnect,
on_connect=on_connect,
name=self.host,
)
return True
except (APIConnectionError, Exception) as e:
logger.error(f"Failed to connect to ESP32: {e}")
return False
def should_update_leds(self) -> bool:
"""Check if LEDs should be updated based on switch states"""
# Check if monitoring is enabled
monitoring_enabled = self.switch_states.get("monitoring_enabled", True)
if not monitoring_enabled:
logger.debug("Monitoring disabled - skipping LED update")
return False
# Check if quiet hours are active
quiet_hours = self.switch_states.get("quiet_hours", False)
if quiet_hours:
logger.debug("Quiet hours active - skipping LED update")
return False
return True
async def set_all_monitors_up(self):
"""Set green LED with breathing effect (all monitors UP)"""
if not self.should_update_leds():
logger.info("Skipping LED update due to switch states")
return
if self.current_state == "all_up":
return # Already in correct state
if not self.client:
logger.error("ESP32 client not connected")
return
try:
# Turn off red LED (sanitized name: rote_led)
if "rote_led" in self.light_keys:
self.client.light_command(key=self.light_keys["rote_led"], state=False)
# Turn on green LED with breathing effect (sanitized name: gr_ne_led)
if "gr_ne_led" in self.light_keys:
self.client.light_command(
key=self.light_keys["gr_ne_led"],
state=True,
brightness=1.0,
effect="Breathing",
)
self.current_state = "all_up"
logger.info("✓ All monitors UP - Green LED breathing")
except Exception as e:
logger.error(f"Failed to set all_up state: {e}")
async def set_monitors_down(self):
"""Set red LED with fast heartbeat effect (monitors DOWN)"""
if not self.should_update_leds():
logger.info("Skipping LED update due to switch states")
return
if self.current_state == "monitors_down":
return # Already in correct state
if not self.client:
logger.error("ESP32 client not connected")
return
try:
# Turn off green LED (sanitized name: gr_ne_led)
if "gr_ne_led" in self.light_keys:
self.client.light_command(key=self.light_keys["gr_ne_led"], state=False)
# Turn on red LED with fast heartbeat effect (sanitized name: rote_led)
if "rote_led" in self.light_keys:
self.client.light_command(
key=self.light_keys["rote_led"],
state=True,
brightness=1.0,
effect="Fast Heartbeat",
)
self.current_state = "monitors_down"
logger.warning("✗ Monitors DOWN - Red LED fast heartbeat")
except Exception as e:
logger.error(f"Failed to set monitors_down state: {e}")
async def disconnect(self):
"""Disconnect from ESP32"""
if self.client:
await self.client.disconnect()
logger.info("Disconnected from ESP32")
async def push_to_kuma(session: aiohttp.ClientSession, url: str):
"""Send a GET request to the Kuma push URL with retry logic"""
try:
# First attempt
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
logger.info(f"Kuma push: status {resp.status}")
return
except Exception as e:
logger.warning(f"Kuma push failed: {e} (retrying in 15s)")
await asyncio.sleep(15)
try:
# Retry attempt
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
logger.info(f"Kuma push retry succeeded: status {resp.status}")
except Exception as retry_e:
logger.error(f"Kuma push retry failed: {retry_e}")
async def kuma_push_monitor(
url: str, interval: int, session: Optional[aiohttp.ClientSession] = None
):
"""Periodically send GET requests to the Kuma push URL.
If a session is provided, it will be reused and not closed here.
If no session is provided, this function will create one and ensure it is
properly closed, even if the task is cancelled.
"""
own_session = False
if session is None:
session = aiohttp.ClientSession()
own_session = True
try:
while True:
await push_to_kuma(session, url)
await asyncio.sleep(interval)
except asyncio.CancelledError:
# Ensure proper cleanup on cancellation before propagating
raise
finally:
if own_session and not session.closed:
await session.close()
async def main():
"""Main monitoring loop"""
# Validate configuration
if not ESP_IP:
logger.error("ESP_IP not set in environment")
sys.exit(1)
if not KUMA_URL:
logger.error("KUMA_URL not set in environment")
sys.exit(1)
if not KUMA_KEY:
logger.error("KUMA_KEY not set in environment")
sys.exit(1)
# Initialize ESP32 controller
esp = ESP32Controller(ESP_IP, ESP_API_KEY)
# Initialize ESP32 controller
esp = ESP32Controller(ESP_IP, ESP_API_KEY)
# Connect to ESP32
logger.info(f"Connecting to ESP32 at {ESP_IP}...")
if not await esp.connect():
logger.error("Failed to connect to ESP32. Retrying in 10 seconds...")
await asyncio.sleep(10)
return
logger.info(f"Starting monitoring loop (interval: {POLL_INTERVAL}s)")
# Start Kuma push monitor if configured
push_task = None
if KUMA_PUSH_URL:
push_task = asyncio.create_task(kuma_push_monitor(KUMA_PUSH_URL, KUMA_PUSH_INTERVAL))
logger.info(f"Kuma push monitor started (URL: {KUMA_PUSH_URL}, Interval: {KUMA_PUSH_INTERVAL}s)")
else:
logger.info("Kuma push monitor not configured")
try:
async with aiohttp.ClientSession() as session:
# Wait for esp to be fully connected
logger.info("Waiting for ESP32 to stabilize...")
await asyncio.sleep(5)
logger.info("(actually) Starting Uptime Kuma monitoring...")
# Initialize Uptime Kuma monitor
kuma = UptimeKumaMonitor(session, KUMA_URL, KUMA_KEY)
while True:
# Check Uptime Kuma status
all_up = await kuma.get_all_monitors_status()
# Update ESP32 LEDs based on status
if all_up:
await esp.set_all_monitors_up()
else:
await esp.set_monitors_down()
# Wait before next check
await asyncio.sleep(POLL_INTERVAL)
except KeyboardInterrupt:
logger.info("Shutting down...")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
# Cancel push monitor task if running
if push_task and not push_task.done():
push_task.cancel()
try:
await push_task
except asyncio.CancelledError:
pass
await esp.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Stopped by user")