Add SHSF-backed contact form

This commit is contained in:
2026-04-14 18:43:26 +02:00
parent 4107c407f5
commit e47c51ae29
4 changed files with 364 additions and 32 deletions

180
shsf/contact-api/main.py Normal file
View File

@@ -0,0 +1,180 @@
import json
import os
import re
import time
import uuid
from _db_com import database
RATE_LIMIT_FILE = "/app/ratelimit.json"
RATE_LIMIT_WINDOW_SECONDS = 60 * 60
RATE_LIMIT_MAX_REQUESTS = 5
ALLOWED_ORIGINS = {
"https://luna.reversed.dev",
"http://localhost:5173",
"http://localhost:4173",
}
EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")
def _cors_headers(origin=""):
allowed_origin = origin if origin in ALLOWED_ORIGINS else "https://luna.reversed.dev"
return {
"Access-Control-Allow-Origin": allowed_origin,
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Max-Age": "86400",
"Vary": "Origin",
"Content-Type": "application/json",
}
def _response(origin, status_code, payload):
return {
"_shsf": "v2",
"_code": status_code,
"_headers": _cors_headers(origin),
"_res": payload,
}
def _load_rate_limits():
if not os.path.exists(RATE_LIMIT_FILE):
return {}
try:
with open(RATE_LIMIT_FILE, "r", encoding="utf-8") as file:
data = json.load(file)
return data if isinstance(data, dict) else {}
except (OSError, json.JSONDecodeError):
return {}
def _save_rate_limits(rate_limits):
os.makedirs(os.path.dirname(RATE_LIMIT_FILE), exist_ok=True)
temp_path = f"{RATE_LIMIT_FILE}.tmp"
with open(temp_path, "w", encoding="utf-8") as file:
json.dump(rate_limits, file)
os.replace(temp_path, RATE_LIMIT_FILE)
def _extract_ip(args):
headers = args.get("headers", {}) or {}
forwarded = headers.get("x-forwarded-for", "")
if forwarded:
return forwarded.split(",")[0].strip()
for header in ("cf-connecting-ip", "x-real-ip"):
value = headers.get(header, "").strip()
if value:
return value
return "unknown"
def _validate_payload(payload):
username = str(payload.get("username", "")).strip()
email = str(payload.get("email", "")).strip()
message = str(payload.get("message", "")).strip()
if len(username) < 2 or len(username) > 80:
return None, "Username must be between 2 and 80 characters."
if len(email) < 3 or len(email) > 200 or not EMAIL_RE.match(email):
return None, "Please provide a valid email address."
if len(message) < 10 or len(message) > 5000:
return None, "Message must be between 10 and 5000 characters."
return {
"username": username,
"email": email,
"message": message,
}, None
def _check_rate_limit(ip_address, now):
rate_limits = _load_rate_limits()
fresh_after = now - RATE_LIMIT_WINDOW_SECONDS
cleaned = {
ip: timestamps
for ip, timestamps in rate_limits.items()
if isinstance(timestamps, list)
for timestamps in [[ts for ts in timestamps if isinstance(ts, (int, float)) and ts >= fresh_after]]
if timestamps
}
ip_timestamps = cleaned.get(ip_address, [])
if len(ip_timestamps) >= RATE_LIMIT_MAX_REQUESTS:
retry_after = max(1, int(RATE_LIMIT_WINDOW_SECONDS - (now - ip_timestamps[0])))
_save_rate_limits(cleaned)
return False, retry_after
ip_timestamps.append(now)
cleaned[ip_address] = ip_timestamps
_save_rate_limits(cleaned)
return True, None
def main(args):
origin = (args.get("headers", {}) or {}).get("origin", "")
method = str(args.get("method", "POST")).upper()
if method == "OPTIONS":
return _response(origin, 204, "")
if method != "POST":
return _response(origin, 405, {"ok": False, "error": "Method not allowed."})
raw_body = args.get("body", "{}")
try:
payload = raw_body if isinstance(raw_body, dict) else json.loads(raw_body)
if not isinstance(payload, dict):
raise ValueError("JSON body must be an object")
except (TypeError, json.JSONDecodeError, ValueError):
return _response(origin, 400, {"ok": False, "error": "Invalid JSON payload."})
if not payload and all(key in args for key in ("username", "email", "message")):
payload = {
"username": args.get("username", ""),
"email": args.get("email", ""),
"message": args.get("message", ""),
}
valid_payload, error_message = _validate_payload(payload)
if error_message:
return _response(origin, 400, {"ok": False, "error": error_message})
ip_address = _extract_ip(args)
now = int(time.time())
allowed, retry_after = _check_rate_limit(ip_address, now)
if not allowed:
return {
"_shsf": "v2",
"_code": 429,
"_headers": {
**_cors_headers(origin),
"Retry-After": str(retry_after),
},
"_res": {
"ok": False,
"error": "Too many messages from this IP. Please try again later.",
},
}
db = database()
try:
db.create_storage("portfolio_contact_messages", purpose="Portfolio contact form submissions")
except Exception:
pass
submission_id = f"message_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
record = {
"id": submission_id,
"username": valid_payload["username"],
"email": valid_payload["email"],
"message": valid_payload["message"],
"ip": ip_address,
"user_agent": (args.get("headers", {}) or {}).get("user-agent", ""),
"created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
}
db.set("portfolio_contact_messages", submission_id, json.dumps(record))
return _response(origin, 201, {"ok": True, "message": "Message received.", "id": submission_id})

View File

@@ -0,0 +1 @@
requests==2.31.0