Add optional CSRF enforcement for write routes
All checks were successful
docker / build-and-push (push) Successful in 49s
All checks were successful
docker / build-and-push (push) Successful in 49s
This commit is contained in:
@@ -36,6 +36,11 @@ The app expects a MariaDB instance configured through environment variables.
|
|||||||
|
|
||||||
- `SESSION_TTL_SECONDS` (default: `86400`)
|
- `SESSION_TTL_SECONDS` (default: `86400`)
|
||||||
- `SESSION_COOKIE_SECURE` (default: `false`, set `true` in production HTTPS)
|
- `SESSION_COOKIE_SECURE` (default: `false`, set `true` in production HTTPS)
|
||||||
|
- `REQUIRE_CSRF` (default: `false`, checks same-origin/same-referer for write routes when enabled)
|
||||||
|
- `LOGIN_MAX_ATTEMPTS` (default: `5`)
|
||||||
|
- `LOGIN_WINDOW_SECONDS` (default: `300`)
|
||||||
|
- `LOGIN_LOCKOUT_SECONDS` (default: `900`)
|
||||||
|
- `MAX_ICON_BYTES` (default: `2097152`)
|
||||||
|
|
||||||
## Gitea CI/CD
|
## Gitea CI/CD
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "false").lower() in (
|
|||||||
LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
|
LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
|
||||||
LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300"))
|
LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300"))
|
||||||
LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900"))
|
LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900"))
|
||||||
|
REQUIRE_CSRF = os.getenv("REQUIRE_CSRF", "false").lower() in ("1", "true", "yes", "on")
|
||||||
MAX_NAME_LEN = int(os.getenv("MAX_NAME_LEN", "255"))
|
MAX_NAME_LEN = int(os.getenv("MAX_NAME_LEN", "255"))
|
||||||
MAX_CATEGORY_LEN = int(os.getenv("MAX_CATEGORY_LEN", "255"))
|
MAX_CATEGORY_LEN = int(os.getenv("MAX_CATEGORY_LEN", "255"))
|
||||||
MAX_DESCRIPTION_LEN = int(os.getenv("MAX_DESCRIPTION_LEN", "2000"))
|
MAX_DESCRIPTION_LEN = int(os.getenv("MAX_DESCRIPTION_LEN", "2000"))
|
||||||
@@ -214,6 +215,20 @@ def require_admin(request: Request):
|
|||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def require_csrf(request: Request):
|
||||||
|
if not REQUIRE_CSRF:
|
||||||
|
return
|
||||||
|
origin = request.headers.get("origin")
|
||||||
|
referer = request.headers.get("referer")
|
||||||
|
target = f"{request.url.scheme}://{request.url.netloc}"
|
||||||
|
if origin and origin != target:
|
||||||
|
raise HTTPException(403, "Invalid CSRF origin")
|
||||||
|
if referer and not referer.startswith(target):
|
||||||
|
raise HTTPException(403, "Invalid CSRF referer")
|
||||||
|
if not origin and not referer:
|
||||||
|
raise HTTPException(403, "Invalid CSRF token")
|
||||||
|
|
||||||
|
|
||||||
def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
|
def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
if exclude_id is None:
|
if exclude_id is None:
|
||||||
@@ -319,6 +334,7 @@ def login(request: Request, inp: LoginIn):
|
|||||||
|
|
||||||
@app.post("/api/logout")
|
@app.post("/api/logout")
|
||||||
def logout(request: Request):
|
def logout(request: Request):
|
||||||
|
require_csrf(request)
|
||||||
token = request.cookies.get(SESSION_COOKIE)
|
token = request.cookies.get(SESSION_COOKIE)
|
||||||
with db() as c:
|
with db() as c:
|
||||||
if token:
|
if token:
|
||||||
@@ -370,6 +386,7 @@ def create_link(
|
|||||||
icon: UploadFile | None = File(None),
|
icon: UploadFile | None = File(None),
|
||||||
):
|
):
|
||||||
require_admin(request)
|
require_admin(request)
|
||||||
|
require_csrf(request)
|
||||||
validate_link_payload(name, url, description, category, icon_url)
|
validate_link_payload(name, url, description, category, icon_url)
|
||||||
with db() as c:
|
with db() as c:
|
||||||
if link_name_exists(c, name):
|
if link_name_exists(c, name):
|
||||||
@@ -400,6 +417,7 @@ def link_icon(link_id: int):
|
|||||||
@app.delete("/api/links/{link_id}")
|
@app.delete("/api/links/{link_id}")
|
||||||
def delete_link(request: Request, link_id: int):
|
def delete_link(request: Request, link_id: int):
|
||||||
require_admin(request)
|
require_admin(request)
|
||||||
|
require_csrf(request)
|
||||||
with db() as c:
|
with db() as c:
|
||||||
with c.cursor() as cur:
|
with c.cursor() as cur:
|
||||||
cur.execute("delete from links where id=%s", (link_id,))
|
cur.execute("delete from links where id=%s", (link_id,))
|
||||||
@@ -419,6 +437,7 @@ def update_link(
|
|||||||
icon: UploadFile | None = File(None),
|
icon: UploadFile | None = File(None),
|
||||||
):
|
):
|
||||||
require_admin(request)
|
require_admin(request)
|
||||||
|
require_csrf(request)
|
||||||
validate_link_payload(name, url, description, category, icon_url)
|
validate_link_payload(name, url, description, category, icon_url)
|
||||||
icon_blob, icon_mime = read_icon_blob(icon)
|
icon_blob, icon_mime = read_icon_blob(icon)
|
||||||
now = datetime.utcnow().isoformat()
|
now = datetime.utcnow().isoformat()
|
||||||
|
|||||||
Reference in New Issue
Block a user