#!/usr/bin/env python3 """ Uptime Kuma to ESP32 LED Status Monitor Polls Uptime Kuma API and controls ESP32 LEDs based on monitor status. """ import asyncio import logging import os import sys from typing import Optional import aiohttp from pythonkuma import UptimeKuma from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic from dotenv import load_dotenv # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # Configuration ESP_IP = os.getenv("ESP_IP") ESP_API_KEY = os.getenv("ESP_API_KEY", "") KUMA_URL = os.getenv("KUMA_URL") KUMA_KEY = os.getenv("KUMA_KEY") POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30")) KUMA_PUSH_URL = os.getenv("KUMA_PUSH_URL") KUMA_PUSH_INTERVAL = int(os.getenv("KUMA_PUSH_INTERVAL", "60")) class UptimeKumaMonitor: """Handles communication with Uptime Kuma API""" def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str): self.base_url = base_url.rstrip("/") self.api_key = api_key self.kuma = UptimeKuma(session, base_url, api_key) async def get_all_monitors_status(self) -> bool: """ Check if all monitors are UP. Returns True if all monitors are UP, False otherwise. """ try: # Get all monitors using metrics() - returns dict of monitor_id: UptimeKumaMonitor monitors = await self.kuma.metrics() if not monitors: logger.warning("No monitors found in Uptime Kuma") return False all_up = True monitor_count = 0 down_monitors = [] for monitor_id, monitor in monitors.items(): monitor_count += 1 # MonitorStatus.UP has value 1 # Check monitor_status attribute (it's an enum) if monitor.monitor_status.value != 1: all_up = False down_monitors.append( monitor.monitor_name or f"Monitor {monitor_id}" ) if down_monitors: logger.warning(f"Monitors DOWN: {', '.join(down_monitors)}") logger.info(f"Checked {monitor_count} monitors - All UP: {all_up}") return all_up except Exception as e: logger.error(f"Failed to fetch Uptime Kuma status: {e}") return False class ESP32Controller: """Handles communication with ESP32 via ESPHome API""" def __init__(self, host: str, encryption_key: str = ""): self.host = host self.encryption_key = encryption_key self.client: Optional[APIClient] = None self.reconnect_logic: Optional[ReconnectLogic] = None self.current_state = None # Track current LED state to minimize traffic self.light_keys = {} # Map light IDs to entity keys self.switch_keys = {} # Map switch IDs to entity keys self.switch_states = {} # Track switch states async def connect(self): """Connect to ESP32""" try: self.client = APIClient( address=self.host, port=6053, password="", noise_psk=self.encryption_key if self.encryption_key else None, ) await self.client.connect(login=True) logger.info(f"Connected to ESP32 at {self.host}") # Get entity list to map entity IDs to keys entities, services = await self.client.list_entities_services() switch_names = ['monitoring_enabled', 'force_red_led', 'force_green_led', 'quiet_hours_active'] for entity in entities: if hasattr(entity, "object_id") and hasattr(entity, "key"): # Separate switches from lights if entity.object_id in switch_names: self.switch_keys[entity.object_id] = entity.key else: self.light_keys[entity.object_id] = entity.key logger.info(f"Found lights: {list(self.light_keys.keys())}") logger.info(f"Found switches: {list(self.switch_keys.keys())}") # Fetch initial states for switches for entity in entities: if hasattr(entity, "object_id") and entity.object_id in self.switch_keys: if hasattr(entity, "state"): self.switch_states[entity.object_id] = entity.state logger.info(f"Initial state for '{entity.object_id}': {entity.state}") # Subscribe to state changes for switches def on_state_changed(state): # Log all state changes for debugging entity_name = None entity_type = None # Check if it's a switch for switch_id, key in self.switch_keys.items(): if state.key == key: self.switch_states[switch_id] = state.state entity_name = switch_id entity_type = "switch" logger.info( f"Switch '{switch_id}' changed to: {state.state}" ) break # Check if it's a light (for debugging) if entity_name is None: for light_id, key in self.light_keys.items(): if state.key == key: entity_name = light_id entity_type = "light" logger.debug( f"Light '{light_id}' state changed (key={state.key})" ) break # Log unknown entities if entity_name is None: logger.debug(f"Unknown entity state change (key={state.key})") try: self.client.subscribe_states(on_state_changed) except Exception as e: logger.error(f"Failed to subscribe to states: {e}") # Set up reconnection logic async def on_disconnect(): logger.warning("Disconnected from ESP32") async def on_connect(): logger.info("Reconnected to ESP32") self.reconnect_logic = ReconnectLogic( client=self.client, on_disconnect=on_disconnect, on_connect=on_connect, name=self.host, ) return True except (APIConnectionError, Exception) as e: logger.error(f"Failed to connect to ESP32: {e}") return False def should_update_leds(self) -> bool: """Check if LEDs should be updated based on switch states""" # Check if monitoring is enabled monitoring_enabled = self.switch_states.get("monitoring_enabled", True) if not monitoring_enabled: logger.debug("Monitoring disabled - skipping LED update") return False # Check if quiet hours are active quiet_hours = self.switch_states.get("quiet_hours", False) if quiet_hours: logger.debug("Quiet hours active - skipping LED update") return False return True async def set_all_monitors_up(self): """Set green LED with breathing effect (all monitors UP)""" if not self.should_update_leds(): logger.info("Skipping LED update due to switch states") return if self.current_state == "all_up": return # Already in correct state if not self.client: logger.error("ESP32 client not connected") return try: # Turn off red LED (sanitized name: rote_led) if "rote_led" in self.light_keys: self.client.light_command(key=self.light_keys["rote_led"], state=False) # Turn on green LED with breathing effect (sanitized name: gr_ne_led) if "gr_ne_led" in self.light_keys: self.client.light_command( key=self.light_keys["gr_ne_led"], state=True, brightness=1.0, effect="Breathing", ) self.current_state = "all_up" logger.info("✓ All monitors UP - Green LED breathing") except Exception as e: logger.error(f"Failed to set all_up state: {e}") async def set_monitors_down(self): """Set red LED with fast heartbeat effect (monitors DOWN)""" if not self.should_update_leds(): logger.info("Skipping LED update due to switch states") return if self.current_state == "monitors_down": return # Already in correct state if not self.client: logger.error("ESP32 client not connected") return try: # Turn off green LED (sanitized name: gr_ne_led) if "gr_ne_led" in self.light_keys: self.client.light_command(key=self.light_keys["gr_ne_led"], state=False) # Turn on red LED with fast heartbeat effect (sanitized name: rote_led) if "rote_led" in self.light_keys: self.client.light_command( key=self.light_keys["rote_led"], state=True, brightness=1.0, effect="Fast Heartbeat", ) self.current_state = "monitors_down" logger.warning("✗ Monitors DOWN - Red LED fast heartbeat") except Exception as e: logger.error(f"Failed to set monitors_down state: {e}") async def disconnect(self): """Disconnect from ESP32""" if self.client: await self.client.disconnect() logger.info("Disconnected from ESP32") async def push_to_kuma(session: aiohttp.ClientSession, url: str): """Send a GET request to the Kuma push URL with retry logic""" try: # First attempt async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: logger.info(f"Kuma push: status {resp.status}") return except Exception as e: logger.warning(f"Kuma push failed: {e} (retrying in 15s)") await asyncio.sleep(15) try: # Retry attempt async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: logger.info(f"Kuma push retry succeeded: status {resp.status}") except Exception as retry_e: logger.error(f"Kuma push retry failed: {retry_e}") async def kuma_push_monitor( url: str, interval: int, session: Optional[aiohttp.ClientSession] = None ): """Periodically send GET requests to the Kuma push URL. If a session is provided, it will be reused and not closed here. If no session is provided, this function will create one and ensure it is properly closed, even if the task is cancelled. """ own_session = False if session is None: session = aiohttp.ClientSession() own_session = True try: while True: await push_to_kuma(session, url) await asyncio.sleep(interval) except asyncio.CancelledError: # Ensure proper cleanup on cancellation before propagating raise finally: if own_session and not session.closed: await session.close() async def main(): """Main monitoring loop""" # Validate configuration if not ESP_IP: logger.error("ESP_IP not set in environment") sys.exit(1) if not KUMA_URL: logger.error("KUMA_URL not set in environment") sys.exit(1) if not KUMA_KEY: logger.error("KUMA_KEY not set in environment") sys.exit(1) # Initialize ESP32 controller esp = ESP32Controller(ESP_IP, ESP_API_KEY) # Initialize ESP32 controller esp = ESP32Controller(ESP_IP, ESP_API_KEY) # Connect to ESP32 logger.info(f"Connecting to ESP32 at {ESP_IP}...") if not await esp.connect(): logger.error("Failed to connect to ESP32. Retrying in 10 seconds...") await asyncio.sleep(10) return logger.info(f"Starting monitoring loop (interval: {POLL_INTERVAL}s)") # Start Kuma push monitor if configured push_task = None if KUMA_PUSH_URL: push_task = asyncio.create_task(kuma_push_monitor(KUMA_PUSH_URL, KUMA_PUSH_INTERVAL)) logger.info(f"Kuma push monitor started (URL: {KUMA_PUSH_URL}, Interval: {KUMA_PUSH_INTERVAL}s)") else: logger.info("Kuma push monitor not configured") try: async with aiohttp.ClientSession() as session: # Wait for esp to be fully connected logger.info("Waiting for ESP32 to stabilize...") await asyncio.sleep(5) logger.info("(actually) Starting Uptime Kuma monitoring...") # Initialize Uptime Kuma monitor kuma = UptimeKumaMonitor(session, KUMA_URL, KUMA_KEY) while True: # Check Uptime Kuma status all_up = await kuma.get_all_monitors_status() # Update ESP32 LEDs based on status if all_up: await esp.set_all_monitors_up() else: await esp.set_monitors_down() # Wait before next check await asyncio.sleep(POLL_INTERVAL) except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: logger.error(f"Unexpected error: {e}") finally: # Cancel push monitor task if running if push_task and not push_task.done(): push_task.cancel() try: await push_task except asyncio.CancelledError: pass await esp.disconnect() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Stopped by user")