Fixed state tracking across service & esp for cleaner communication and experience

This commit is contained in:
Space-Banane
2026-01-31 19:17:56 +01:00
parent 16c28d1ea4
commit d5d106c0ab

228
listen.py
View File

@@ -16,8 +16,7 @@ from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@@ -25,21 +24,21 @@ logger = logging.getLogger(__name__)
load_dotenv()
# Configuration
ESP_IP = os.getenv('ESP_IP')
ESP_API_KEY = os.getenv('ESP_API_KEY', '')
KUMA_URL = os.getenv('KUMA_URL')
KUMA_KEY = os.getenv('KUMA_KEY')
POLL_INTERVAL = int(os.getenv('POLL_INTERVAL', '30'))
ESP_IP = os.getenv("ESP_IP")
ESP_API_KEY = os.getenv("ESP_API_KEY", "")
KUMA_URL = os.getenv("KUMA_URL")
KUMA_KEY = os.getenv("KUMA_KEY")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))
class UptimeKumaMonitor:
"""Handles communication with Uptime Kuma API"""
def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str):
self.base_url = base_url.rstrip('/')
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.kuma = UptimeKuma(session, base_url, api_key)
async def get_all_monitors_status(self) -> bool:
"""
Check if all monitors are UP.
@@ -48,29 +47,31 @@ class UptimeKumaMonitor:
try:
# Get all monitors using metrics() - returns dict of monitor_id: UptimeKumaMonitor
monitors = await self.kuma.metrics()
if not monitors:
logger.warning("No monitors found in Uptime Kuma")
return False
all_up = True
monitor_count = 0
down_monitors = []
for monitor_id, monitor in monitors.items():
monitor_count += 1
# MonitorStatus.UP has value 1
# Check monitor_status attribute (it's an enum)
if monitor.monitor_status.value != 1:
all_up = False
down_monitors.append(monitor.monitor_name or f"Monitor {monitor_id}")
down_monitors.append(
monitor.monitor_name or f"Monitor {monitor_id}"
)
if down_monitors:
logger.warning(f"Monitors DOWN: {', '.join(down_monitors)}")
logger.info(f"Checked {monitor_count} monitors - All UP: {all_up}")
return all_up
except Exception as e:
logger.error(f"Failed to fetch Uptime Kuma status: {e}")
return False
@@ -78,114 +79,191 @@ class UptimeKumaMonitor:
class ESP32Controller:
"""Handles communication with ESP32 via ESPHome API"""
def __init__(self, host: str, encryption_key: str = ''):
def __init__(self, host: str, encryption_key: str = ""):
self.host = host
self.encryption_key = encryption_key
self.client: Optional[APIClient] = None
self.reconnect_logic: Optional[ReconnectLogic] = None
self.current_state = None # Track current LED state to minimize traffic
self.light_keys = {} # Map light IDs to entity keys
self.switch_keys = {} # Map switch IDs to entity keys
self.switch_states = {} # Track switch states
async def connect(self):
"""Connect to ESP32"""
try:
self.client = APIClient(
address=self.host,
port=6053,
password='',
noise_psk=self.encryption_key if self.encryption_key else None
password="",
noise_psk=self.encryption_key if self.encryption_key else None,
)
await self.client.connect(login=True)
logger.info(f"Connected to ESP32 at {self.host}")
# Get entity list to map light IDs to keys
# Get entity list to map entity IDs to keys
entities, services = await self.client.list_entities_services()
switch_names = ['monitoring_enabled', 'force_red_led', 'force_green_led', 'quiet_hours_active']
for entity in entities:
if hasattr(entity, 'object_id') and hasattr(entity, 'key'):
self.light_keys[entity.object_id] = entity.key
if hasattr(entity, "object_id") and hasattr(entity, "key"):
# Separate switches from lights
if entity.object_id in switch_names:
self.switch_keys[entity.object_id] = entity.key
else:
self.light_keys[entity.object_id] = entity.key
logger.info(f"Found lights: {list(self.light_keys.keys())}")
logger.info(f"Found switches: {list(self.switch_keys.keys())}")
# Fetch initial states for switches
for entity in entities:
if hasattr(entity, "object_id") and entity.object_id in self.switch_keys:
if hasattr(entity, "state"):
self.switch_states[entity.object_id] = entity.state
logger.info(f"Initial state for '{entity.object_id}': {entity.state}")
# Subscribe to state changes for switches
def on_state_changed(state):
# Log all state changes for debugging
entity_name = None
entity_type = None
# Check if it's a switch
for switch_id, key in self.switch_keys.items():
if state.key == key:
self.switch_states[switch_id] = state.state
entity_name = switch_id
entity_type = "switch"
logger.info(
f"Switch '{switch_id}' changed to: {state.state}"
)
break
# Check if it's a light (for debugging)
if entity_name is None:
for light_id, key in self.light_keys.items():
if state.key == key:
entity_name = light_id
entity_type = "light"
logger.debug(
f"Light '{light_id}' state changed (key={state.key})"
)
break
# Log unknown entities
if entity_name is None:
logger.debug(f"Unknown entity state change (key={state.key})")
try:
self.client.subscribe_states(on_state_changed)
except Exception as e:
logger.error(f"Failed to subscribe to states: {e}")
# Set up reconnection logic
async def on_disconnect():
logger.warning("Disconnected from ESP32")
async def on_connect():
logger.info("Reconnected to ESP32")
self.reconnect_logic = ReconnectLogic(
client=self.client,
on_disconnect=on_disconnect,
on_connect=on_connect,
name=self.host
name=self.host,
)
return True
except (APIConnectionError, Exception) as e:
logger.error(f"Failed to connect to ESP32: {e}")
return False
def should_update_leds(self) -> bool:
"""Check if LEDs should be updated based on switch states"""
# Check if monitoring is enabled
monitoring_enabled = self.switch_states.get("monitoring_enabled", True)
if not monitoring_enabled:
logger.debug("Monitoring disabled - skipping LED update")
return False
# Check if quiet hours are active
quiet_hours = self.switch_states.get("quiet_hours", False)
if quiet_hours:
logger.debug("Quiet hours active - skipping LED update")
return False
return True
async def set_all_monitors_up(self):
"""Set green LED with breathing effect (all monitors UP)"""
if self.current_state == 'all_up':
if not self.should_update_leds():
logger.info("Skipping LED update due to switch states")
return
if self.current_state == "all_up":
return # Already in correct state
if not self.client:
logger.error("ESP32 client not connected")
return
try:
# Turn off red LED (sanitized name: rote_led)
if 'rote_led' in self.light_keys:
self.client.light_command(key=self.light_keys['rote_led'], state=False)
if "rote_led" in self.light_keys:
self.client.light_command(key=self.light_keys["rote_led"], state=False)
# Turn on green LED with breathing effect (sanitized name: gr_ne_led)
if 'gr_ne_led' in self.light_keys:
if "gr_ne_led" in self.light_keys:
self.client.light_command(
key=self.light_keys['gr_ne_led'],
key=self.light_keys["gr_ne_led"],
state=True,
brightness=1.0,
effect='Breathing'
effect="Breathing",
)
self.current_state = 'all_up'
self.current_state = "all_up"
logger.info("✓ All monitors UP - Green LED breathing")
except Exception as e:
logger.error(f"Failed to set all_up state: {e}")
async def set_monitors_down(self):
"""Set red LED with fast heartbeat effect (monitors DOWN)"""
if self.current_state == 'monitors_down':
if not self.should_update_leds():
logger.info("Skipping LED update due to switch states")
return
if self.current_state == "monitors_down":
return # Already in correct state
if not self.client:
logger.error("ESP32 client not connected")
return
try:
# Turn off green LED (sanitized name: gr_ne_led)
if 'gr_ne_led' in self.light_keys:
self.client.light_command(key=self.light_keys['gr_ne_led'], state=False)
if "gr_ne_led" in self.light_keys:
self.client.light_command(key=self.light_keys["gr_ne_led"], state=False)
# Turn on red LED with fast heartbeat effect (sanitized name: rote_led)
if 'rote_led' in self.light_keys:
if "rote_led" in self.light_keys:
self.client.light_command(
key=self.light_keys['rote_led'],
key=self.light_keys["rote_led"],
state=True,
brightness=1.0,
effect='Fast Heartbeat'
effect="Fast Heartbeat",
)
self.current_state = 'monitors_down'
self.current_state = "monitors_down"
logger.warning("✗ Monitors DOWN - Red LED fast heartbeat")
except Exception as e:
logger.error(f"Failed to set monitors_down state: {e}")
async def disconnect(self):
"""Disconnect from ESP32"""
if self.client:
@@ -195,51 +273,55 @@ class ESP32Controller:
async def main():
"""Main monitoring loop"""
# Validate configuration
if not ESP_IP:
logger.error("ESP_IP not set in environment")
sys.exit(1)
if not KUMA_URL:
logger.error("KUMA_URL not set in environment")
sys.exit(1)
if not KUMA_KEY:
logger.error("KUMA_KEY not set in environment")
logger.error("Get your API key from Uptime Kuma: Settings → API Keys")
sys.exit(1)
# Initialize ESP32 controller
esp = ESP32Controller(ESP_IP, ESP_API_KEY)
# Connect to ESP32
logger.info(f"Connecting to ESP32 at {ESP_IP}...")
if not await esp.connect():
logger.error("Failed to connect to ESP32. Retrying in 10 seconds...")
await asyncio.sleep(10)
return
logger.info(f"Starting monitoring loop (interval: {POLL_INTERVAL}s)")
try:
async with aiohttp.ClientSession() as session:
# Wait for esp to be fully connected
logger.info("Waiting for ESP32 to stabilize...")
await asyncio.sleep(5)
logger.info("(actually) Starting Uptime Kuma monitoring...")
# Initialize Uptime Kuma monitor
kuma = UptimeKumaMonitor(session, KUMA_URL, KUMA_KEY)
while True:
# Check Uptime Kuma status
all_up = await kuma.get_all_monitors_status()
# Update ESP32 LEDs based on status
if all_up:
await esp.set_all_monitors_up()
else:
await esp.set_monitors_down()
# Wait before next check
await asyncio.sleep(POLL_INTERVAL)
except KeyboardInterrupt:
logger.info("Shutting down...")
except Exception as e:
@@ -248,7 +330,7 @@ async def main():
await esp.disconnect()
if __name__ == '__main__':
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt: