Validate link payloads and icon uploads
This commit is contained in:
@@ -7,6 +7,7 @@ from datetime import datetime
|
|||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import bcrypt
|
import bcrypt
|
||||||
import pymysql
|
import pymysql
|
||||||
@@ -24,6 +25,12 @@ SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "false").lower() in (
|
|||||||
LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
|
LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
|
||||||
LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300"))
|
LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300"))
|
||||||
LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900"))
|
LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900"))
|
||||||
|
MAX_NAME_LEN = int(os.getenv("MAX_NAME_LEN", "255"))
|
||||||
|
MAX_CATEGORY_LEN = int(os.getenv("MAX_CATEGORY_LEN", "255"))
|
||||||
|
MAX_DESCRIPTION_LEN = int(os.getenv("MAX_DESCRIPTION_LEN", "2000"))
|
||||||
|
MAX_ICON_URL_LEN = int(os.getenv("MAX_ICON_URL_LEN", "2048"))
|
||||||
|
MAX_ICON_BYTES = int(os.getenv("MAX_ICON_BYTES", str(2 * 1024 * 1024)))
|
||||||
|
ALLOWED_ICON_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif", "image/svg+xml", "image/x-icon"}
|
||||||
DB_HOST = os.getenv("DB_HOST", "mariadb")
|
DB_HOST = os.getenv("DB_HOST", "mariadb")
|
||||||
DB_PORT = int(os.getenv("DB_PORT", "3306"))
|
DB_PORT = int(os.getenv("DB_PORT", "3306"))
|
||||||
DB_USER = os.getenv("DB_USER", "jellomator")
|
DB_USER = os.getenv("DB_USER", "jellomator")
|
||||||
@@ -216,6 +223,38 @@ def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
|
|||||||
return cur.fetchone() is not None
|
return cur.fetchone() is not None
|
||||||
|
|
||||||
|
|
||||||
|
def validate_http_url(value: str, field_name: str = "url") -> None:
|
||||||
|
parsed = urlparse((value or "").strip())
|
||||||
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
||||||
|
raise HTTPException(422, f"{field_name} must be a valid http(s) URL")
|
||||||
|
|
||||||
|
|
||||||
|
def validate_length(value: str | None, limit: int, field_name: str) -> None:
|
||||||
|
if value is not None and len(value) > limit:
|
||||||
|
raise HTTPException(422, f"{field_name} exceeds max length of {limit}")
|
||||||
|
|
||||||
|
|
||||||
|
def validate_link_payload(name: str, url: str, description: str, category: str, icon_url: str | None) -> None:
|
||||||
|
validate_length(name, MAX_NAME_LEN, "name")
|
||||||
|
validate_length(description, MAX_DESCRIPTION_LEN, "description")
|
||||||
|
validate_length(category, MAX_CATEGORY_LEN, "category")
|
||||||
|
if icon_url:
|
||||||
|
validate_length(icon_url, MAX_ICON_URL_LEN, "icon_url")
|
||||||
|
validate_http_url(icon_url, "icon_url")
|
||||||
|
validate_http_url(url, "url")
|
||||||
|
|
||||||
|
|
||||||
|
def read_icon_blob(icon: UploadFile | None) -> tuple[bytes | None, str | None]:
|
||||||
|
if not icon:
|
||||||
|
return None, None
|
||||||
|
if icon.content_type not in ALLOWED_ICON_MIME:
|
||||||
|
raise HTTPException(422, "Unsupported icon file type")
|
||||||
|
blob = icon.file.read(MAX_ICON_BYTES + 1)
|
||||||
|
if len(blob) > MAX_ICON_BYTES:
|
||||||
|
raise HTTPException(422, f"Icon exceeds max size of {MAX_ICON_BYTES} bytes")
|
||||||
|
return blob, icon.content_type
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/me")
|
@app.get("/api/me")
|
||||||
def me(request: Request):
|
def me(request: Request):
|
||||||
with db() as c:
|
with db() as c:
|
||||||
@@ -331,13 +370,11 @@ def create_link(
|
|||||||
icon: UploadFile | None = File(None),
|
icon: UploadFile | None = File(None),
|
||||||
):
|
):
|
||||||
require_admin(request)
|
require_admin(request)
|
||||||
|
validate_link_payload(name, url, description, category, icon_url)
|
||||||
with db() as c:
|
with db() as c:
|
||||||
if link_name_exists(c, name):
|
if link_name_exists(c, name):
|
||||||
raise HTTPException(409, "Link name already exists")
|
raise HTTPException(409, "Link name already exists")
|
||||||
icon_blob = icon_mime = None
|
icon_blob, icon_mime = read_icon_blob(icon)
|
||||||
if icon:
|
|
||||||
icon_blob = icon.file.read()
|
|
||||||
icon_mime = icon.content_type
|
|
||||||
now = datetime.utcnow().isoformat()
|
now = datetime.utcnow().isoformat()
|
||||||
with db() as c:
|
with db() as c:
|
||||||
with c.cursor() as cur:
|
with c.cursor() as cur:
|
||||||
@@ -382,10 +419,8 @@ def update_link(
|
|||||||
icon: UploadFile | None = File(None),
|
icon: UploadFile | None = File(None),
|
||||||
):
|
):
|
||||||
require_admin(request)
|
require_admin(request)
|
||||||
icon_blob = icon_mime = None
|
validate_link_payload(name, url, description, category, icon_url)
|
||||||
if icon:
|
icon_blob, icon_mime = read_icon_blob(icon)
|
||||||
icon_blob = icon.file.read()
|
|
||||||
icon_mime = icon.content_type
|
|
||||||
now = datetime.utcnow().isoformat()
|
now = datetime.utcnow().isoformat()
|
||||||
with db() as c:
|
with db() as c:
|
||||||
if link_name_exists(c, name, exclude_id=link_id):
|
if link_name_exists(c, name, exclude_id=link_id):
|
||||||
|
|||||||
Reference in New Issue
Block a user