import logging import requests import json import time import argparse import os from dotenv import load_dotenv from requests.auth import HTTPBasicAuth from datetime import datetime # Load environment variables load_dotenv() def get_env_or_fail(key): value = os.getenv(key) if not value or value.strip() == "": raise EnvironmentError(f"CRITICAL: Missing required environment variable: {key}") return value # --- Configuration --- try: ADGUARD_URL = get_env_or_fail("ADGUARD_URL") QUERY_LOG_URL = f"{ADGUARD_URL}/control/querylog" USERNAME = get_env_or_fail("ADGUARD_USER") PASSWORD = get_env_or_fail("ADGUARD_PASSWORD") # Clients configuration via JSON string in .env: {"IP": "NAME"} CLIENTS_RAW = get_env_or_fail("CLIENTS") CLIENTS = json.loads(CLIENTS_RAW) # --- Home Assistant Configuration --- HASS_URL = get_env_or_fail("HASS_URL") HASS_TOKEN = get_env_or_fail("HASS_TOKEN") HASS_HEADERS = { "Authorization": f"Bearer {HASS_TOKEN}", "Content-Type": "application/json", } except (EnvironmentError, json.JSONDecodeError) as e: print(f"Error during startup: {e}") exit(1) def notify_hass(message,title): payload = { "message": message, "title": title, "data": { "push": { "sound": "default", "badge": 1, }, "icon_url": "https://adguard.com/favicon.ico", } } try: requests.post(HASS_URL, headers=HASS_HEADERS, json=payload, timeout=5) except Exception as e: logger.error(f"Failed to send HASS notification: {e}") # --- Logging Setup --- logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler("adguard_monitor.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # --- State Management --- class MonitorStats: def __init__(self): self.api_calls = 0 self.total_blocked = 0 self.last_log_id = None def log_api_call(self): self.api_calls += 1 def log_blocked(self): self.total_blocked += 1 def get_summary(self): return f"API Polls: {self.api_calls} | Total Blocked: {self.total_blocked}" stats = MonitorStats() def get_auth_session(): session = requests.Session() session.auth = HTTPBasicAuth(USERNAME, PASSWORD) return session def fetch_and_analyze(session): stats.log_api_call() try: response = session.get(QUERY_LOG_URL, params={'limit': 50}, timeout=10) response.raise_for_status() data = response.json() logs = data.get('data', []) # Log pull event logger.info(f"Pulled {len(logs)} entries from API") if not logs: return # Process logs in reverse order (oldest to newest) to maintain sequence new_items_processed = 0 for log in reversed(logs): log_id = log.get('time') if stats.last_log_id and log_id <= stats.last_log_id: continue client_ip = log.get('client') if client_ip not in CLIENTS: continue stats.last_log_id = log_id new_items_processed += 1 reason = log.get('reason') host = log.get('question', {}).get('name') client_name = CLIENTS[client_ip] if reason in ["FilteredBlackList", "Rewrite"]: stats.log_blocked() msg = f"🚫 Blocked: {host}\nReason: {reason}" logger.warning(msg) notify_hass(msg, f"BLOCKED: {client_name}") logger.info(f"Analyzed {new_items_processed} new entries since last pull.") logger.debug(f"Status Update: {stats.get_summary()}") except requests.exceptions.RequestException as e: logger.error(f"API Error: {e}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="AdGuard Monitor Service") parser.add_argument("--interval", type=int, default=15, help="Update interval in seconds") args = parser.parse_args() logger.info(f"AdGuard Monitor Service Started. Interval: {args.interval}s") session = get_auth_session() while True: fetch_and_analyze(session) time.sleep(args.interval)