256 lines
8.6 KiB
Python
256 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Uptime Kuma to ESP32 LED Status Monitor
|
|
Polls Uptime Kuma API and controls ESP32 LEDs based on monitor status.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import sys
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
from pythonkuma import UptimeKuma
|
|
from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic
|
|
from dotenv import load_dotenv
|
|
|
|
# Logging setup: INFO and above, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-wide logger used by the classes and functions in this script.
logger = logging.getLogger(__name__)
|
|
|
|
# Load environment variables
|
|
# Must run before the os.getenv() lookups that follow: load_dotenv()
# (python-dotenv) is what makes values from a .env file visible to them.
load_dotenv()
|
|
|
|
# Configuration, read once at import time from the process environment.
ESP_IP = os.getenv('ESP_IP')                # required; checked in main()
ESP_API_KEY = os.getenv('ESP_API_KEY', '')  # optional; defaults to an empty string
KUMA_URL = os.getenv('KUMA_URL')            # required; checked in main()
KUMA_KEY = os.getenv('KUMA_KEY')            # required; checked in main()


def _env_positive_int(name: str, default: int) -> int:
    """Return environment variable *name* as a positive int, else *default*.

    An unset variable yields *default* silently. A value that is empty,
    not an integer, zero or negative is rejected with a warning and
    *default* is used instead, so a typo in the environment cannot crash
    the script at import time or turn a sleep-based loop into a busy loop.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        value = None
    if value is None or value <= 0:
        logging.getLogger(__name__).warning(
            "Ignoring invalid %s=%r; using default of %s", name, raw, default
        )
        return default
    return value


# Seconds to sleep between two Uptime Kuma polls (always >= 1).
POLL_INTERVAL = _env_positive_int('POLL_INTERVAL', 30)
|
|
|
|
|
|
class UptimeKumaMonitor:
    """Wraps the pythonkuma client and answers "are all monitors UP?"."""

    def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str):
        """Store the connection details and create the pythonkuma client.

        Args:
            session: aiohttp session handed to the pythonkuma client.
            base_url: Uptime Kuma base URL.
            api_key: Uptime Kuma API key.
        """
        self.api_key = api_key
        # Kept without trailing slashes; the client below still receives
        # the URL exactly as the caller passed it.
        self.base_url = base_url.rstrip('/')
        self.kuma = UptimeKuma(session, base_url, api_key)

    async def get_all_monitors_status(self) -> bool:
        """Report whether every monitor is currently UP.

        Returns:
            True only when at least one monitor exists and all of them have
            a status value of 1. False when any monitor has another status,
            when no monitors are returned, or when fetching the metrics
            fails (the failure is logged, never raised).
        """
        try:
            # metrics() yields a mapping of monitor id -> monitor object.
            monitors = await self.kuma.metrics()

            if not monitors:
                logger.warning("No monitors found in Uptime Kuma")
                return False

            # monitor_status is an enum; only the value 1 (UP) counts as healthy.
            down_monitors = [
                monitor.monitor_name or f"Monitor {monitor_id}"
                for monitor_id, monitor in monitors.items()
                if monitor.monitor_status.value != 1
            ]
            monitor_count = len(monitors)
            all_up = not down_monitors

            if down_monitors:
                logger.warning(f"Monitors DOWN: {', '.join(down_monitors)}")

            logger.info(f"Checked {monitor_count} monitors - All UP: {all_up}")
            return all_up

        except Exception as e:
            logger.error(f"Failed to fetch Uptime Kuma status: {e}")
            return False
|
|
|
|
|
|
class ESP32Controller:
    """Drives the two status LEDs of an ESP32 through the ESPHome native API.

    The controller remembers which LED state it last sent successfully
    (``current_state``) so that unchanged poll results cause no traffic.
    That cache is cleared whenever the connection stops or is
    re-established, and a send that fails with a connection error triggers
    one reconnect-and-retry, so the LEDs recover after the device drops
    off the network or reboots.
    """

    # ESPHome object ids (sanitized entity names) of the two lights.
    GREEN_LED_ID = 'gr_ne_led'
    RED_LED_ID = 'rote_led'

    def __init__(self, host: str, encryption_key: str = ''):
        """Store connection settings; no network I/O happens here.

        Args:
            host: Hostname or IP address of the ESP32.
            encryption_key: ESPHome API noise PSK; empty means unencrypted.
        """
        self.host = host
        self.encryption_key = encryption_key
        self.client: Optional[APIClient] = None
        # Retained for backward compatibility. Recovery is handled by the
        # on_stop hook and the reconnect in _show_state(), so this stays None.
        self.reconnect_logic: Optional[ReconnectLogic] = None
        self.current_state = None  # Last LED state sent successfully, or None
        self.light_keys = {}  # Maps entity object_id -> entity key

    async def _on_connection_stopped(self, expected_disconnect: bool = False):
        """Connection-stopped hook passed to ``APIClient.connect``.

        The parameter has a default so the hook works whether or not the
        library passes the "expected disconnect" flag.
        """
        # The device may have rebooted and lost its LED state, so the next
        # poll must send the state again even if it did not change.
        self.current_state = None
        if not expected_disconnect:
            logger.warning("Disconnected from ESP32")

    async def connect(self):
        """Connect to the ESP32 and (re)build the entity-key map.

        Returns:
            True on success, False on any failure (which is logged).
        """
        previous = self.client
        if previous is not None:
            # Best effort: release whatever the old client still holds
            # before it is replaced.
            try:
                await previous.disconnect()
            except Exception as e:
                logger.debug(f"Ignoring error while dropping old ESP32 connection: {e}")

        try:
            self.client = APIClient(
                address=self.host,
                port=6053,
                password='',
                noise_psk=self.encryption_key or None,
            )

            await self.client.connect(on_stop=self._on_connection_stopped, login=True)
            logger.info(f"Connected to ESP32 at {self.host}")

            # Map object ids to keys. Every entity exposing both attributes
            # is recorded, not only lights; lookups are by LED id only.
            entities, _services = await self.client.list_entities_services()
            self.light_keys = {
                entity.object_id: entity.key
                for entity in entities
                if hasattr(entity, 'object_id') and hasattr(entity, 'key')
            }
            # Fresh connection: nothing has been sent on it yet.
            self.current_state = None

            logger.info(f"Found lights: {list(self.light_keys)}")
            return True

        except Exception as e:  # includes APIConnectionError
            logger.error(f"Failed to connect to ESP32: {e}")
            return False

    def _send_led_commands(self, on_id: str, off_id: str, effect: str) -> bool:
        """Switch ``off_id`` off and ``on_id`` on with ``effect``.

        Returns:
            True if the "on" command was sent, False if ``on_id`` is not a
            known entity (a warning is logged and nothing is lit).
        """
        off_key = self.light_keys.get(off_id)
        if off_key is not None:
            self.client.light_command(key=off_key, state=False)

        on_key = self.light_keys.get(on_id)
        if on_key is None:
            logger.warning(f"LED entity '{on_id}' not found on ESP32; cannot switch it on")
            return False

        self.client.light_command(
            key=on_key,
            state=True,
            brightness=1.0,
            effect=effect,
        )
        return True

    async def _show_state(self, state: str, on_id: str, off_id: str, effect: str) -> bool:
        """Put the LEDs into ``state`` unless they are already there.

        Returns:
            True only if the commands were sent during this call.
        """
        if self.current_state == state:
            return False  # Already in correct state

        if not self.client:
            logger.error("ESP32 client not connected")
            return False

        try:
            try:
                shown = self._send_led_commands(on_id, off_id, effect)
            except APIConnectionError as e:
                # Connection is gone: reconnect once and retry. If that
                # fails too, the next poll tries again.
                logger.warning(f"Lost connection to ESP32 ({e}); reconnecting")
                if not await self.connect():
                    return False
                shown = self._send_led_commands(on_id, off_id, effect)
        except Exception as e:
            logger.error(f"Failed to set {state} state: {e}")
            return False

        if shown:
            # Cache only what was really sent, so a failed or skipped
            # update is attempted again on the next poll.
            self.current_state = state
        return shown

    async def set_all_monitors_up(self):
        """Set green LED with breathing effect (all monitors UP)"""
        if await self._show_state('all_up', self.GREEN_LED_ID, self.RED_LED_ID, 'Breathing'):
            logger.info("✓ All monitors UP - Green LED breathing")

    async def set_monitors_down(self):
        """Set red LED with fast heartbeat effect (monitors DOWN)"""
        if await self._show_state(
            'monitors_down', self.RED_LED_ID, self.GREEN_LED_ID, 'Fast Heartbeat'
        ):
            logger.warning("✗ Monitors DOWN - Red LED fast heartbeat")

    async def disconnect(self):
        """Disconnect from ESP32"""
        if self.client:
            await self.client.disconnect()
            logger.info("Disconnected from ESP32")
|
|
|
|
|
|
async def main():
    """Validate configuration, connect to the ESP32, then poll forever.

    Exits the process with status 1 when a required setting (ESP_IP,
    KUMA_URL, KUMA_KEY) is missing. Otherwise it keeps trying to reach the
    ESP32 every 10 seconds until that succeeds, then checks Uptime Kuma
    every POLL_INTERVAL seconds and switches the LEDs to match. The ESP32
    connection is closed on the way out, however the loop ends.
    """
    # Validate configuration
    if not ESP_IP:
        logger.error("ESP_IP not set in environment")
        sys.exit(1)

    if not KUMA_URL:
        logger.error("KUMA_URL not set in environment")
        sys.exit(1)

    if not KUMA_KEY:
        logger.error("KUMA_KEY not set in environment")
        logger.error("Get your API key from Uptime Kuma: Settings → API Keys")
        sys.exit(1)

    # Initialize ESP32 controller
    esp = ESP32Controller(ESP_IP, ESP_API_KEY)

    try:
        # Connect to ESP32, retrying as the log message promises instead
        # of giving up after the first failed attempt.
        logger.info(f"Connecting to ESP32 at {ESP_IP}...")
        while not await esp.connect():
            logger.error("Failed to connect to ESP32. Retrying in 10 seconds...")
            await asyncio.sleep(10)

        logger.info(f"Starting monitoring loop (interval: {POLL_INTERVAL}s)")

        async with aiohttp.ClientSession() as session:
            # Initialize Uptime Kuma monitor
            kuma = UptimeKumaMonitor(session, KUMA_URL, KUMA_KEY)

            while True:
                # Check Uptime Kuma status and mirror it on the LEDs
                if await kuma.get_all_monitors_status():
                    await esp.set_all_monitors_up()
                else:
                    await esp.set_monitors_down()

                # Wait before next check
                await asyncio.sleep(POLL_INTERVAL)

    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except asyncio.CancelledError:
        # Under asyncio.run() a Ctrl+C normally arrives here as a task
        # cancellation; it must be re-raised so the runner can finish.
        logger.info("Shutting down...")
        raise
    except Exception as e:
        # logger.exception keeps the message and adds the traceback.
        logger.exception(f"Unexpected error: {e}")
    finally:
        await esp.disconnect()
|
|
|
|
|
|
# Script entry point: run main() on a fresh event loop. A Ctrl+C that
# propagates out of the loop is reported as a normal stop, not a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Stopped by user")
|