"""AdGuard Home query-log monitor.

Polls the AdGuard Home query log for a configured set of clients and sends a
Home Assistant push notification whenever a monitored client hits a blocked
domain or a domain on the local custom list.
"""

import argparse
import fnmatch
import json
import logging
import os
import time
from datetime import datetime

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

# Load environment variables from .env before reading any configuration.
load_dotenv()


def get_env_or_fail(key):
    """Return the value of environment variable *key*; raise if unset or blank."""
    value = os.getenv(key)
    if not value or value.strip() == "":
        raise EnvironmentError(f"CRITICAL: Missing required environment variable: {key}")
    return value


# --- Configuration ---
try:
    ADGUARD_URL = get_env_or_fail("ADGUARD_URL")
    QUERY_LOG_URL = f"{ADGUARD_URL}/control/querylog"
    USERNAME = get_env_or_fail("ADGUARD_USER")
    PASSWORD = get_env_or_fail("ADGUARD_PASSWORD")

    # Clients configuration via JSON string in .env: {"IP": "NAME"}
    CLIENTS_RAW = get_env_or_fail("CLIENTS")
    CLIENTS = json.loads(CLIENTS_RAW)

    # --- Home Assistant Configuration ---
    HASS_URL = get_env_or_fail("HASS_URL")
    HASS_TOKEN = get_env_or_fail("HASS_TOKEN")
    HASS_HEADERS = {
        "Authorization": f"Bearer {HASS_TOKEN}",
        "Content-Type": "application/json",
    }

    # Optional notification overrides (simple {placeholder} templates).
    HASS_TITLE_TEMPLATE = os.getenv("HASS_TITLE_TEMPLATE", "{event_type}")
    HASS_MSG_TEMPLATE = os.getenv("HASS_MSG_TEMPLATE", "{client_name} tried to access \"{host}\"")

    # --- Custom Domain List ---
    # list.txt wins; list.example.txt is the fallback. Lines starting with "#"
    # are comments, lines starting with "!" are negative filters (exclusions).
    CUSTOM_DOMAINS = []
    NEGATIVE_FILTERS = []
    list_file = "list.txt" if os.path.exists("list.txt") else "list.example.txt"
    if os.path.exists(list_file):
        with open(list_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip().lower()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("!"):
                    NEGATIVE_FILTERS.append(line[1:])
                else:
                    CUSTOM_DOMAINS.append(line)
        status_msg = f"Loaded {len(CUSTOM_DOMAINS)} custom domains and {len(NEGATIVE_FILTERS)} negative filters from {list_file}"
        # list_file is only list.example.txt when list.txt is absent, so the
        # extra os.path.exists("list.txt") check from before was redundant.
        if list_file == "list.example.txt":
            status_msg += " (FALLBACK)"
        print(status_msg)
    else:
        print("Note: No domain list found. Create list.txt or list.example.txt to monitor custom domains.")
except (EnvironmentError, json.JSONDecodeError) as e:
    print(f"Error during startup: {e}")
    exit(1)

# --- Logging Setup ---
# Configured before any function that references `logger` is defined, so the
# module reads top-to-bottom without forward references.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler("adguard_monitor.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def notify_hass(client_name, host, reason, event_type="🛡️ Adguard Blocked"):
    """Send a push notification to Home Assistant (best effort, never raises).

    The title/message are rendered from the HASS_*_TEMPLATE env templates;
    a template referencing an unknown placeholder falls back to the defaults.
    """
    try:
        msg = HASS_MSG_TEMPLATE.format(client_name=client_name, host=host, reason=reason, event_type=event_type)
        title = HASS_TITLE_TEMPLATE.format(client_name=client_name, host=host, reason=reason, event_type=event_type)
    except KeyError as e:
        logger.error(f"Template error: Missing key {e}. Falling back to defaults.")
        msg = f"{client_name} tried to access \"{host}\""
        title = event_type

    payload = {
        "title": title,
        "message": msg,
        "data": {
            "push": {
                "sound": "default",
                "badge": 1,
            },
            "icon_url": "https://adguard.com/favicon.ico",
        }
    }
    try:
        response = requests.post(HASS_URL, headers=HASS_HEADERS, json=payload, timeout=5)
        # BUGFIX: previously a non-2xx response (e.g. 401 from a bad token)
        # was silently ignored; surface it in the log while staying best-effort.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to send HASS notification: {e}")


# --- State Management ---
class MonitorStats:
    """Mutable counters shared across polling iterations."""

    def __init__(self):
        self.api_calls = 0          # number of query-log pulls performed
        self.total_blocked = 0      # number of notified events
        self.last_log_id = None     # timestamp of the newest processed entry
        self.current_limit = 150    # dynamic pull limit (grows to 350 when idle)
        self.last_notified_key = None  # dedupe key to prevent double notifications

    def log_api_call(self):
        self.api_calls += 1

    def log_blocked(self):
        self.total_blocked += 1

    def get_summary(self):
        return f"API Polls: {self.api_calls} | Total Blocked: {self.total_blocked} | Current Limit: {self.current_limit}"


stats = MonitorStats()


def get_auth_session():
    """Return a requests session pre-authenticated against AdGuard Home."""
    session = requests.Session()
    session.auth = HTTPBasicAuth(USERNAME, PASSWORD)
    return session


def hour_and_minute():
    """Return the current local time as "HH:MM" (used in the dedupe key)."""
    now = datetime.now()
    return now.strftime("%H:%M")


def is_custom_match(host):
    """Return True if *host* matches a custom domain and no negative filter.

    Negative filters take precedence. BUGFIX: a query-log entry without a
    question name previously crashed on host.lower(); treat it as no match.
    """
    if not host:
        return False
    host_lower = host.lower()
    # Check negative filters first
    for pattern in NEGATIVE_FILTERS:
        if fnmatch.fnmatchcase(host_lower, pattern.lower()):
            return False
    # Check custom domains
    for pattern in CUSTOM_DOMAINS:
        if fnmatch.fnmatchcase(host_lower, pattern.lower()):
            return True
    return False


def fetch_and_analyze(session, verbose=False):
    """Pull the query log once, notify on new blocked/custom-matched entries.

    Tracks the newest processed timestamp in `stats.last_log_id` so entries
    are handled at most once, and adapts `stats.current_limit` (150..350)
    based on whether monitored-client traffic was found.
    """
    stats.log_api_call()
    try:
        response = session.get(QUERY_LOG_URL, params={'limit': stats.current_limit}, timeout=10)
        response.raise_for_status()
        data = response.json()
        logs = data.get('data', [])

        # Log pull event
        logger.info(f"Pulled {len(logs)} entries from API")

        if not logs:
            return

        # Process logs in reverse order (oldest to newest) to maintain sequence
        new_items_processed = 0
        for log in reversed(logs):
            log_id = log.get('time')
            if stats.last_log_id and log_id <= stats.last_log_id:
                continue

            client_ip = log.get('client')
            if client_ip not in CLIENTS:
                continue

            stats.last_log_id = log_id
            new_items_processed += 1

            reason = log.get('reason')
            # BUGFIX: 'question' may be absent or null; don't assume a dict.
            host = (log.get('question') or {}).get('name')
            client_name = CLIENTS[client_ip]

            if verbose:
                logger.info(f"[{client_name}] Query: {host} | Reason: {reason}")

            is_blocked = reason in ["FilteredBlackList", "FilteredParental", "FilteredSafeBrowsing"]
            is_custom = is_custom_match(host)

            if is_blocked or is_custom:
                # Dedupe within the same minute: client + host + reason + HH:MM
                # forms the key of the last notification sent.
                event_label = "BLOCKED" if is_blocked else "CUSTOM_MATCH"
                notification_key = f"{client_ip}:{host}:{reason}:{hour_and_minute()}"
                if stats.last_notified_key == notification_key:
                    continue

                stats.log_blocked()
                default_title = "🛡️ Blocked" if is_blocked else "⚠️ Custom"
                logger.warning(f"{event_label}: {client_name} -> {host} ({reason if is_blocked else 'Manual match'})")

                # Pass details for templating
                notify_hass(
                    client_name=client_name,
                    host=host,
                    reason=reason if is_blocked else "Custom List",
                    event_type=default_title
                )
                stats.last_notified_key = notification_key

        # Dynamic limit adjustment: widen the window while idle, shrink it
        # back once monitored traffic reappears.
        if new_items_processed == 0 and stats.current_limit < 350:
            stats.current_limit = min(350, stats.current_limit + 50)
            logger.info(f"No relevant client traffic found. Increasing pull limit to {stats.current_limit}")
        elif new_items_processed > 0 and stats.current_limit > 150:
            stats.current_limit = 150
            logger.info(f"Client traffic found. Resetting pull limit to {stats.current_limit}")

        logger.info(f"Analyzed {new_items_processed} new entries since last pull.")
        logger.debug(f"Status Update: {stats.get_summary()}")

    except requests.exceptions.RequestException as e:
        logger.error(f"API Error: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="AdGuard Monitor Service")
    parser.add_argument("--interval", type=int, default=int(os.getenv("MONITOR_INTERVAL", 15)),
                        help="Update interval in seconds")
    parser.add_argument("--verbose", action="store_true", help="Log all queries for monitored clients")
    parser.add_argument("--test", action="store_true", help="Send a fake test notification and exit")
    args = parser.parse_args()

    # BUGFIX: argparse already uses MONITOR_INTERVAL as the default, so the
    # CLI flag must take precedence. The previous re-read of the env var here
    # silently overrode an explicit --interval.
    interval = args.interval

    if args.test:
        import random
        test_ip, test_name = random.choice(list(CLIENTS.items()))
        test_host = "test-domain.com"
        test_reason = "ManualTest"
        logger.info(f"Sending test notification for {test_name} ({test_ip})...")
        notify_hass(test_name, test_host, test_reason, "🛠️ AdGuard Monitor Test")
        logger.info("Test notification sent. Exiting.")
        exit(0)

    logger.info(f"AdGuard Monitor Service Started. Interval: {interval}s")
    logger.info(f"Monitoring {len(CLIENTS)} clients: {', '.join([f'{name} ({ip})' for ip, name in CLIENTS.items()])}")

    session = get_auth_session()
    while True:
        fetch_and_analyze(session, verbose=args.verbose)
        time.sleep(interval)