# AdGuard Monitor Service -- Python source (original listing: 252 lines, 9.1 KiB).
import logging
|
|
import requests
|
|
import json
|
|
import time
|
|
import argparse
|
|
import os
|
|
import fnmatch
|
|
from dotenv import load_dotenv
|
|
from requests.auth import HTTPBasicAuth
|
|
from datetime import datetime
|
|
|
|
# Load environment variables (via python-dotenv) before any of the
# configuration lookups below read them with os.getenv().
load_dotenv()
|
|
|
|
def get_env_or_fail(key):
    """Return the value of the environment variable ``key``.

    The value is returned exactly as stored (it is not stripped).

    Raises:
        EnvironmentError: if the variable is unset, empty, or contains only
            whitespace.
    """
    raw = os.getenv(key)
    # A usable value must be set and contain at least one non-blank character.
    if raw and raw.strip():
        return raw
    raise EnvironmentError(f"CRITICAL: Missing required environment variable: {key}")
|
|
|
|
# --- Configuration ---
# Every setting is resolved once, at import time. A missing required variable,
# invalid CLIENTS JSON, or an OS-level error while reading the domain list
# aborts startup with exit status 1.
try:
    ADGUARD_URL = get_env_or_fail("ADGUARD_URL")
    # Tolerate a trailing slash in ADGUARD_URL so the endpoint never contains "//".
    QUERY_LOG_URL = f"{ADGUARD_URL.rstrip('/')}/control/querylog"
    USERNAME = get_env_or_fail("ADGUARD_USER")
    PASSWORD = get_env_or_fail("ADGUARD_PASSWORD")

    # Clients configuration via JSON string in .env: {"IP": "NAME"}
    CLIENTS_RAW = get_env_or_fail("CLIENTS")
    CLIENTS = json.loads(CLIENTS_RAW)
    # The rest of the script indexes CLIENTS by IP and iterates .items(), so
    # anything other than a JSON object must be rejected here, with a clear message.
    if not isinstance(CLIENTS, dict):
        raise EnvironmentError(
            'CRITICAL: CLIENTS must be a JSON object of the form {"IP": "NAME"}'
        )

    # --- Home Assistant Configuration ---
    HASS_URL = get_env_or_fail("HASS_URL")
    HASS_TOKEN = get_env_or_fail("HASS_TOKEN")
    HASS_HEADERS = {
        "Authorization": f"Bearer {HASS_TOKEN}",
        "Content-Type": "application/json",
    }

    # Optional Notification Overrides
    # Templates are str.format() strings; notify_hass() supplies the fields
    # client_name, host, reason and event_type.
    HASS_TITLE_TEMPLATE = os.getenv("HASS_TITLE_TEMPLATE", "{event_type}")
    HASS_MSG_TEMPLATE = os.getenv("HASS_MSG_TEMPLATE", "{client_name} tried to access \"{host}\"")

    # --- Custom Domain List ---
    # CUSTOM_DOMAINS holds fnmatch-style patterns that trigger a notification;
    # NEGATIVE_FILTERS holds patterns (lines prefixed with "!") that suppress one.
    CUSTOM_DOMAINS = []
    NEGATIVE_FILTERS = []

    # Prefer the user's list.txt; otherwise fall back to the shipped example.
    list_file = "list.txt" if os.path.exists("list.txt") else "list.example.txt"

    if os.path.exists(list_file):
        with open(list_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip().lower()
                # Skip blank lines and "#" comments.
                if not line or line.startswith("#"):
                    continue
                if line.startswith("!"):
                    # Strip again so "! pattern" does not keep a leading space
                    # (which could never match), and ignore a bare "!".
                    pattern = line[1:].strip()
                    if pattern:
                        NEGATIVE_FILTERS.append(pattern)
                else:
                    CUSTOM_DOMAINS.append(line)

        status_msg = f"Loaded {len(CUSTOM_DOMAINS)} custom domains and {len(NEGATIVE_FILTERS)} negative filters from {list_file}"
        # list_file is the example file only when list.txt was absent above.
        if list_file == "list.example.txt":
            status_msg += " (FALLBACK)"
        print(status_msg)
    else:
        print("Note: No domain list found. Create list.txt or list.example.txt to monitor custom domains.")

except (EnvironmentError, json.JSONDecodeError) as e:
    print(f"Error during startup: {e}")
    # SystemExit instead of exit(): the exit() builtin is injected by the
    # `site` module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)
|
|
|
|
def notify_hass(client_name, host, reason, event_type="🛡️ Adguard Blocked"):
    """Send one notification to Home Assistant (best effort).

    The title and message are rendered from HASS_TITLE_TEMPLATE and
    HASS_MSG_TEMPLATE with ``str.format``; the available placeholders are
    ``{client_name}``, ``{host}``, ``{reason}`` and ``{event_type}``. If a
    template cannot be rendered, built-in defaults are used instead.

    Args:
        client_name: Display name of the client that issued the query.
        host: Queried host name.
        reason: Reason text made available to the templates.
        event_type: Title text (also the fallback title).

    Returns:
        None. Failures are logged, never raised, so a notification problem
        cannot stop the caller.
    """
    fields = {
        "client_name": client_name,
        "host": host,
        "reason": reason,
        "event_type": event_type,
    }

    # Format message and title using templates (supporting simple {placeholders}).
    # str.format raises KeyError for unknown names, IndexError for positional
    # fields such as "{}", ValueError for unbalanced braces and AttributeError
    # for "{host.x}"-style lookups; all of them mean "bad user template".
    try:
        msg = HASS_MSG_TEMPLATE.format(**fields)
        title = HASS_TITLE_TEMPLATE.format(**fields)
    except (KeyError, IndexError, ValueError, AttributeError) as e:
        logger.error(f"Template error ({type(e).__name__}: {e}). Falling back to defaults.")
        msg = f"{client_name} tried to access \"{host}\""
        title = event_type

    payload = {
        "title": title,
        "message": msg,
        "data": {
            "push": {
                "sound": "default",
                "badge": 1,
            },
            "icon_url": "https://adguard.com/favicon.ico",
        },
    }

    # Deliberately broad: this is a best-effort boundary inside a long-running
    # loop, so any failure is logged instead of propagated.
    try:
        response = requests.post(HASS_URL, headers=HASS_HEADERS, json=payload, timeout=5)
    except Exception as e:
        logger.error(f"Failed to send HASS notification: {e}")
        return

    # A completed request is not necessarily an accepted one (bad token, wrong
    # URL, ...); surface non-success status codes instead of ignoring them.
    if not response.ok:
        logger.error(f"Failed to send HASS notification: HTTP {response.status_code}")
|
|
|
|
|
|
# --- Logging Setup ---
# INFO and above is written both to a UTF-8 log file in the working directory
# and to a stream handler.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler(filename="adguard_monitor.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
# --- State Management ---
|
|
class MonitorStats:
    """Counters and cursors that persist between polling cycles."""

    def __init__(self):
        self.api_calls = 0             # number of polls recorded via log_api_call()
        self.total_blocked = 0         # number of events recorded via log_blocked()
        self.last_log_id = None        # cursor: id of the newest entry already handled
        self.current_limit = 150       # page size requested from the query-log API
        self.last_notified_key = None  # To prevent double notifications

    def log_api_call(self):
        """Record one more API poll."""
        self.api_calls = self.api_calls + 1

    def log_blocked(self):
        """Record one more blocked / custom-matched event."""
        self.total_blocked = self.total_blocked + 1

    def get_summary(self):
        """Return a one-line, human-readable summary of the counters."""
        parts = (
            f"API Polls: {self.api_calls}",
            f"Total Blocked: {self.total_blocked}",
            f"Current Limit: {self.current_limit}",
        )
        return " | ".join(parts)


# Single shared instance used by the polling code.
stats = MonitorStats()
|
|
|
|
def get_auth_session():
    """Build a ``requests.Session`` carrying HTTP Basic credentials.

    The credentials are the module-level USERNAME and PASSWORD.
    """
    authed = requests.Session()
    authed.auth = HTTPBasicAuth(username=USERNAME, password=PASSWORD)
    return authed
|
|
|
|
def hour_and_minute():
    """Return the current local time formatted as ``HH:MM`` (24-hour clock)."""
    return f"{datetime.now():%H:%M}"
|
|
|
|
def is_custom_match(host, custom_domains=None, negative_filters=None):
    """Tell whether ``host`` matches the custom domain list.

    Matching is case-insensitive and uses ``fnmatch`` wildcards. Negative
    filters take precedence: a host matching any of them is never a match.

    Args:
        host: Host name to test. ``None``, an empty string, or a non-string
            value is treated as "no match" instead of raising.
        custom_domains: Patterns that count as a match. Defaults to the
            module-level CUSTOM_DOMAINS.
        negative_filters: Patterns that veto a match. Defaults to the
            module-level NEGATIVE_FILTERS.

    Returns:
        True if ``host`` matches a custom pattern and no negative filter,
        otherwise False.
    """
    # Query-log entries may lack a host name; there is nothing to match then.
    if not host or not isinstance(host, str):
        return False

    if custom_domains is None:
        custom_domains = CUSTOM_DOMAINS
    if negative_filters is None:
        negative_filters = NEGATIVE_FILTERS

    # Lower-case the host once instead of once per pattern.
    needle = host.lower()

    def matches_any(patterns):
        return any(fnmatch.fnmatchcase(needle, pattern.lower()) for pattern in patterns)

    # Check negative filters first
    if matches_any(negative_filters):
        return False

    # Check custom domains
    return matches_any(custom_domains)
|
|
|
|
def fetch_and_analyze(session, verbose=False):
    """Poll the query log once and notify about blocked or custom-matched queries.

    Fetches up to ``stats.current_limit`` entries from QUERY_LOG_URL, skips
    entries at or before the ``stats.last_log_id`` cursor and entries from
    clients not listed in CLIENTS, and calls ``notify_hass`` for every
    remaining entry that was blocked or matches the custom domain list.
    Afterwards the pull limit is adapted: +50 (capped at 350) when nothing new
    was seen, reset to 150 once traffic shows up again.

    Args:
        session: Object with a requests-style ``get`` method.
        verbose: When true, log every query of a monitored client.

    Returns:
        None. Request and response-parsing failures are logged, not raised.
    """
    blocked_reasons = ("FilteredBlackList", "FilteredParental", "FilteredSafeBrowsing")

    stats.log_api_call()

    # Only the network call and response parsing are guarded; nothing below
    # this block performs an API request.
    try:
        response = session.get(QUERY_LOG_URL, params={'limit': stats.current_limit}, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError: a body that is not valid JSON raises a plain ValueError
        # on requests versions without requests.exceptions.JSONDecodeError.
        logger.error(f"API Error: {e}")
        return

    # "data" may be missing or null, and the body may not be an object at all;
    # treat every such case as an empty page instead of crashing.
    logs = (data.get('data') if isinstance(data, dict) else None) or []

    # Log pull event
    logger.info(f"Pulled {len(logs)} entries from API")

    if not logs:
        return

    # Process logs in reverse order (oldest to newest) to maintain sequence
    new_items_processed = 0
    for log in reversed(logs):
        log_id = log.get('time')
        if not log_id:
            # Without a timestamp the entry cannot be ordered against the
            # cursor (comparing None with a string would raise TypeError).
            continue
        # NOTE(review): timestamps are compared as strings; this presumes the
        # API emits them in one uniform, lexicographically sortable format.
        if stats.last_log_id and log_id <= stats.last_log_id:
            continue

        client_ip = log.get('client')
        if client_ip not in CLIENTS:
            continue

        # The cursor only advances on entries from monitored clients.
        stats.last_log_id = log_id
        new_items_processed += 1

        reason = log.get('reason')
        # "question" may be absent or null; host is then None.
        host = (log.get('question') or {}).get('name')
        client_name = CLIENTS[client_ip]

        if verbose:
            logger.info(f"[{client_name}] Query: {host} | Reason: {reason}")

        is_blocked = reason in blocked_reasons
        # Never hand a missing host to the pattern matcher.
        is_custom = bool(host) and is_custom_match(host)

        if not (is_blocked or is_custom):
            continue

        # Check for duplicate notifications within the same "event":
        # client + host + reason + current wall-clock minute, compared only
        # against the most recently sent notification.
        event_label = "BLOCKED" if is_blocked else "CUSTOM_MATCH"
        notification_key = f"{client_ip}:{host}:{reason}:{hour_and_minute()}"
        if stats.last_notified_key == notification_key:
            continue

        stats.log_blocked()
        default_title = "🛡️ Blocked" if is_blocked else "⚠️ Custom"
        logger.warning(f"{event_label}: {client_name} -> {host} ({reason if is_blocked else 'Manual match'})")

        # Pass details for templating
        notify_hass(
            client_name=client_name,
            host=host,
            reason=reason if is_blocked else "Custom List",
            event_type=default_title,
        )
        stats.last_notified_key = notification_key

    # Dynamic limit adjustment
    if new_items_processed == 0 and stats.current_limit < 350:
        stats.current_limit = min(350, stats.current_limit + 50)
        logger.info(f"No relevant client traffic found. Increasing pull limit to {stats.current_limit}")
    elif new_items_processed > 0 and stats.current_limit > 150:
        stats.current_limit = 150
        logger.info(f"Client traffic found. Resetting pull limit to {stats.current_limit}")

    logger.info(f"Analyzed {new_items_processed} new entries since last pull.")
    logger.debug(f"Status Update: {stats.get_summary()}")
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse options, optionally send one test
    # notification, otherwise poll the query log forever.
    parser = argparse.ArgumentParser(description="AdGuard Monitor Service")
    parser.add_argument("--interval", type=int, default=int(os.getenv("MONITOR_INTERVAL", 15)), help="Update interval in seconds")
    parser.add_argument("--verbose", action="store_true", help="Log all queries for monitored clients")
    parser.add_argument("--test", action="store_true", help="Send a fake test notification and exit")
    args = parser.parse_args()

    # Precedence: --interval, then MONITOR_INTERVAL, then 15. The environment
    # variable is already the argparse default above; reading it a second time
    # here would silently discard an explicitly passed --interval.
    interval = args.interval
    if interval <= 0:
        # time.sleep() rejects negative values and 0 would poll without pause.
        parser.error("--interval must be a positive number of seconds")

    if args.test:
        import random

        if not CLIENTS:
            # random.choice() raises IndexError on an empty sequence.
            logger.error("Cannot send a test notification: CLIENTS is empty.")
            raise SystemExit(1)

        test_ip, test_name = random.choice(list(CLIENTS.items()))
        test_host = "test-domain.com"
        test_reason = "ManualTest"
        logger.info(f"Sending test notification for {test_name} ({test_ip})...")
        notify_hass(test_name, test_host, test_reason, "🛠️ AdGuard Monitor Test")
        logger.info("Test notification sent. Exiting.")
        # SystemExit instead of exit(): the exit() builtin comes from `site`.
        raise SystemExit(0)

    logger.info(f"AdGuard Monitor Service Started. Interval: {interval}s")
    logger.info(f"Monitoring {len(CLIENTS)} clients: {', '.join([f'{name} ({ip})' for ip, name in CLIENTS.items()])}")
    session = get_auth_session()

    try:
        while True:
            fetch_and_analyze(session, verbose=args.verbose)
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the service; exit without a traceback.
        logger.info("AdGuard Monitor Service stopped by user.")
    finally:
        session.close()
|