diff --git a/backend/main.py b/backend/main.py index afe4ef8..0026039 100644 --- a/backend/main.py +++ b/backend/main.py @@ -7,6 +7,7 @@ from datetime import datetime from contextlib import contextmanager from pathlib import Path from typing import Optional +from urllib.parse import urlparse import bcrypt import pymysql @@ -24,6 +25,12 @@ SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "false").lower() in ( LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5")) LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300")) LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900")) +MAX_NAME_LEN = int(os.getenv("MAX_NAME_LEN", "255")) +MAX_CATEGORY_LEN = int(os.getenv("MAX_CATEGORY_LEN", "255")) +MAX_DESCRIPTION_LEN = int(os.getenv("MAX_DESCRIPTION_LEN", "2000")) +MAX_ICON_URL_LEN = int(os.getenv("MAX_ICON_URL_LEN", "2048")) +MAX_ICON_BYTES = int(os.getenv("MAX_ICON_BYTES", str(2 * 1024 * 1024))) +ALLOWED_ICON_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif", "image/svg+xml", "image/x-icon"} DB_HOST = os.getenv("DB_HOST", "mariadb") DB_PORT = int(os.getenv("DB_PORT", "3306")) DB_USER = os.getenv("DB_USER", "jellomator") @@ -216,6 +223,38 @@ def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool: return cur.fetchone() is not None +def validate_http_url(value: str, field_name: str = "url") -> None: + parsed = urlparse((value or "").strip()) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise HTTPException(422, f"{field_name} must be a valid http(s) URL") + + +def validate_length(value: str | None, limit: int, field_name: str) -> None: + if value is not None and len(value) > limit: + raise HTTPException(422, f"{field_name} exceeds max length of {limit}") + + +def validate_link_payload(name: str, url: str, description: str, category: str, icon_url: str | None) -> None: + validate_length(name, MAX_NAME_LEN, "name") + validate_length(description, MAX_DESCRIPTION_LEN, "description") + validate_length(category, MAX_CATEGORY_LEN, "category") + if icon_url: + validate_length(icon_url, MAX_ICON_URL_LEN, "icon_url") + validate_http_url(icon_url, "icon_url") + validate_http_url(url, "url") + + +def read_icon_blob(icon: UploadFile | None) -> tuple[bytes | None, str | None]: + if not icon: + return None, None + if icon.content_type not in ALLOWED_ICON_MIME: + raise HTTPException(422, "Unsupported icon file type") + blob = icon.file.read(MAX_ICON_BYTES + 1) + if len(blob) > MAX_ICON_BYTES: + raise HTTPException(422, f"Icon exceeds max size of {MAX_ICON_BYTES} bytes") + return blob, icon.content_type + + @app.get("/api/me") def me(request: Request): with db() as c: @@ -331,13 +370,11 @@ def create_link( icon: UploadFile | None = File(None), ): require_admin(request) + validate_link_payload(name, url, description, category, icon_url) with db() as c: if link_name_exists(c, name): raise HTTPException(409, "Link name already exists") - icon_blob = icon_mime = None - if icon: - icon_blob = icon.file.read() - icon_mime = icon.content_type + icon_blob, icon_mime = read_icon_blob(icon) now = datetime.utcnow().isoformat() with db() as c: with c.cursor() as cur: @@ -382,10 +419,8 @@ def update_link( icon: UploadFile | None = File(None), ): require_admin(request) - icon_blob = icon_mime = None - if icon: - icon_blob = icon.file.read() - icon_mime = icon.content_type + validate_link_payload(name, url, description, category, icon_url) + icon_blob, icon_mime = read_icon_blob(icon) now = datetime.utcnow().isoformat() with db() as c: if link_name_exists(c, name, exclude_id=link_id):