"""Contact-form request handler.

``main(args)`` validates a submitted message, applies a per-IP rate limit
persisted in a JSON file, and stores accepted messages through the
``database`` object imported from ``_db_com``. Every response is a dict with
the keys ``_shsf``, ``_code``, ``_headers`` and ``_res``.
"""

import json
import os
import re
import time
import uuid

from _db_com import database

RATE_LIMIT_FILE = "/app/ratelimit.json"
RATE_LIMIT_WINDOW_SECONDS = 60 * 60
RATE_LIMIT_MAX_REQUESTS = 5

# Origin echoed back when the request's Origin is missing or not allowed.
DEFAULT_ORIGIN = "https://luna.reversed.dev"
ALLOWED_ORIGINS = {
    DEFAULT_ORIGIN,
    "http://localhost:5173",
    "http://localhost:4173",
}

EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")

# Storage bucket that accepted submissions are written to.
STORAGE_NAME = "portfolio_contact_messages"


def _cors_headers(origin=""):
    """Return the response headers, including CORS headers, for *origin*.

    The origin is echoed back only when it is in ``ALLOWED_ORIGINS``;
    anything else (including non-string values) yields ``DEFAULT_ORIGIN``.
    """
    is_allowed = isinstance(origin, str) and origin in ALLOWED_ORIGINS
    return {
        "Access-Control-Allow-Origin": origin if is_allowed else DEFAULT_ORIGIN,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
        "Access-Control-Max-Age": "86400",
        "Vary": "Origin",
        "Content-Type": "application/json",
    }


def _response(origin, status_code, payload, extra_headers=None):
    """Build the response envelope returned by ``main``.

    Args:
        origin: Value of the request's Origin header (may be empty).
        status_code: HTTP status code placed in ``_code``.
        payload: Response body placed in ``_res``.
        extra_headers: Optional mapping merged on top of the CORS headers.
    """
    headers = _cors_headers(origin)
    if extra_headers:
        headers.update(extra_headers)
    return {
        "_shsf": "v2",
        "_code": status_code,
        "_headers": headers,
        "_res": payload,
    }


def _request_headers(args):
    """Return the request headers keyed by lowercase header name.

    HTTP field names are case-insensitive, so lookups must not depend on
    the casing the caller happened to use. A missing or non-dict
    ``headers`` entry yields an empty dict.
    """
    headers = args.get("headers")
    if not isinstance(headers, dict):
        return {}
    return {str(name).lower(): value for name, value in headers.items()}


def _load_rate_limits():
    """Load the persisted rate-limit table, or ``{}`` if it is unusable.

    A missing, unreadable, undecodable or non-object file is treated as
    "no history" rather than an error.
    """
    try:
        with open(RATE_LIMIT_FILE, "r", encoding="utf-8") as file:
            data = json.load(file)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _save_rate_limits(rate_limits):
    """Persist *rate_limits* to ``RATE_LIMIT_FILE``.

    The data is written to a sibling ``.tmp`` file first and then moved
    into place with ``os.replace`` so readers never see a partial file.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    directory = os.path.dirname(RATE_LIMIT_FILE)
    if directory:  # os.makedirs("") raises for a bare file name
        os.makedirs(directory, exist_ok=True)
    temp_path = f"{RATE_LIMIT_FILE}.tmp"
    with open(temp_path, "w", encoding="utf-8") as file:
        json.dump(rate_limits, file)
    os.replace(temp_path, RATE_LIMIT_FILE)


def _extract_ip(args):
    """Return the client IP taken from proxy headers, or ``"unknown"``.

    Checks, in order, the first entry of ``x-forwarded-for``, then
    ``cf-connecting-ip``, then ``x-real-ip``; the first non-empty value wins.
    """
    headers = _request_headers(args)

    def header_text(name):
        value = headers.get(name, "")
        return value if isinstance(value, str) else ""

    # NOTE(review): X-Forwarded-For is client-controllable unless the
    # fronting proxy overwrites it; confirm the deployment does so, since
    # this value is the rate-limit key.
    candidates = (
        header_text("x-forwarded-for").split(",")[0],
        header_text("cf-connecting-ip"),
        header_text("x-real-ip"),
    )
    for candidate in candidates:
        candidate = candidate.strip()
        if candidate:
            return candidate
    return "unknown"


def _clean_field(payload, key):
    """Return ``payload[key]`` as stripped text; missing or ``None`` gives ``""``."""
    value = payload.get(key)
    # JSON null must count as "not provided", not as the text "None".
    return "" if value is None else str(value).strip()


def _validate_payload(payload):
    """Validate the submitted form fields.

    Args:
        payload: Dict that may contain ``username``, ``email``, ``message``.

    Returns:
        ``(fields, None)`` with the cleaned values on success, or
        ``(None, error_message)`` describing the first failed check.
    """
    username = _clean_field(payload, "username")
    email = _clean_field(payload, "email")
    message = _clean_field(payload, "message")

    if not 2 <= len(username) <= 80:
        return None, "Username must be between 2 and 80 characters."
    if not 3 <= len(email) <= 200 or not EMAIL_RE.match(email):
        return None, "Please provide a valid email address."
    if not 10 <= len(message) <= 5000:
        return None, "Message must be between 10 and 5000 characters."

    return {
        "username": username,
        "email": email,
        "message": message,
    }, None


def _check_rate_limit(ip_address, now):
    """Record a request for *ip_address* and report whether it is allowed.

    Timestamps older than ``RATE_LIMIT_WINDOW_SECONDS`` (and malformed
    entries) are pruned for every IP, and the pruned table is saved on
    both outcomes.

    Args:
        ip_address: Key identifying the client.
        now: Current Unix time in seconds.

    Returns:
        ``(True, None)`` when the request is allowed, otherwise
        ``(False, retry_after_seconds)``.
    """
    fresh_after = now - RATE_LIMIT_WINDOW_SECONDS

    cleaned = {}
    for ip, timestamps in _load_rate_limits().items():
        if not isinstance(timestamps, list):
            continue
        recent = [
            ts
            for ts in timestamps
            if isinstance(ts, (int, float)) and ts >= fresh_after
        ]
        if recent:
            cleaned[ip] = recent

    ip_timestamps = cleaned.get(ip_address, [])
    if len(ip_timestamps) >= RATE_LIMIT_MAX_REQUESTS:
        # Use the true oldest entry; the persisted list is not guaranteed
        # to be sorted.
        oldest = min(ip_timestamps)
        retry_after = max(1, int(RATE_LIMIT_WINDOW_SECONDS - (now - oldest)))
        _save_rate_limits(cleaned)
        return False, retry_after

    ip_timestamps.append(now)
    cleaned[ip_address] = ip_timestamps
    _save_rate_limits(cleaned)
    return True, None


def _parse_body(raw_body):
    """Decode the request body into a dict, or return ``None`` if invalid.

    A dict is returned as-is. ``None`` and empty/whitespace-only bodies
    count as an empty object so the caller can fall back to top-level
    arguments. Anything else must be JSON that decodes to an object.
    """
    if isinstance(raw_body, dict):
        return raw_body
    if raw_body is None:
        return {}
    if isinstance(raw_body, (str, bytes, bytearray)) and not raw_body.strip():
        return {}
    try:
        parsed = json.loads(raw_body)
    except (TypeError, ValueError):
        return None
    return parsed if isinstance(parsed, dict) else None


def main(args):
    """Handle one contact-form request.

    Args:
        args: Request dict. Reads ``headers``, ``method`` (default
            ``"POST"``), ``body`` and, when the body is empty, top-level
            ``username`` / ``email`` / ``message``.

    Returns:
        A response envelope (see ``_response``): 204 for OPTIONS, 405 for
        other non-POST methods, 400 for invalid input, 429 when the
        rate limit is hit, 201 once the message has been stored.
    """
    headers = _request_headers(args)
    origin = headers.get("origin", "")
    method = str(args.get("method", "POST")).upper()

    if method == "OPTIONS":
        return _response(origin, 204, "")
    if method != "POST":
        return _response(origin, 405, {"ok": False, "error": "Method not allowed."})

    payload = _parse_body(args.get("body"))
    if payload is None:
        return _response(origin, 400, {"ok": False, "error": "Invalid JSON payload."})

    # With an empty body, accept the fields passed directly in args.
    if not payload and all(key in args for key in ("username", "email", "message")):
        payload = {
            "username": args.get("username", ""),
            "email": args.get("email", ""),
            "message": args.get("message", ""),
        }

    valid_payload, error_message = _validate_payload(payload)
    if error_message:
        return _response(origin, 400, {"ok": False, "error": error_message})

    ip_address = _extract_ip(args)
    now = int(time.time())
    allowed, retry_after = _check_rate_limit(ip_address, now)
    if not allowed:
        return _response(
            origin,
            429,
            {
                "ok": False,
                "error": "Too many messages from this IP. Please try again later.",
            },
            extra_headers={"Retry-After": str(retry_after)},
        )

    db = database()
    try:
        db.create_storage(STORAGE_NAME, purpose="Portfolio contact form submissions")
    except Exception:
        # Deliberate best-effort: presumably this raises when the storage
        # already exists. NOTE(review): confirm against _db_com and narrow
        # the exception type if one is documented.
        pass

    submission_id = f"message_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
    record = {
        "id": submission_id,
        "username": valid_payload["username"],
        "email": valid_payload["email"],
        "message": valid_payload["message"],
        "ip": ip_address,
        "user_agent": headers.get("user-agent", ""),
        "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
    }
    db.set(STORAGE_NAME, submission_id, json.dumps(record))

    return _response(
        origin,
        201,
        {"ok": True, "message": "Message received.", "id": submission_id},
    )