diff --git a/listen.py b/listen.py index a24a78b..5eb219e 100644 --- a/listen.py +++ b/listen.py @@ -16,8 +16,7 @@ from dotenv import load_dotenv # Configure logging logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @@ -25,21 +24,21 @@ logger = logging.getLogger(__name__) load_dotenv() # Configuration -ESP_IP = os.getenv('ESP_IP') -ESP_API_KEY = os.getenv('ESP_API_KEY', '') -KUMA_URL = os.getenv('KUMA_URL') -KUMA_KEY = os.getenv('KUMA_KEY') -POLL_INTERVAL = int(os.getenv('POLL_INTERVAL', '30')) +ESP_IP = os.getenv("ESP_IP") +ESP_API_KEY = os.getenv("ESP_API_KEY", "") +KUMA_URL = os.getenv("KUMA_URL") +KUMA_KEY = os.getenv("KUMA_KEY") +POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30")) class UptimeKumaMonitor: """Handles communication with Uptime Kuma API""" - + def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str): - self.base_url = base_url.rstrip('/') + self.base_url = base_url.rstrip("/") self.api_key = api_key self.kuma = UptimeKuma(session, base_url, api_key) - + async def get_all_monitors_status(self) -> bool: """ Check if all monitors are UP. @@ -48,29 +47,31 @@ class UptimeKumaMonitor: try: # Get all monitors using metrics() - returns dict of monitor_id: UptimeKumaMonitor monitors = await self.kuma.metrics() - + if not monitors: logger.warning("No monitors found in Uptime Kuma") return False - + all_up = True monitor_count = 0 down_monitors = [] - + for monitor_id, monitor in monitors.items(): monitor_count += 1 # MonitorStatus.UP has value 1 # Check monitor_status attribute (it's an enum) if monitor.monitor_status.value != 1: all_up = False - down_monitors.append(monitor.monitor_name or f"Monitor {monitor_id}") - + down_monitors.append( + monitor.monitor_name or f"Monitor {monitor_id}" + ) + if down_monitors: logger.warning(f"Monitors DOWN: {', '.join(down_monitors)}") - + logger.info(f"Checked {monitor_count} monitors - All UP: {all_up}") return all_up - + except Exception as e: logger.error(f"Failed to fetch Uptime Kuma status: {e}") return False @@ -78,114 +79,191 @@ class UptimeKumaMonitor: class ESP32Controller: """Handles communication with ESP32 via ESPHome API""" - - def __init__(self, host: str, encryption_key: str = ''): + + def __init__(self, host: str, encryption_key: str = ""): self.host = host self.encryption_key = encryption_key self.client: Optional[APIClient] = None self.reconnect_logic: Optional[ReconnectLogic] = None self.current_state = None # Track current LED state to minimize traffic self.light_keys = {} # Map light IDs to entity keys - + self.switch_keys = {} # Map switch IDs to entity keys + self.switch_states = {} # Track switch states + async def connect(self): """Connect to ESP32""" try: self.client = APIClient( address=self.host, port=6053, - password='', - noise_psk=self.encryption_key if self.encryption_key else None + password="", + noise_psk=self.encryption_key if self.encryption_key else None, ) - + await self.client.connect(login=True) logger.info(f"Connected to ESP32 at {self.host}") - - # Get entity list to map light IDs to keys + + # Get entity list to map entity IDs to keys entities, services = await self.client.list_entities_services() + switch_names = ['monitoring_enabled', 'force_red_led', 'force_green_led', 'quiet_hours_active'] + for entity in entities: - if hasattr(entity, 'object_id') and hasattr(entity, 'key'): - self.light_keys[entity.object_id] = entity.key - + if hasattr(entity, "object_id") and hasattr(entity, "key"): + # Separate switches from lights + if entity.object_id in switch_names: + self.switch_keys[entity.object_id] = entity.key + else: + self.light_keys[entity.object_id] = entity.key + logger.info(f"Found lights: {list(self.light_keys.keys())}") - + logger.info(f"Found switches: {list(self.switch_keys.keys())}") + + # Fetch initial states for switches + for entity in entities: + if hasattr(entity, "object_id") and entity.object_id in self.switch_keys: + if hasattr(entity, "state"): + self.switch_states[entity.object_id] = entity.state + logger.info(f"Initial state for '{entity.object_id}': {entity.state}") + + # Subscribe to state changes for switches + def on_state_changed(state): + # Log all state changes for debugging + entity_name = None + entity_type = None + + # Check if it's a switch + for switch_id, key in self.switch_keys.items(): + if state.key == key: + self.switch_states[switch_id] = state.state + entity_name = switch_id + entity_type = "switch" + logger.info( + f"Switch '{switch_id}' changed to: {state.state}" + ) + break + + # Check if it's a light (for debugging) + if entity_name is None: + for light_id, key in self.light_keys.items(): + if state.key == key: + entity_name = light_id + entity_type = "light" + logger.debug( + f"Light '{light_id}' state changed (key={state.key})" + ) + break + + # Log unknown entities + if entity_name is None: + logger.debug(f"Unknown entity state change (key={state.key})") + + try: + self.client.subscribe_states(on_state_changed) + except Exception as e: + logger.error(f"Failed to subscribe to states: {e}") + # Set up reconnection logic async def on_disconnect(): logger.warning("Disconnected from ESP32") - + async def on_connect(): logger.info("Reconnected to ESP32") - + self.reconnect_logic = ReconnectLogic( client=self.client, on_disconnect=on_disconnect, on_connect=on_connect, - name=self.host + name=self.host, ) - + return True - + except (APIConnectionError, Exception) as e: logger.error(f"Failed to connect to ESP32: {e}") return False - + + def should_update_leds(self) -> bool: + """Check if LEDs should be updated based on switch states""" + # Check if monitoring is enabled + monitoring_enabled = self.switch_states.get("monitoring_enabled", True) + if not monitoring_enabled: + logger.debug("Monitoring disabled - skipping LED update") + return False + + # Check if quiet hours are active + quiet_hours = self.switch_states.get("quiet_hours", False) + if quiet_hours: + logger.debug("Quiet hours active - skipping LED update") + return False + + return True + async def set_all_monitors_up(self): """Set green LED with breathing effect (all monitors UP)""" - if self.current_state == 'all_up': + if not self.should_update_leds(): + logger.info("Skipping LED update due to switch states") + return + + if self.current_state == "all_up": return # Already in correct state - + if not self.client: logger.error("ESP32 client not connected") return - + try: # Turn off red LED (sanitized name: rote_led) - if 'rote_led' in self.light_keys: - self.client.light_command(key=self.light_keys['rote_led'], state=False) - + if "rote_led" in self.light_keys: + self.client.light_command(key=self.light_keys["rote_led"], state=False) + # Turn on green LED with breathing effect (sanitized name: gr_ne_led) - if 'gr_ne_led' in self.light_keys: + if "gr_ne_led" in self.light_keys: self.client.light_command( - key=self.light_keys['gr_ne_led'], + key=self.light_keys["gr_ne_led"], state=True, brightness=1.0, - effect='Breathing' + effect="Breathing", ) - - self.current_state = 'all_up' + + self.current_state = "all_up" logger.info("✓ All monitors UP - Green LED breathing") - + except Exception as e: logger.error(f"Failed to set all_up state: {e}") - + async def set_monitors_down(self): """Set red LED with fast heartbeat effect (monitors DOWN)""" - if self.current_state == 'monitors_down': + if not self.should_update_leds(): + logger.info("Skipping LED update due to switch states") + return + + if self.current_state == "monitors_down": return # Already in correct state - + if not self.client: logger.error("ESP32 client not connected") return - + try: # Turn off green LED (sanitized name: gr_ne_led) - if 'gr_ne_led' in self.light_keys: - self.client.light_command(key=self.light_keys['gr_ne_led'], state=False) - + if "gr_ne_led" in self.light_keys: + self.client.light_command(key=self.light_keys["gr_ne_led"], state=False) + # Turn on red LED with fast heartbeat effect (sanitized name: rote_led) - if 'rote_led' in self.light_keys: + if "rote_led" in self.light_keys: self.client.light_command( - key=self.light_keys['rote_led'], + key=self.light_keys["rote_led"], state=True, brightness=1.0, - effect='Fast Heartbeat' + effect="Fast Heartbeat", ) - - self.current_state = 'monitors_down' + + self.current_state = "monitors_down" logger.warning("✗ Monitors DOWN - Red LED fast heartbeat") - + except Exception as e: logger.error(f"Failed to set monitors_down state: {e}") - + async def disconnect(self): """Disconnect from ESP32""" if self.client: @@ -195,51 +273,55 @@ class ESP32Controller: async def main(): """Main monitoring loop""" - + # Validate configuration if not ESP_IP: logger.error("ESP_IP not set in environment") sys.exit(1) - + if not KUMA_URL: logger.error("KUMA_URL not set in environment") sys.exit(1) - + if not KUMA_KEY: logger.error("KUMA_KEY not set in environment") logger.error("Get your API key from Uptime Kuma: Settings → API Keys") sys.exit(1) - + # Initialize ESP32 controller esp = ESP32Controller(ESP_IP, ESP_API_KEY) - + # Connect to ESP32 logger.info(f"Connecting to ESP32 at {ESP_IP}...") if not await esp.connect(): logger.error("Failed to connect to ESP32. Retrying in 10 seconds...") await asyncio.sleep(10) return - + logger.info(f"Starting monitoring loop (interval: {POLL_INTERVAL}s)") - + try: async with aiohttp.ClientSession() as session: + # Wait for esp to be fully connected + logger.info("Waiting for ESP32 to stabilize...") + await asyncio.sleep(5) + logger.info("(actually) Starting Uptime Kuma monitoring...") # Initialize Uptime Kuma monitor kuma = UptimeKumaMonitor(session, KUMA_URL, KUMA_KEY) - + while True: # Check Uptime Kuma status all_up = await kuma.get_all_monitors_status() - + # Update ESP32 LEDs based on status if all_up: await esp.set_all_monitors_up() else: await esp.set_monitors_down() - + # Wait before next check await asyncio.sleep(POLL_INTERVAL) - + except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: @@ -248,7 +330,7 @@ async def main(): await esp.disconnect() -if __name__ == '__main__': +if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: