Files
adguard-monitor/monitor.py
2026-03-12 16:09:56 +01:00

227 lines
8.0 KiB
Python

import logging
import requests
import json
import time
import argparse
import os
import fnmatch
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth
from datetime import datetime
# Load environment variables
load_dotenv()
def get_env_or_fail(key):
value = os.getenv(key)
if not value or value.strip() == "":
raise EnvironmentError(f"CRITICAL: Missing required environment variable: {key}")
return value
# --- Configuration ---
try:
ADGUARD_URL = get_env_or_fail("ADGUARD_URL")
QUERY_LOG_URL = f"{ADGUARD_URL}/control/querylog"
USERNAME = get_env_or_fail("ADGUARD_USER")
PASSWORD = get_env_or_fail("ADGUARD_PASSWORD")
# Clients configuration via JSON string in .env: {"IP": "NAME"}
CLIENTS_RAW = get_env_or_fail("CLIENTS")
CLIENTS = json.loads(CLIENTS_RAW)
# --- Home Assistant Configuration ---
HASS_URL = get_env_or_fail("HASS_URL")
HASS_TOKEN = get_env_or_fail("HASS_TOKEN")
HASS_HEADERS = {
"Authorization": f"Bearer {HASS_TOKEN}",
"Content-Type": "application/json",
}
# --- Custom Domain List ---
CUSTOM_DOMAINS = []
NEGATIVE_FILTERS = []
list_file = "list.txt" if os.path.exists("list.txt") else "list.example.txt"
if os.path.exists(list_file):
with open(list_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip().lower()
if not line or line.startswith("#"):
continue
if line.startswith("!"):
NEGATIVE_FILTERS.append(line[1:])
else:
CUSTOM_DOMAINS.append(line)
status_msg = f"Loaded {len(CUSTOM_DOMAINS)} custom domains and {len(NEGATIVE_FILTERS)} negative filters from {list_file}"
if list_file == "list.example.txt" and not os.path.exists("list.txt"):
status_msg += " (FALLBACK)"
print(status_msg)
else:
print("Note: No domain list found. Create list.txt or list.example.txt to monitor custom domains.")
except (EnvironmentError, json.JSONDecodeError) as e:
print(f"Error during startup: {e}")
exit(1)
def notify_hass(client_name, host, reason, title="🛡️ Adguard Blocked a Request"):
msg = f"{client_name} tried to access \"{host}\""
payload = {
"message": msg,
"title": title,
"data": {
"push": {
"sound": "default",
"badge": 1,
},
"icon_url": "https://adguard.com/favicon.ico",
}
}
try:
requests.post(HASS_URL, headers=HASS_HEADERS, json=payload, timeout=5)
except Exception as e:
logger.error(f"Failed to send HASS notification: {e}")
# --- Logging Setup ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler("adguard_monitor.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# --- State Management ---
class MonitorStats:
def __init__(self):
self.api_calls = 0
self.total_blocked = 0
self.last_log_id = None
self.current_limit = 150
self.last_notified_key = None # To prevent double notifications
def log_api_call(self):
self.api_calls += 1
def log_blocked(self):
self.total_blocked += 1
def get_summary(self):
return f"API Polls: {self.api_calls} | Total Blocked: {self.total_blocked} | Current Limit: {self.current_limit}"
stats = MonitorStats()
def get_auth_session():
session = requests.Session()
session.auth = HTTPBasicAuth(USERNAME, PASSWORD)
return session
def is_custom_match(host):
# Check negative filters first
for pattern in NEGATIVE_FILTERS:
if fnmatch.fnmatchcase(host.lower(), pattern.lower()):
return False
# Check custom domains
for pattern in CUSTOM_DOMAINS:
if fnmatch.fnmatchcase(host.lower(), pattern.lower()):
return True
return False
def fetch_and_analyze(session, verbose=False):
stats.log_api_call()
try:
response = session.get(QUERY_LOG_URL, params={'limit': stats.current_limit}, timeout=10)
response.raise_for_status()
data = response.json()
logs = data.get('data', [])
# Log pull event
logger.info(f"Pulled {len(logs)} entries from API")
if not logs:
return
# Process logs in reverse order (oldest to newest) to maintain sequence
new_items_processed = 0
for log in reversed(logs):
log_id = log.get('time')
if stats.last_log_id and log_id <= stats.last_log_id:
continue
client_ip = log.get('client')
if client_ip not in CLIENTS:
continue
stats.last_log_id = log_id
new_items_processed += 1
reason = log.get('reason')
host = log.get('question', {}).get('name')
client_name = CLIENTS[client_ip]
if verbose:
logger.info(f"[{client_name}] Query: {host} | Reason: {reason}")
is_blocked = reason in ["FilteredBlackList", "FilteredParental", "FilteredSafeBrowsing"]
is_custom = is_custom_match(host)
if is_blocked or is_custom:
# Check for duplicate notifications within the same "event"
# Using host + client_ip as a unique key for the last notification
event_type = "BLOCKED" if is_blocked else "CUSTOM_MATCH"
notification_key = f"{client_ip}:{host}:{reason}:{hour_and_minute()}"
if stats.last_notified_key == notification_key:
continue
stats.log_blocked()
title = "🛡️ Adguard Blocked a Request" if is_blocked else "⚠️ Custom Domain Match"
logger.warning(f"{event_type}: {client_name} -> {host} ({reason if is_blocked else 'Manual match'})")
notify_hass(client_name, host, reason if is_blocked else "Custom List", title=title)
stats.last_notified_key = notification_key
# Dynamic limit adjustment
if new_items_processed == 0 and stats.current_limit < 350:
stats.current_limit = min(350, stats.current_limit + 50)
logger.info(f"No relevant client traffic found. Increasing pull limit to {stats.current_limit}")
elif new_items_processed > 0 and stats.current_limit > 150:
stats.current_limit = 150
logger.info(f"Client traffic found. Resetting pull limit to {stats.current_limit}")
logger.info(f"Analyzed {new_items_processed} new entries since last pull.")
logger.debug(f"Status Update: {stats.get_summary()}")
except requests.exceptions.RequestException as e:
logger.error(f"API Error: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="AdGuard Monitor Service")
parser.add_argument("--interval", type=int, default=15, help="Update interval in seconds")
parser.add_argument("--verbose", action="store_true", help="Log all queries for monitored clients")
parser.add_argument("--test", action="store_true", help="Send a fake test notification and exit")
args = parser.parse_args()
if args.test:
import random
test_ip, test_name = random.choice(list(CLIENTS.items()))
test_host = "test-domain.com"
test_reason = "ManualTest"
logger.info(f"Sending test notification for {test_name} ({test_ip})...")
notify_hass(test_name, test_host, test_reason, "🛠️ AdGuard Monitor Test")
logger.info("Test notification sent. Exiting.")
exit(0)
logger.info(f"AdGuard Monitor Service Started. Interval: {args.interval}s")
logger.info(f"Monitoring {len(CLIENTS)} clients: {', '.join([f'{name} ({ip})' for ip, name in CLIENTS.items()])}")
session = get_auth_session()
while True:
fetch_and_analyze(session, verbose=args.verbose)
time.sleep(args.interval)