Files
jellomator/backend/main.py
Space-Banane fd874c9499
All checks were successful
docker / test (push) Successful in 14s
docker / build-and-push (push) Successful in 1m36s
admin: add backup/restore flow and structured request logging
2026-05-20 22:44:02 +02:00

760 lines
29 KiB
Python

from __future__ import annotations
import secrets
import os
import time
import json
import logging
import uuid
import base64
from datetime import datetime, timezone
from contextlib import contextmanager
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import bcrypt
import pymysql
from fastapi import FastAPI, File, Form, HTTPException, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
STATIC_DIR = Path("frontend/dist")
PUBLIC_DIR = Path("public")
SESSION_COOKIE = "jellomator_session"
SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", "86400"))
SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "false").lower() in ("1", "true", "yes", "on")
SESSION_ROTATE_SECONDS = int(os.getenv("SESSION_ROTATE_SECONDS", "3600"))
LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "300"))
LOGIN_LOCKOUT_SECONDS = int(os.getenv("LOGIN_LOCKOUT_SECONDS", "900"))
REQUIRE_CSRF = os.getenv("REQUIRE_CSRF", "false").lower() in ("1", "true", "yes", "on")
MAX_NAME_LEN = int(os.getenv("MAX_NAME_LEN", "255"))
MAX_CATEGORY_LEN = int(os.getenv("MAX_CATEGORY_LEN", "255"))
MAX_DESCRIPTION_LEN = int(os.getenv("MAX_DESCRIPTION_LEN", "2000"))
MAX_ICON_URL_LEN = int(os.getenv("MAX_ICON_URL_LEN", "2048"))
MAX_ICON_BYTES = int(os.getenv("MAX_ICON_BYTES", str(2 * 1024 * 1024)))
ALLOWED_ICON_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif", "image/svg+xml", "image/x-icon"}
USERNAME_MAX_LEN = int(os.getenv("USERNAME_MAX_LEN", "64"))
PASSWORD_MIN_LEN = int(os.getenv("PASSWORD_MIN_LEN", "12"))
DB_HOST = os.getenv("DB_HOST", "mariadb")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.getenv("DB_USER", "jellomator")
DB_PASSWORD = os.getenv("DB_PASSWORD", "jellomator")
DB_NAME = os.getenv("DB_NAME", "jellomator")
app = FastAPI(title="Jellomator")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
login_attempts: dict[str, list[float]] = {}
login_lockouts: dict[str, float] = {}
logger = logging.getLogger("jellomator")
if not logger.handlers:
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(message)s")
@app.get("/healthz")
def healthz():
return {"ok": True}
@app.get("/readyz")
def readyz(write_test: bool = False):
try:
with db() as c:
with c.cursor() as cur:
cur.execute("select 1 as ok")
cur.fetchone()
if write_test:
cur.execute("create temporary table if not exists readyz_probe(id int)")
cur.execute("insert into readyz_probe(id) values (1)")
cur.execute("truncate table readyz_probe")
except Exception:
raise HTTPException(503, "Database not ready")
return {"ok": True}
@contextmanager
def db():
conn = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
try:
with conn.cursor() as cur:
cur.execute(f"create database if not exists `{DB_NAME}` default charset=utf8mb4")
conn.select_db(DB_NAME)
yield conn
finally:
conn.close()
def init_db():
with db() as c:
with c.cursor() as cur:
cur.execute("""
create table if not exists users(
id bigint auto_increment primary key,
username varchar(255) not null unique,
password_hash varbinary(255) not null,
role varchar(32) not null
) engine=InnoDB default charset=utf8mb4
""")
cur.execute("""
create table if not exists sessions(
token varchar(255) primary key,
user_id bigint not null,
created_at varchar(64) not null,
expires_at varchar(64) null,
last_seen_at varchar(64) null,
index (user_id),
constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade
) engine=InnoDB default charset=utf8mb4
""")
cur.execute("show columns from sessions like 'expires_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column expires_at varchar(64) null after created_at")
cur.execute("show columns from sessions like 'last_seen_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column last_seen_at varchar(64) null after expires_at")
cur.execute("""
create table if not exists links(
id bigint auto_increment primary key,
name varchar(255) not null unique,
url text not null,
description text,
category varchar(255),
sort_order int not null default 0,
icon_blob longblob,
icon_mime varchar(255),
icon_url text,
enabled tinyint(1) not null default 1,
created_at datetime not null,
updated_at datetime not null
) engine=InnoDB default charset=utf8mb4
""")
cur.execute("show columns from links like 'sort_order'")
if cur.fetchone() is None:
cur.execute("alter table links add column sort_order int not null default 0 after category")
cur.execute("show columns from links like 'created_at'")
created_col = cur.fetchone()
cur.execute("show columns from links like 'updated_at'")
updated_col = cur.fetchone()
created_is_varchar = bool(created_col and str(created_col.get("Type", "")).startswith("varchar"))
updated_is_varchar = bool(updated_col and str(updated_col.get("Type", "")).startswith("varchar"))
if created_is_varchar or updated_is_varchar:
cur.execute("alter table links add column created_at_dt datetime null, add column updated_at_dt datetime null")
cur.execute(
"""
update links
set created_at_dt=coalesce(
str_to_date(created_at, '%Y-%m-%dT%H:%i:%s.%f'),
str_to_date(created_at, '%Y-%m-%dT%H:%i:%s'),
str_to_date(created_at, '%Y-%m-%d %H:%i:%s'),
utc_timestamp()
),
updated_at_dt=coalesce(
str_to_date(updated_at, '%Y-%m-%dT%H:%i:%s.%f'),
str_to_date(updated_at, '%Y-%m-%dT%H:%i:%s'),
str_to_date(updated_at, '%Y-%m-%d %H:%i:%s'),
utc_timestamp()
)
"""
)
cur.execute("alter table links drop column created_at, drop column updated_at")
cur.execute(
"alter table links change column created_at_dt created_at datetime not null, "
"change column updated_at_dt updated_at datetime not null"
)
init_db()
class SetupIn(BaseModel):
username: str
password: str
class LinkIn(BaseModel):
name: str
url: str
description: str = ""
category: str = "General"
icon_url: Optional[str] = None
enabled: bool = True
class LoginIn(BaseModel):
username: str
password: str
class RestoreIn(BaseModel):
data: dict
confirm: bool = False
def utc_now_iso() -> str:
return datetime.utcnow().isoformat()
def utc_now_db() -> datetime:
return datetime.now(timezone.utc).replace(tzinfo=None)
def utc_now_iso_z() -> str:
return datetime.now(timezone.utc).isoformat()
def expires_at_iso() -> str:
now = datetime.utcnow().timestamp()
return datetime.utcfromtimestamp(now + SESSION_TTL_SECONDS).isoformat()
def current_user(request: Request, response: Response | None = None):
token = request.cookies.get(SESSION_COOKIE)
if not token:
return None
with db() as c:
with c.cursor() as cur:
cur.execute(
"select s.created_at,s.last_seen_at,s.expires_at,u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s",
(token,),
)
row = cur.fetchone()
if not row:
return None
expires_at = row.get("expires_at")
now = datetime.utcnow()
if expires_at:
try:
if now >= datetime.fromisoformat(expires_at):
cur.execute("delete from sessions where token=%s", (token,))
return None
except ValueError:
cur.execute("delete from sessions where token=%s", (token,))
return None
now = datetime.utcnow()
now_iso = now.isoformat()
new_expires_at = expires_at_iso()
last_seen_at = row.get("last_seen_at") or row.get("created_at")
should_rotate = False
if last_seen_at:
try:
should_rotate = (now - datetime.fromisoformat(last_seen_at)).total_seconds() >= SESSION_ROTATE_SECONDS
except ValueError:
should_rotate = True
if should_rotate and response is not None:
new_token = secrets.token_urlsafe(32)
cur.execute(
"update sessions set token=%s,last_seen_at=%s,expires_at=%s where token=%s",
(new_token, now_iso, new_expires_at, token),
)
response.set_cookie(
SESSION_COOKIE,
new_token,
httponly=True,
samesite="lax",
secure=SESSION_COOKIE_SECURE,
max_age=SESSION_TTL_SECONDS,
path="/",
)
else:
cur.execute(
"update sessions set last_seen_at=%s,expires_at=%s where token=%s",
(now_iso, new_expires_at, token),
)
return {"username": row["username"], "role": row["role"]}
def client_ip(request: Request) -> str:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def login_key(request: Request, username: str) -> str:
return f"{client_ip(request)}::{username.strip().lower()}"
def prune_login_tracking(now: float) -> None:
for key, until in list(login_lockouts.items()):
if until <= now:
del login_lockouts[key]
cutoff = now - LOGIN_WINDOW_SECONDS
for key, entries in list(login_attempts.items()):
filtered = [t for t in entries if t >= cutoff]
if filtered:
login_attempts[key] = filtered
else:
del login_attempts[key]
def require_admin(request: Request):
user = current_user(request)
if not user:
raise HTTPException(401, "Unauthorized")
return user
def require_csrf(request: Request):
if not REQUIRE_CSRF:
return
origin = request.headers.get("origin")
referer = request.headers.get("referer")
target = f"{request.url.scheme}://{request.url.netloc}"
if origin and origin != target:
raise HTTPException(403, "Invalid CSRF origin")
if referer and not referer.startswith(target):
raise HTTPException(403, "Invalid CSRF referer")
if not origin and not referer:
raise HTTPException(403, "Invalid CSRF token")
def log_event(request: Request | None, event: str, **fields):
payload = {"event": event, "ts": utc_now_iso_z(), **fields}
if request is not None:
payload["request_id"] = getattr(request.state, "request_id", None)
payload["method"] = request.method
payload["path"] = request.url.path
payload["client_ip"] = client_ip(request)
logger.info(json.dumps(payload, separators=(",", ":")))
def parse_backup_payload(data: dict) -> tuple[list[dict], list[dict]]:
if not isinstance(data, dict):
raise HTTPException(422, "backup payload must be an object")
users = data.get("users")
links = data.get("links")
if not isinstance(users, list) or not isinstance(links, list):
raise HTTPException(422, "backup payload must include users[] and links[]")
parsed_users: list[dict] = []
parsed_links: list[dict] = []
for idx, user in enumerate(users):
if not isinstance(user, dict):
raise HTTPException(422, f"users[{idx}] must be an object")
username = str(user.get("username", "")).strip()
role = str(user.get("role", "")).strip() or "admin"
password_hash_b64 = str(user.get("password_hash_b64", "")).strip()
if not username or not password_hash_b64:
raise HTTPException(422, f"users[{idx}] must include username and password_hash_b64")
try:
password_hash = base64.b64decode(password_hash_b64.encode("ascii"), validate=True)
except Exception as exc:
raise HTTPException(422, f"users[{idx}] has invalid password_hash_b64") from exc
parsed_users.append({"username": username, "role": role, "password_hash": password_hash})
for idx, link in enumerate(links):
if not isinstance(link, dict):
raise HTTPException(422, f"links[{idx}] must be an object")
name = str(link.get("name", "")).strip()
url = str(link.get("url", "")).strip()
description = str(link.get("description", "") or "")
category = str(link.get("category", "") or "General")
icon_url = link.get("icon_url")
sort_order = int(link.get("sort_order", 0) or 0)
enabled = bool(link.get("enabled", True))
validate_link_payload(name, url, description, category, icon_url)
parsed_links.append(
{
"name": name,
"url": url,
"description": description,
"category": category,
"icon_url": icon_url,
"sort_order": sort_order,
"enabled": enabled,
}
)
lower_names = [l["name"].lower() for l in parsed_links]
if len(lower_names) != len(set(lower_names)):
raise HTTPException(422, "backup has duplicate link names")
return parsed_users, parsed_links
def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
with conn.cursor() as cur:
if exclude_id is None:
cur.execute("select 1 from links where lower(name)=lower(%s) limit 1", (name,))
else:
cur.execute("select 1 from links where lower(name)=lower(%s) and id<>%s limit 1", (name, exclude_id))
return cur.fetchone() is not None
def validate_http_url(value: str, field_name: str = "url") -> None:
parsed = urlparse((value or "").strip())
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise HTTPException(422, f"{field_name} must be a valid http(s) URL")
def validate_credentials(username: str, password: str) -> None:
if not username or len(username.strip()) == 0:
raise HTTPException(422, "username is required")
if len(username) > USERNAME_MAX_LEN:
raise HTTPException(422, f"username exceeds max length of {USERNAME_MAX_LEN}")
if len(password or "") < PASSWORD_MIN_LEN:
raise HTTPException(422, f"password must be at least {PASSWORD_MIN_LEN} characters")
def validate_length(value: str | None, limit: int, field_name: str) -> None:
if value is not None and len(value) > limit:
raise HTTPException(422, f"{field_name} exceeds max length of {limit}")
def validate_link_payload(name: str, url: str, description: str, category: str, icon_url: str | None) -> None:
validate_length(name, MAX_NAME_LEN, "name")
validate_length(description, MAX_DESCRIPTION_LEN, "description")
validate_length(category, MAX_CATEGORY_LEN, "category")
if icon_url:
validate_length(icon_url, MAX_ICON_URL_LEN, "icon_url")
validate_http_url(icon_url, "icon_url")
validate_http_url(url, "url")
def read_icon_blob(icon: UploadFile | None) -> tuple[bytes | None, str | None]:
if not icon:
return None, None
if icon.content_type not in ALLOWED_ICON_MIME:
raise HTTPException(422, "Unsupported icon file type")
blob = icon.file.read(MAX_ICON_BYTES + 1)
if len(blob) > MAX_ICON_BYTES:
raise HTTPException(422, f"Icon exceeds max size of {MAX_ICON_BYTES} bytes")
return blob, icon.content_type
@app.middleware("http")
async def request_context(request: Request, call_next):
request_id = request.headers.get("x-request-id") or uuid.uuid4().hex
request.state.request_id = request_id
response = await call_next(request)
response.headers["x-request-id"] = request_id
return response
@app.get("/api/me")
def me(request: Request, response: Response):
current = current_user(request, response)
with db() as c:
with c.cursor() as cur:
cur.execute("select count(*) as count from users")
needs_setup = cur.fetchone()["count"] == 0
return {"needs_setup": needs_setup, "current_user": current}
@app.post("/api/setup")
def setup(inp: SetupIn):
validate_credentials(inp.username, inp.password)
with db() as c:
with c.cursor() as cur:
cur.execute("select count(*) as count from users")
if cur.fetchone()["count"] > 0:
raise HTTPException(400, "Setup already complete")
pw = bcrypt.hashpw(inp.password.encode(), bcrypt.gensalt())
cur.execute("insert into users(username,password_hash,role) values (%s,%s,%s)", (inp.username, pw, "admin"))
log_event(None, "auth.setup_complete", username=inp.username)
return {"ok": True}
@app.post("/api/login")
def login(request: Request, inp: LoginIn):
validate_credentials(inp.username, inp.password)
now_ts = time.time()
prune_login_tracking(now_ts)
key = login_key(request, inp.username)
locked_until = login_lockouts.get(key)
if locked_until and locked_until > now_ts:
log_event(request, "auth.login_blocked", username=inp.username, locked_until=locked_until)
raise HTTPException(429, "Too many login attempts. Try again later.")
with db() as c:
with c.cursor() as cur:
cur.execute("select id,password_hash from users where username=%s", (inp.username,))
row = cur.fetchone()
if not row or not bcrypt.checkpw(inp.password.encode(), row["password_hash"]):
attempts = login_attempts.get(key, [])
attempts.append(now_ts)
login_attempts[key] = [t for t in attempts if t >= now_ts - LOGIN_WINDOW_SECONDS]
if len(login_attempts[key]) >= LOGIN_MAX_ATTEMPTS:
login_lockouts[key] = now_ts + LOGIN_LOCKOUT_SECONDS
log_event(request, "auth.login_failed", username=inp.username)
raise HTTPException(401, "Invalid credentials")
login_attempts.pop(key, None)
login_lockouts.pop(key, None)
old_token = request.cookies.get(SESSION_COOKIE)
if old_token:
with c.cursor() as cur:
cur.execute("delete from sessions where token=%s", (old_token,))
token = secrets.token_urlsafe(32)
with c.cursor() as cur:
now = utc_now_iso()
cur.execute(
"insert into sessions(token,user_id,created_at,expires_at,last_seen_at) values (%s,%s,%s,%s,%s)",
(token, row["id"], now, expires_at_iso(), now),
)
response = JSONResponse({"ok": True})
response.set_cookie(
SESSION_COOKIE,
token,
httponly=True,
samesite="lax",
secure=SESSION_COOKIE_SECURE,
max_age=SESSION_TTL_SECONDS,
path="/",
)
log_event(request, "auth.login_success", username=inp.username)
return response
@app.post("/api/logout")
def logout(request: Request):
require_csrf(request)
token = request.cookies.get(SESSION_COOKIE)
with db() as c:
if token:
with c.cursor() as cur:
cur.execute("delete from sessions where token=%s", (token,))
resp = JSONResponse({"ok": True})
resp.delete_cookie(SESSION_COOKIE, path="/")
log_event(request, "auth.logout")
return resp
@app.get("/api/links")
def links():
with db() as c:
with c.cursor() as cur:
cur.execute("select * from links order by enabled desc, sort_order asc, name asc")
rows = cur.fetchall()
out = []
for r in rows:
icon_url = None
if r["icon_blob"]:
icon_url = f"/api/links/{r['id']}/icon"
elif r["icon_url"]:
icon_url = r["icon_url"]
out.append({k: r[k] for k in ["id", "name", "url", "description", "category", "enabled"]} | {"icon_url": icon_url})
return out
@app.get("/api/links/{link_id}")
def get_link(link_id: int):
with db() as c:
with c.cursor() as cur:
cur.execute("select * from links where id=%s", (link_id,))
row = cur.fetchone()
if not row:
raise HTTPException(404, "Not found")
icon_url = f"/api/links/{row['id']}/icon" if row["icon_blob"] else row["icon_url"]
return {k: row[k] for k in ["id", "name", "url", "description", "category", "enabled", "icon_url"]}
@app.post("/api/links")
def create_link(
request: Request,
name: str = Form(...),
url: str = Form(...),
description: str = Form(""),
category: str = Form("General"),
icon_url: str | None = Form(None),
enabled: bool = Form(True),
icon: UploadFile | None = File(None),
):
require_admin(request)
require_csrf(request)
validate_link_payload(name, url, description, category, icon_url)
icon_blob, icon_mime = read_icon_blob(icon)
now = utc_now_db()
with db() as c:
if link_name_exists(c, name):
raise HTTPException(409, "Link name already exists")
with c.cursor() as cur:
cur.execute(
"""insert into links(name,url,description,category,sort_order,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at)
values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(name, url, description, category, 0, icon_blob, icon_mime, icon_url, int(enabled), now, now),
)
log_event(request, "links.create", name=name)
return {"ok": True}
@app.get("/api/links/{link_id}/icon")
def link_icon(link_id: int):
with db() as c:
with c.cursor() as cur:
cur.execute("select icon_blob,icon_mime from links where id=%s", (link_id,))
row = cur.fetchone()
if not row or not row["icon_blob"]:
raise HTTPException(404, "Not found")
return Response(content=row["icon_blob"], media_type=row["icon_mime"] or "image/png")
@app.delete("/api/links/{link_id}")
def delete_link(request: Request, link_id: int):
require_admin(request)
require_csrf(request)
with db() as c:
with c.cursor() as cur:
cur.execute("delete from links where id=%s", (link_id,))
log_event(request, "links.delete", link_id=link_id)
return {"ok": True}
@app.patch("/api/links/{link_id}")
def update_link(
request: Request,
link_id: int,
name: str = Form(...),
url: str = Form(...),
description: str = Form(""),
category: str = Form("General"),
icon_url: str | None = Form(None),
enabled: bool = Form(True),
icon: UploadFile | None = File(None),
):
require_admin(request)
require_csrf(request)
validate_link_payload(name, url, description, category, icon_url)
icon_blob, icon_mime = read_icon_blob(icon)
now = utc_now_db()
with db() as c:
if link_name_exists(c, name, exclude_id=link_id):
raise HTTPException(409, "Link name already exists")
with c.cursor() as cur:
if icon_blob:
cur.execute(
"""update links set name=%s,url=%s,description=%s,category=%s,icon_blob=%s,icon_mime=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
(name, url, description, category, icon_blob, icon_mime, icon_url, int(enabled), now, link_id),
)
else:
cur.execute(
"""update links set name=%s,url=%s,description=%s,category=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
(name, url, description, category, icon_url, int(enabled), now, link_id),
)
log_event(request, "links.update", link_id=link_id, name=name)
return {"ok": True}
@app.get("/api/admin/backup")
def backup(request: Request):
require_admin(request)
with db() as c:
with c.cursor() as cur:
cur.execute("select username, role, password_hash from users order by id asc")
users = cur.fetchall()
cur.execute(
"select name,url,description,category,sort_order,icon_url,enabled,created_at,updated_at from links order by id asc"
)
links_rows = cur.fetchall()
users_out = [
{
"username": u["username"],
"role": u["role"],
"password_hash_b64": base64.b64encode(u["password_hash"]).decode("ascii"),
}
for u in users
]
links_out = []
for link in links_rows:
links_out.append(
{
"name": link["name"],
"url": link["url"],
"description": link["description"],
"category": link["category"],
"sort_order": link["sort_order"],
"icon_url": link["icon_url"],
"enabled": bool(link["enabled"]),
"created_at": link["created_at"].replace(tzinfo=timezone.utc).isoformat() if link["created_at"] else None,
"updated_at": link["updated_at"].replace(tzinfo=timezone.utc).isoformat() if link["updated_at"] else None,
}
)
out = {"version": 1, "exported_at": utc_now_iso_z(), "users": users_out, "links": links_out}
log_event(request, "backup.export", users=len(users_out), links=len(links_out))
return out
@app.post("/api/admin/restore")
def restore(request: Request, inp: RestoreIn, dry_run: bool = True):
require_admin(request)
require_csrf(request)
parsed_users, parsed_links = parse_backup_payload(inp.data)
if dry_run:
log_event(request, "backup.restore_dry_run", users=len(parsed_users), links=len(parsed_links))
return {"ok": True, "dry_run": True, "users": len(parsed_users), "links": len(parsed_links)}
if not inp.confirm:
raise HTTPException(400, "confirm=true is required to apply restore")
now = utc_now_db()
with db() as c:
c.begin()
try:
with c.cursor() as cur:
cur.execute("set foreign_key_checks=0")
cur.execute("delete from sessions")
cur.execute("delete from users")
cur.execute("delete from links")
for user in parsed_users:
cur.execute(
"insert into users(username,password_hash,role) values (%s,%s,%s)",
(user["username"], user["password_hash"], user["role"]),
)
for link in parsed_links:
cur.execute(
"""
insert into links(name,url,description,category,sort_order,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at)
values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""",
(
link["name"],
link["url"],
link["description"],
link["category"],
link["sort_order"],
None,
None,
link["icon_url"],
int(link["enabled"]),
now,
now,
),
)
cur.execute("set foreign_key_checks=1")
c.commit()
except Exception:
c.rollback()
raise
log_event(request, "backup.restore_apply", users=len(parsed_users), links=len(parsed_links))
return {"ok": True, "dry_run": False, "users": len(parsed_users), "links": len(parsed_links)}
if STATIC_DIR.exists():
app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets")
if PUBLIC_DIR.exists():
app.mount("/static", StaticFiles(directory=PUBLIC_DIR), name="public")
elif STATIC_DIR.exists():
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="public-dist")
@app.get("/jellomator.png")
def root_icon():
icon = PUBLIC_DIR / "jellomator.png"
if not icon.exists():
icon = STATIC_DIR / "jellomator.png"
if not icon.exists():
raise HTTPException(404, "Not found")
return FileResponse(icon)
@app.get("/{path:path}")
def spa(path: str):
if path.startswith("api/"):
raise HTTPException(404)
if "." in path:
raise HTTPException(404)
index = STATIC_DIR / "index.html"
if index.exists():
return FileResponse(index)
return HTMLResponse("<h1>Jellomator</h1>")