795 lines
30 KiB
Python
795 lines
30 KiB
Python
from __future__ import annotations
|
|
|
|
import secrets
|
|
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import base64
|
|
from datetime import datetime, timezone
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
import bcrypt
|
|
import pymysql
|
|
from fastapi import FastAPI, File, Form, HTTPException, Request, Response, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* (or *default* when unset) parsed as an int."""
    return int(os.getenv(name, default))


def _env_flag(name: str) -> bool:
    """Return True when *name* is set to 1/true/yes/on (case-insensitive); unset means False."""
    return os.getenv(name, "false").lower() in ("1", "true", "yes", "on")


# Filesystem locations of the built frontend and of extra public assets.
STATIC_DIR = Path("frontend/dist")
PUBLIC_DIR = Path("public")

# Session cookie settings.
SESSION_COOKIE = "jellomator_session"
SESSION_TTL_SECONDS = _env_int("SESSION_TTL_SECONDS", "86400")
SESSION_COOKIE_SECURE = _env_flag("SESSION_COOKIE_SECURE")
SESSION_ROTATE_SECONDS = _env_int("SESSION_ROTATE_SECONDS", "3600")

# Login throttling: attempts allowed per window, and the lockout applied afterwards.
LOGIN_MAX_ATTEMPTS = _env_int("LOGIN_MAX_ATTEMPTS", "5")
LOGIN_WINDOW_SECONDS = _env_int("LOGIN_WINDOW_SECONDS", "300")
LOGIN_LOCKOUT_SECONDS = _env_int("LOGIN_LOCKOUT_SECONDS", "900")

# When enabled, state-changing endpoints verify the Origin/Referer headers.
REQUIRE_CSRF = _env_flag("REQUIRE_CSRF")

# Upper bounds applied to link fields and uploaded icons.
MAX_NAME_LEN = _env_int("MAX_NAME_LEN", "255")
MAX_CATEGORY_LEN = _env_int("MAX_CATEGORY_LEN", "255")
MAX_DESCRIPTION_LEN = _env_int("MAX_DESCRIPTION_LEN", "2000")
MAX_ICON_URL_LEN = _env_int("MAX_ICON_URL_LEN", "2048")
MAX_ICON_BYTES = _env_int("MAX_ICON_BYTES", str(2 * 1024 * 1024))
ALLOWED_ICON_MIME = {"image/png", "image/jpeg", "image/webp", "image/gif", "image/svg+xml", "image/x-icon"}

# Credential policy.
USERNAME_MAX_LEN = _env_int("USERNAME_MAX_LEN", "64")
PASSWORD_MIN_LEN = _env_int("PASSWORD_MIN_LEN", "12")

# Database connection parameters.
DB_HOST = os.getenv("DB_HOST", "mariadb")
DB_PORT = _env_int("DB_PORT", "3306")
DB_USER = os.getenv("DB_USER", "jellomator")
DB_PASSWORD = os.getenv("DB_PASSWORD", "jellomator")
DB_NAME = os.getenv("DB_NAME", "jellomator")
|
|
|
|
app = FastAPI(title="Jellomator")

# CORS policy. Origins come from CORS_ALLOW_ORIGINS (comma-separated); the
# default stays "*" so anonymous cross-origin reads keep working. Credentialed
# CORS is only enabled for an explicit origin list: combining a wildcard with
# allow_credentials=True lets any origin read cookie-authenticated responses.
_cors_origins = [o.strip() for o in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",") if o.strip()] or ["*"]
_cors_wildcard = "*" in _cors_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _cors_wildcard else _cors_origins,
    allow_credentials=not _cors_wildcard,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level (per-process) login throttling state, keyed by "<ip>::<username>".
# login_attempts maps a key to timestamps of recent failures; login_lockouts
# maps a key to the epoch time until which logins are refused.
login_attempts: dict[str, list[float]] = {}
login_lockouts: dict[str, float] = {}

# Events are emitted as bare JSON strings, so the handler prints the message only.
logger = logging.getLogger("jellomator")
if not logger.handlers:
    logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(message)s")
|
|
|
|
|
|
@app.get("/healthz")
def healthz():
    """Liveness probe: report ok without touching the database."""
    status = {"ok": True}
    return status
|
|
|
|
|
|
@app.get("/readyz")
def readyz(write_test: bool = False):
    """Readiness probe: verify that the database answers a trivial query.

    Args:
        write_test: when true, also create, insert into and truncate a
            temporary table to confirm the connection can write.

    Raises:
        HTTPException: 503 when any step of the probe fails.
    """
    try:
        with db() as c, c.cursor() as cur:
            cur.execute("select 1 as ok")
            cur.fetchone()
            if write_test:
                cur.execute("create temporary table if not exists readyz_probe(id int)")
                cur.execute("insert into readyz_probe(id) values (1)")
                cur.execute("truncate table readyz_probe")
    except Exception as exc:
        # Probe boundary: record why the database is unavailable instead of
        # discarding the cause, then report a plain 503 to the caller.
        log_event(None, "readyz.failed", error=type(exc).__name__, detail=str(exc))
        raise HTTPException(503, "Database not ready") from exc
    return {"ok": True}
|
|
|
|
|
|
@contextmanager
def db():
    """Yield a fresh autocommit PyMySQL connection bound to ``DB_NAME``.

    The connection is opened without a default schema, the database is created
    if it does not exist yet, and it is then selected. Rows come back as dicts
    (``DictCursor``). The connection is always closed on exit.
    """
    conn = pymysql.connect(
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD,
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )
    try:
        # Identifiers cannot be bound as parameters; double any backtick so a
        # configured name can never terminate the quoted identifier.
        quoted_name = DB_NAME.replace("`", "``")
        with conn.cursor() as cur:
            cur.execute(f"create database if not exists `{quoted_name}` default charset=utf8mb4")
        conn.select_db(DB_NAME)
        yield conn
    finally:
        conn.close()
|
|
|
|
|
|
def init_db():
    """Create the schema when missing and apply in-place column migrations.

    Steps, in order: create ``users`` and ``sessions``; add the
    ``expires_at`` / ``last_seen_at`` session columns if absent; create
    ``links``; add ``links.sort_order`` if absent; and, when
    ``links.created_at`` or ``links.updated_at`` is still a varchar column,
    rebuild both as ``datetime`` by parsing the stored text.
    """
    users_ddl = """
        create table if not exists users(
            id bigint auto_increment primary key,
            username varchar(255) not null unique,
            password_hash varbinary(255) not null,
            role varchar(32) not null
        ) engine=InnoDB default charset=utf8mb4
    """
    sessions_ddl = """
        create table if not exists sessions(
            token varchar(255) primary key,
            user_id bigint not null,
            created_at varchar(64) not null,
            expires_at varchar(64) null,
            last_seen_at varchar(64) null,
            index (user_id),
            constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade
        ) engine=InnoDB default charset=utf8mb4
    """
    links_ddl = """
        create table if not exists links(
            id bigint auto_increment primary key,
            name varchar(255) not null unique,
            url text not null,
            description text,
            category varchar(255),
            sort_order int not null default 0,
            icon_blob longblob,
            icon_mime varchar(255),
            icon_url text,
            enabled tinyint(1) not null default 1,
            created_at datetime not null,
            updated_at datetime not null
        ) engine=InnoDB default charset=utf8mb4
    """
    # Tries the ISO "T" form with and without fractional seconds, then the
    # space-separated form; unparseable values fall back to the current time.
    backfill_sql = """
        update links
        set created_at_dt=coalesce(
                str_to_date(created_at, '%Y-%m-%dT%H:%i:%s.%f'),
                str_to_date(created_at, '%Y-%m-%dT%H:%i:%s'),
                str_to_date(created_at, '%Y-%m-%d %H:%i:%s'),
                utc_timestamp()
            ),
            updated_at_dt=coalesce(
                str_to_date(updated_at, '%Y-%m-%dT%H:%i:%s.%f'),
                str_to_date(updated_at, '%Y-%m-%dT%H:%i:%s'),
                str_to_date(updated_at, '%Y-%m-%d %H:%i:%s'),
                utc_timestamp()
            )
    """

    with db() as c, c.cursor() as cur:

        def column_info(table: str, column: str):
            """Return the ``show columns`` row for *table*.*column*, or None."""
            cur.execute(f"show columns from {table} like '{column}'")
            return cur.fetchone()

        def is_varchar(info) -> bool:
            """True when a ``show columns`` row describes a varchar column."""
            return bool(info and str(info.get("Type", "")).startswith("varchar"))

        cur.execute(users_ddl)
        cur.execute(sessions_ddl)

        # Session columns added after the first release.
        if column_info("sessions", "expires_at") is None:
            cur.execute("alter table sessions add column expires_at varchar(64) null after created_at")
        if column_info("sessions", "last_seen_at") is None:
            cur.execute("alter table sessions add column last_seen_at varchar(64) null after expires_at")

        cur.execute(links_ddl)
        if column_info("links", "sort_order") is None:
            cur.execute("alter table links add column sort_order int not null default 0 after category")

        created_col = column_info("links", "created_at")
        updated_col = column_info("links", "updated_at")
        if is_varchar(created_col) or is_varchar(updated_col):
            # Text timestamps -> datetime: add shadow columns, backfill them,
            # drop the originals, then rename the shadows into place.
            cur.execute("alter table links add column created_at_dt datetime null, add column updated_at_dt datetime null")
            cur.execute(backfill_sql)
            cur.execute("alter table links drop column created_at, drop column updated_at")
            cur.execute(
                "alter table links change column created_at_dt created_at datetime not null, "
                "change column updated_at_dt updated_at datetime not null"
            )


# The schema is ensured at import time, before any route can be served.
init_db()
|
|
|
|
|
|
class SetupIn(BaseModel):
    """JSON body of ``POST /api/setup``: credentials for the first account."""

    # Login name of the account to create.
    username: str
    # Plain-text password; it is hashed with bcrypt before being stored.
    password: str
|
|
|
|
|
|
class LinkIn(BaseModel):
    """JSON representation of a link's editable fields."""

    # Display name of the link.
    name: str
    # Target address.
    url: str
    # Free-form description; empty by default.
    description: str = ""
    # Grouping label; defaults to "General".
    category: str = "General"
    # Optional external icon address.
    icon_url: Optional[str] = None
    # Whether the link is active.
    enabled: bool = True
|
|
|
|
|
|
class LoginIn(BaseModel):
    """JSON body of ``POST /api/login``."""

    # Login name to authenticate.
    username: str
    # Plain-text password to verify against the stored bcrypt hash.
    password: str
|
|
|
|
|
|
class RestoreIn(BaseModel):
    """JSON body of ``POST /api/admin/restore``."""

    # Backup document; it is validated by parse_backup_payload.
    data: dict
    # Must be true for a non-dry-run restore to be applied.
    confirm: bool = False
|
|
|
|
|
|
class ReorderItem(BaseModel):
    """One entry of a reorder request: a link id and its new position."""

    # Primary key of the link to move.
    id: int
    # New value for the link's sort_order column.
    sort_order: int
|
|
|
|
|
|
class ReorderIn(BaseModel):
    """JSON body of ``PATCH /api/links/order``."""

    # Links to reposition; each id may appear at most once.
    items: list[ReorderItem]
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string (no offset suffix).

    Built from an aware "now" with the tzinfo stripped, which yields the same
    text as the deprecated ``datetime.utcnow()`` without its warning.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
|
|
|
|
|
|
def utc_now_db() -> datetime:
    """Return the current UTC time as a naive ``datetime`` for datetime columns."""
    aware_now = datetime.now(tz=timezone.utc)
    # Strip the tzinfo so the value is a plain wall-clock UTC datetime.
    return aware_now.replace(tzinfo=None)
|
|
|
|
|
|
def utc_now_iso_z() -> str:
    """Return the current UTC time as an offset-aware ISO-8601 string.

    The offset is rendered as ``+00:00`` (not a literal ``Z``).
    """
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def expires_at_iso() -> str:
    """Return the session expiry (now + SESSION_TTL_SECONDS) as a naive UTC ISO string.

    The expiry is computed from the epoch clock and converted straight to UTC.
    Calling ``.timestamp()`` on a naive UTC datetime would interpret it as
    local time and skew the result by the host's UTC offset.
    """
    expiry = datetime.fromtimestamp(time.time() + SESSION_TTL_SECONDS, tz=timezone.utc)
    # Stored session timestamps are naive UTC strings, so drop the tzinfo.
    return expiry.replace(tzinfo=None).isoformat()
|
|
|
|
|
|
def current_user(request: Request, response: Response | None = None):
    """Resolve the session cookie to a user and refresh the session.

    Args:
        request: incoming request; the token is read from the session cookie.
        response: when given, the session token may be rotated and the new
            cookie is set on this response. Without it the session is only
            refreshed in place.

    Returns:
        ``{"username": ..., "role": ...}`` for a valid session, otherwise
        ``None`` (no cookie, unknown token, or expired/unreadable expiry —
        in the last two cases the session row is deleted).
    """
    token = request.cookies.get(SESSION_COOKIE)
    if not token:
        return None
    with db() as c:
        with c.cursor() as cur:
            cur.execute(
                "select s.created_at,s.last_seen_at,s.expires_at,u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s",
                (token,),
            )
            row = cur.fetchone()
            if not row:
                return None

            # Session timestamps are stored as naive UTC ISO strings.
            now = datetime.now(timezone.utc).replace(tzinfo=None)

            expires_at = row.get("expires_at")
            if expires_at:
                try:
                    expired = now >= datetime.fromisoformat(expires_at)
                except (TypeError, ValueError):
                    # Unparseable or offset-aware value: treat the session as
                    # invalid rather than failing the request.
                    expired = True
                if expired:
                    cur.execute("delete from sessions where token=%s", (token,))
                    return None

            now_iso = now.isoformat()
            new_expires_at = expires_at_iso()

            # Rotate once the session has been idle for SESSION_ROTATE_SECONDS;
            # an unreadable timestamp forces a rotation.
            last_seen_at = row.get("last_seen_at") or row.get("created_at")
            should_rotate = False
            if last_seen_at:
                try:
                    idle_seconds = (now - datetime.fromisoformat(last_seen_at)).total_seconds()
                    should_rotate = idle_seconds >= SESSION_ROTATE_SECONDS
                except (TypeError, ValueError):
                    should_rotate = True

            if should_rotate and response is not None:
                new_token = secrets.token_urlsafe(32)
                cur.execute(
                    "update sessions set token=%s,last_seen_at=%s,expires_at=%s where token=%s",
                    (new_token, now_iso, new_expires_at, token),
                )
                response.set_cookie(
                    SESSION_COOKIE,
                    new_token,
                    httponly=True,
                    samesite="lax",
                    secure=SESSION_COOKIE_SECURE,
                    max_age=SESSION_TTL_SECONDS,
                    path="/",
                )
            else:
                # Sliding expiry: every authenticated request extends the session.
                cur.execute(
                    "update sessions set last_seen_at=%s,expires_at=%s where token=%s",
                    (now_iso, new_expires_at, token),
                )
            return {"username": row["username"], "role": row["role"]}
|
|
|
|
|
|
def client_ip(request: Request) -> str:
    """Return the caller's address for logging and login throttling.

    The first entry of ``X-Forwarded-For`` wins when it is non-empty;
    otherwise the socket peer address is used, or ``"unknown"`` when the
    request has no client information.

    NOTE(review): the header is client-controlled unless a trusted proxy
    overwrites it, so a direct caller can vary it to dodge per-IP throttling.
    Confirm the deployment always sits behind such a proxy.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        # A header such as " , 10.0.0.1" has an empty first entry; fall back
        # to the peer address instead of returning an empty string.
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"
|
|
|
|
|
|
def login_key(request: Request, username: str) -> str:
    """Build the throttling key ``"<client ip>::<username>"``.

    The username is trimmed and lower-cased so case and padding variants of
    one name share a counter.
    """
    normalized_name = username.strip().lower()
    return "::".join((client_ip(request), normalized_name))
|
|
|
|
|
|
def prune_login_tracking(now: float) -> None:
    """Drop expired lockouts and failed-attempt timestamps older than the window.

    Both module-level dicts are mutated in place; keys left without any
    recent attempt are removed entirely.
    """
    # Collect first, delete afterwards, so the dict is never resized mid-iteration.
    expired_keys = [key for key, until in login_lockouts.items() if until <= now]
    for key in expired_keys:
        del login_lockouts[key]

    window_start = now - LOGIN_WINDOW_SECONDS
    for key in list(login_attempts):
        recent = [stamp for stamp in login_attempts[key] if stamp >= window_start]
        if not recent:
            del login_attempts[key]
            continue
        login_attempts[key] = recent
|
|
|
|
|
|
def require_admin(request: Request):
    """Return the authenticated admin user or abort the request.

    Raises:
        HTTPException: 401 when there is no valid session; 403 when the
            session's user does not have the ``admin`` role.
    """
    user = current_user(request)
    if not user:
        raise HTTPException(401, "Unauthorized")
    # Accounts created by setup are always "admin", but a restore can insert
    # users with any role; those must not pass an admin gate.
    if user.get("role") != "admin":
        raise HTTPException(403, "Forbidden")
    return user
|
|
|
|
|
|
def require_csrf(request: Request):
    """Reject cross-site requests by checking the Origin and Referer headers.

    No-op unless ``REQUIRE_CSRF`` is enabled. Otherwise at least one of the
    two headers must be present, and each one present must point at this
    server's own ``scheme://host``.

    Raises:
        HTTPException: 403 on a mismatching Origin, a mismatching Referer,
            or when both headers are absent.
    """
    if not REQUIRE_CSRF:
        return
    origin = request.headers.get("origin")
    referer = request.headers.get("referer")
    target = f"{request.url.scheme}://{request.url.netloc}"
    if origin and origin != target:
        raise HTTPException(403, "Invalid CSRF origin")
    if referer:
        # Compare the parsed origin, not a string prefix: a prefix test would
        # accept "https://host.evil.tld/..." for target "https://host".
        try:
            parsed = urlparse(referer)
            referer_origin = f"{parsed.scheme}://{parsed.netloc}"
        except ValueError:
            referer_origin = None
        if referer_origin != target:
            raise HTTPException(403, "Invalid CSRF referer")
    if not origin and not referer:
        raise HTTPException(403, "Invalid CSRF token")
|
|
|
|
|
|
def log_event(request: Request | None, event: str, **fields):
    """Emit one compact JSON log line describing *event*.

    The record holds the event name, a UTC timestamp and any extra *fields*.
    When *request* is given, its request id, method, path and client address
    are added as well.
    """
    record = {"event": event, "ts": utc_now_iso_z()}
    record.update(fields)
    if request is not None:
        record.update(
            {
                "request_id": getattr(request.state, "request_id", None),
                "method": request.method,
                "path": request.url.path,
                "client_ip": client_ip(request),
            }
        )
    line = json.dumps(record, separators=(",", ":"))
    logger.info(line)
|
|
|
|
|
|
def parse_backup_payload(data: dict) -> tuple[list[dict], list[dict]]:
    """Validate a backup document and return ``(users, links)`` ready to insert.

    Args:
        data: decoded backup JSON; it must contain ``users`` and ``links`` lists.

    Returns:
        A list of user dicts (``username``, ``role``, ``password_hash`` as
        bytes) and a list of link dicts (``name``, ``url``, ``description``,
        ``category``, ``icon_url``, ``sort_order``, ``enabled``).

    Raises:
        HTTPException: 422 for any structural or field-level problem.
    """
    if not isinstance(data, dict):
        raise HTTPException(422, "backup payload must be an object")
    users = data.get("users")
    links = data.get("links")
    if not isinstance(users, list) or not isinstance(links, list):
        raise HTTPException(422, "backup payload must include users[] and links[]")
    # A restore deletes every existing account first; with no users in the
    # backup the instance would be left open to unauthenticated setup.
    if not users:
        raise HTTPException(422, "backup must include at least one user")

    parsed_users: list[dict] = []
    seen_usernames: set[str] = set()
    for idx, user in enumerate(users):
        if not isinstance(user, dict):
            raise HTTPException(422, f"users[{idx}] must be an object")
        username = str(user.get("username", "")).strip()
        role = str(user.get("role", "")).strip() or "admin"
        password_hash_b64 = str(user.get("password_hash_b64", "")).strip()
        if not username or not password_hash_b64:
            raise HTTPException(422, f"users[{idx}] must include username and password_hash_b64")
        # users.username is unique; report a clash here instead of failing
        # halfway through the restore transaction.
        if username.lower() in seen_usernames:
            raise HTTPException(422, "backup has duplicate usernames")
        seen_usernames.add(username.lower())
        try:
            password_hash = base64.b64decode(password_hash_b64.encode("ascii"), validate=True)
        except Exception as exc:
            raise HTTPException(422, f"users[{idx}] has invalid password_hash_b64") from exc
        parsed_users.append({"username": username, "role": role, "password_hash": password_hash})

    parsed_links: list[dict] = []
    for idx, link in enumerate(links):
        if not isinstance(link, dict):
            raise HTTPException(422, f"links[{idx}] must be an object")
        name = str(link.get("name", "")).strip()
        url = str(link.get("url", "")).strip()
        description = str(link.get("description", "") or "")
        category = str(link.get("category", "") or "General")
        icon_url = link.get("icon_url")
        if icon_url is not None and not isinstance(icon_url, str):
            raise HTTPException(422, f"links[{idx}] icon_url must be a string")
        try:
            sort_order = int(link.get("sort_order", 0) or 0)
        except (TypeError, ValueError) as exc:
            raise HTTPException(422, f"links[{idx}] sort_order must be an integer") from exc
        enabled = bool(link.get("enabled", True))
        validate_link_payload(name, url, description, category, icon_url)
        parsed_links.append(
            {
                "name": name,
                "url": url,
                "description": description,
                "category": category,
                "icon_url": icon_url,
                "sort_order": sort_order,
                "enabled": enabled,
            }
        )

    # Link names are compared case-insensitively elsewhere (link_name_exists).
    lower_names = [entry["name"].lower() for entry in parsed_links]
    if len(lower_names) != len(set(lower_names)):
        raise HTTPException(422, "backup has duplicate link names")
    return parsed_users, parsed_links
|
|
|
|
|
|
def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
    """Report whether a link with *name* (compared case-insensitively) exists.

    Args:
        conn: open database connection.
        name: link name to look up.
        exclude_id: id of a link to ignore, so a link being edited does not
            collide with itself.
    """
    if exclude_id is None:
        query = "select 1 from links where lower(name)=lower(%s) limit 1"
        params = (name,)
    else:
        query = "select 1 from links where lower(name)=lower(%s) and id<>%s limit 1"
        params = (name, exclude_id)
    with conn.cursor() as cur:
        cur.execute(query, params)
        match = cur.fetchone()
    return match is not None
|
|
|
|
|
|
def validate_http_url(value: str, field_name: str = "url") -> None:
    """Ensure *value* is an absolute http(s) URL with a host.

    Args:
        value: candidate URL; surrounding whitespace is ignored.
        field_name: name used in the error message.

    Raises:
        HTTPException: 422 when the value cannot be parsed, uses another
            scheme, or has no network location.
    """
    try:
        parsed = urlparse((value or "").strip())
    except ValueError as exc:
        # urlparse raises for malformed input such as an unbalanced IPv6
        # bracket; report it as a validation error, not a server error.
        raise HTTPException(422, f"{field_name} must be a valid http(s) URL") from exc
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        raise HTTPException(422, f"{field_name} must be a valid http(s) URL")
|
|
|
|
|
|
def validate_credentials(username: str, password: str) -> None:
    """Check a username/password pair against the configured policy.

    Raises:
        HTTPException: 422 when the username is blank or longer than
            ``USERNAME_MAX_LEN``, or the password is shorter than
            ``PASSWORD_MIN_LEN``.
    """
    is_blank = not (username and username.strip())
    if is_blank:
        raise HTTPException(422, "username is required")

    if len(username) > USERNAME_MAX_LEN:
        raise HTTPException(422, f"username exceeds max length of {USERNAME_MAX_LEN}")

    supplied_password = password or ""
    if len(supplied_password) < PASSWORD_MIN_LEN:
        raise HTTPException(422, f"password must be at least {PASSWORD_MIN_LEN} characters")
|
|
|
|
|
|
def validate_length(value: str | None, limit: int, field_name: str) -> None:
    """Raise a 422 when *value* is longer than *limit*; ``None`` always passes."""
    if value is None:
        return
    if len(value) <= limit:
        return
    raise HTTPException(422, f"{field_name} exceeds max length of {limit}")
|
|
|
|
|
|
def validate_link_payload(name: str, url: str, description: str, category: str, icon_url: str | None) -> None:
    """Validate the text fields of a link.

    Checks run in a fixed order — name, description and category lengths,
    then the optional icon URL (length, then format), and finally the link
    URL — and the first failure raises a 422 ``HTTPException``.
    """
    length_checks = (
        (name, MAX_NAME_LEN, "name"),
        (description, MAX_DESCRIPTION_LEN, "description"),
        (category, MAX_CATEGORY_LEN, "category"),
    )
    for text, limit, label in length_checks:
        validate_length(text, limit, label)

    # An empty or missing icon URL is allowed and skipped.
    if icon_url:
        validate_length(icon_url, MAX_ICON_URL_LEN, "icon_url")
        validate_http_url(icon_url, "icon_url")

    validate_http_url(url, "url")
|
|
|
|
|
|
def read_icon_blob(icon: UploadFile | None) -> tuple[bytes | None, str | None]:
    """Read an uploaded icon and return ``(bytes, mime type)``.

    Returns ``(None, None)`` when no file was uploaded.

    Raises:
        HTTPException: 422 when the declared content type is not in
            ``ALLOWED_ICON_MIME`` or the file exceeds ``MAX_ICON_BYTES``.

    NOTE(review): the content type is the one declared by the upload; the
    bytes themselves are not inspected here.
    """
    if not icon:
        return None, None

    mime = icon.content_type
    if mime not in ALLOWED_ICON_MIME:
        raise HTTPException(422, "Unsupported icon file type")

    # Read one byte past the limit: enough to detect an oversized file
    # without loading all of it.
    payload = icon.file.read(MAX_ICON_BYTES + 1)
    if len(payload) > MAX_ICON_BYTES:
        raise HTTPException(422, f"Icon exceeds max size of {MAX_ICON_BYTES} bytes")
    return payload, mime
|
|
|
|
|
|
@app.middleware("http")
async def request_context(request: Request, call_next):
    """Attach a request id to the request state and echo it on the response.

    An incoming ``x-request-id`` header is reused; otherwise a random hex id
    is generated.
    """
    supplied_id = request.headers.get("x-request-id")
    request_id = supplied_id if supplied_id else uuid.uuid4().hex
    request.state.request_id = request_id

    response = await call_next(request)
    response.headers["x-request-id"] = request_id
    return response
|
|
|
|
|
|
@app.get("/api/me")
def me(request: Request, response: Response):
    """Return the caller's session user and whether first-run setup is pending.

    ``needs_setup`` is true while the ``users`` table is empty. Passing the
    response through lets ``current_user`` rotate the session cookie.
    """
    current = current_user(request, response)
    with db() as c, c.cursor() as cur:
        cur.execute("select count(*) as count from users")
        user_count = cur.fetchone()["count"]
    return {"needs_setup": user_count == 0, "current_user": current}
|
|
|
|
|
|
@app.post("/api/setup")
def setup(inp: SetupIn):
    """Create the first (admin) account; allowed only while no user exists.

    Raises:
        HTTPException: 422 for credentials that fail validation; 400 when at
            least one user already exists.
    """
    validate_credentials(inp.username, inp.password)
    with db() as c, c.cursor() as cur:
        cur.execute("select count(*) as count from users")
        existing_users = cur.fetchone()["count"]
        if existing_users > 0:
            raise HTTPException(400, "Setup already complete")
        hashed = bcrypt.hashpw(inp.password.encode(), bcrypt.gensalt())
        cur.execute(
            "insert into users(username,password_hash,role) values (%s,%s,%s)",
            (inp.username, hashed, "admin"),
        )
    log_event(None, "auth.setup_complete", username=inp.username)
    return {"ok": True}
|
|
|
|
|
|
def _password_matches(password: str, stored_hash) -> bool:
    """Return True when *password* verifies against the stored bcrypt hash.

    A malformed stored hash (for example one imported by a restore) or a
    password bcrypt refuses to process counts as a mismatch instead of
    raising out of the request handler.
    """
    try:
        return bcrypt.checkpw(password.encode(), stored_hash)
    except (TypeError, ValueError):
        return False


@app.post("/api/login")
def login(request: Request, inp: LoginIn):
    """Authenticate a user and start a session.

    Failed attempts are counted per ``login_key`` within
    ``LOGIN_WINDOW_SECONDS``; reaching ``LOGIN_MAX_ATTEMPTS`` locks the key
    for ``LOGIN_LOCKOUT_SECONDS``. On success, any session named by the
    incoming cookie is deleted, a new session row is inserted, and the
    session cookie is set on the response.

    Raises:
        HTTPException: 422 for credentials that fail validation, 429 while
            the key is locked out, 401 for a wrong username or password.
    """
    validate_credentials(inp.username, inp.password)
    now_ts = time.time()
    prune_login_tracking(now_ts)
    key = login_key(request, inp.username)

    locked_until = login_lockouts.get(key)
    if locked_until and locked_until > now_ts:
        log_event(request, "auth.login_blocked", username=inp.username, locked_until=locked_until)
        raise HTTPException(429, "Too many login attempts. Try again later.")

    with db() as c:
        with c.cursor() as cur:
            cur.execute("select id,password_hash from users where username=%s", (inp.username,))
            row = cur.fetchone()

        if not row or not _password_matches(inp.password, row["password_hash"]):
            attempts = login_attempts.get(key, [])
            attempts.append(now_ts)
            login_attempts[key] = [t for t in attempts if t >= now_ts - LOGIN_WINDOW_SECONDS]
            if len(login_attempts[key]) >= LOGIN_MAX_ATTEMPTS:
                login_lockouts[key] = now_ts + LOGIN_LOCKOUT_SECONDS
            log_event(request, "auth.login_failed", username=inp.username)
            raise HTTPException(401, "Invalid credentials")

        # Success clears the throttling state for this key.
        login_attempts.pop(key, None)
        login_lockouts.pop(key, None)

        # Drop the session named by the incoming cookie so the old token
        # does not stay valid next to the new one.
        old_token = request.cookies.get(SESSION_COOKIE)
        if old_token:
            with c.cursor() as cur:
                cur.execute("delete from sessions where token=%s", (old_token,))

        token = secrets.token_urlsafe(32)
        with c.cursor() as cur:
            now = utc_now_iso()
            cur.execute(
                "insert into sessions(token,user_id,created_at,expires_at,last_seen_at) values (%s,%s,%s,%s,%s)",
                (token, row["id"], now, expires_at_iso(), now),
            )

    response = JSONResponse({"ok": True})
    response.set_cookie(
        SESSION_COOKIE,
        token,
        httponly=True,
        samesite="lax",
        secure=SESSION_COOKIE_SECURE,
        max_age=SESSION_TTL_SECONDS,
        path="/",
    )
    log_event(request, "auth.login_success", username=inp.username)
    return response
|
|
|
|
|
|
@app.post("/api/logout")
def logout(request: Request):
    """End the caller's session and clear the session cookie.

    The session row is deleted when a cookie is present; the response clears
    the cookie either way.
    """
    require_csrf(request)
    session_token = request.cookies.get(SESSION_COOKIE)
    with db() as c:
        if session_token:
            with c.cursor() as cur:
                cur.execute("delete from sessions where token=%s", (session_token,))

    reply = JSONResponse({"ok": True})
    reply.delete_cookie(SESSION_COOKIE, path="/")
    log_event(request, "auth.logout")
    return reply
|
|
|
|
|
|
@app.get("/api/links")
def links():
    """List all links, enabled ones first, then by sort order and name.

    Each item carries ``id``, ``name``, ``url``, ``description``,
    ``category``, ``enabled`` and ``icon_url``. ``icon_url`` points at the
    icon endpoint when an uploaded icon is stored, otherwise at the external
    icon URL, or is ``None`` when there is neither.
    """
    with db() as c, c.cursor() as cur:
        # Only the blob's length is selected: it is enough to tell whether an
        # icon is stored and avoids transferring every image on each listing.
        cur.execute(
            "select id,name,url,description,category,enabled,icon_url,length(icon_blob) as icon_size "
            "from links order by enabled desc, sort_order asc, name asc"
        )
        rows = cur.fetchall()

    out = []
    for r in rows:
        if r["icon_size"]:
            icon_url = f"/api/links/{r['id']}/icon"
        else:
            icon_url = r["icon_url"] or None
        out.append({k: r[k] for k in ["id", "name", "url", "description", "category", "enabled"]} | {"icon_url": icon_url})
    return out
|
|
|
|
|
|
@app.get("/api/links/{link_id}")
def get_link(link_id: int):
    """Return a single link by id.

    ``icon_url`` points at the icon endpoint when an uploaded icon is
    stored, matching the list endpoint; otherwise the stored external icon
    URL is returned unchanged.

    Raises:
        HTTPException: 404 when no link has this id.
    """
    with db() as c, c.cursor() as cur:
        # length(icon_blob) says whether an icon exists without loading it.
        cur.execute(
            "select id,name,url,description,category,enabled,icon_url,length(icon_blob) as icon_size "
            "from links where id=%s",
            (link_id,),
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Not found")
    # The computed value must be the one returned; the raw column alone would
    # hide an uploaded icon.
    icon_url = f"/api/links/{row['id']}/icon" if row["icon_size"] else row["icon_url"]
    return {k: row[k] for k in ["id", "name", "url", "description", "category", "enabled"]} | {"icon_url": icon_url}
|
|
|
|
|
|
@app.post("/api/links")
def create_link(
    request: Request,
    name: str = Form(...),
    url: str = Form(...),
    description: str = Form(""),
    category: str = Form("General"),
    icon_url: str | None = Form(None),
    enabled: bool = Form(True),
    icon: UploadFile | None = File(None),
):
    """Create a link from multipart form data (admin only).

    New links are inserted with ``sort_order`` 0.

    Raises:
        HTTPException: 401/403 from the auth and CSRF checks, 422 for invalid
            fields or icon, 409 when the name is already taken.
    """
    require_admin(request)
    require_csrf(request)
    validate_link_payload(name, url, description, category, icon_url)
    icon_blob, icon_mime = read_icon_blob(icon)
    now = utc_now_db()
    with db() as c:
        if link_name_exists(c, name):
            raise HTTPException(409, "Link name already exists")
        try:
            with c.cursor() as cur:
                cur.execute(
                    """insert into links(name,url,description,category,sort_order,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at)
                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                    (name, url, description, category, 0, icon_blob, icon_mime, icon_url, int(enabled), now, now),
                )
        except pymysql.IntegrityError as exc:
            # 1062 = duplicate key: a concurrent request took the name between
            # the check above and this insert.
            if exc.args and exc.args[0] == 1062:
                raise HTTPException(409, "Link name already exists") from exc
            raise
    log_event(request, "links.create", name=name)
    return {"ok": True}
|
|
|
|
|
|
@app.get("/api/links/{link_id}/icon")
def link_icon(link_id: int):
    """Serve the uploaded icon of a link with its stored MIME type.

    Raises:
        HTTPException: 404 when the link does not exist or has no stored icon.
    """
    with db() as c, c.cursor() as cur:
        cur.execute("select icon_blob,icon_mime from links where id=%s", (link_id,))
        row = cur.fetchone()
    if not row or not row["icon_blob"]:
        raise HTTPException(404, "Not found")
    # Uploaded icons may be SVG, which can carry script. Sandbox the document
    # and forbid content sniffing so opening the icon URL directly cannot run
    # code on this origin; rendering through <img> is unaffected.
    hardening = {
        "Content-Security-Policy": "default-src 'none'; style-src 'unsafe-inline'; sandbox",
        "X-Content-Type-Options": "nosniff",
    }
    return Response(content=row["icon_blob"], media_type=row["icon_mime"] or "image/png", headers=hardening)
|
|
|
|
|
|
@app.delete("/api/links/{link_id}")
def delete_link(request: Request, link_id: int):
    """Delete a link by id (admin only).

    The call reports success even when no row matched the id.
    """
    require_admin(request)
    require_csrf(request)
    with db() as c, c.cursor() as cur:
        cur.execute("delete from links where id=%s", (link_id,))
    log_event(request, "links.delete", link_id=link_id)
    return {"ok": True}
|
|
|
|
|
|
# The ":int" convertor limits this route to numeric ids. With a plain
# "{link_id}" segment this route, being registered first, would also capture
# PATCH /api/links/order and fail it with a validation error.
@app.patch("/api/links/{link_id:int}")
def update_link(
    request: Request,
    link_id: int,
    name: str = Form(...),
    url: str = Form(...),
    description: str = Form(""),
    category: str = Form("General"),
    icon_url: str | None = Form(None),
    enabled: bool = Form(True),
    icon: UploadFile | None = File(None),
):
    """Replace the editable fields of a link from multipart form data (admin only).

    The stored icon is only replaced when a non-empty file is uploaded;
    otherwise the existing blob and MIME type are left untouched.

    Raises:
        HTTPException: 401/403 from the auth and CSRF checks, 422 for invalid
            fields or icon, 404 when the link does not exist, 409 when the
            name is already used by another link.
    """
    require_admin(request)
    require_csrf(request)
    validate_link_payload(name, url, description, category, icon_url)
    icon_blob, icon_mime = read_icon_blob(icon)
    now = utc_now_db()
    with db() as c:
        # Existence is checked with a SELECT: the UPDATE's row count only
        # counts changed rows, so it cannot tell "missing" from "unchanged".
        with c.cursor() as cur:
            cur.execute("select 1 from links where id=%s", (link_id,))
            if cur.fetchone() is None:
                raise HTTPException(404, "Not found")
        if link_name_exists(c, name, exclude_id=link_id):
            raise HTTPException(409, "Link name already exists")
        try:
            with c.cursor() as cur:
                if icon_blob:
                    cur.execute(
                        """update links set name=%s,url=%s,description=%s,category=%s,icon_blob=%s,icon_mime=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
                        (name, url, description, category, icon_blob, icon_mime, icon_url, int(enabled), now, link_id),
                    )
                else:
                    cur.execute(
                        """update links set name=%s,url=%s,description=%s,category=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
                        (name, url, description, category, icon_url, int(enabled), now, link_id),
                    )
        except pymysql.IntegrityError as exc:
            # 1062 = duplicate key: another request took the name after the
            # check above.
            if exc.args and exc.args[0] == 1062:
                raise HTTPException(409, "Link name already exists") from exc
            raise
    log_event(request, "links.update", link_id=link_id, name=name)
    return {"ok": True}
|
|
|
|
|
|
# NOTE(review): routes are matched in registration order; make sure no
# earlier PATCH route with a free-form path parameter can capture the literal
# "order" segment before this handler is reached.
@app.patch("/api/links/order")
def reorder_links(request: Request, inp: ReorderIn):
    """Apply new ``sort_order`` values to several links in one transaction (admin only).

    Raises:
        HTTPException: 401/403 from the auth and CSRF checks, 422 for an
            empty list or a repeated link id, 404 when any id does not exist
            (nothing is changed in that case).
    """
    require_admin(request)
    require_csrf(request)
    if not inp.items:
        raise HTTPException(422, "items must not be empty")

    ids = [item.id for item in inp.items]
    if len(set(ids)) != len(ids):
        raise HTTPException(422, "duplicate link id in reorder payload")

    with db() as c:
        c.begin()
        try:
            with c.cursor() as cur:
                # Check existence with a SELECT. An UPDATE's row count only
                # counts rows whose values changed, so re-sending an unchanged
                # order would look like "not found".
                placeholders = ",".join(["%s"] * len(ids))
                cur.execute(f"select id from links where id in ({placeholders}) for update", tuple(ids))
                existing = {row["id"] for row in cur.fetchall()}
                for link_id in ids:
                    if link_id not in existing:
                        raise HTTPException(404, f"link {link_id} not found")

                now = utc_now_db()
                for item in inp.items:
                    cur.execute(
                        "update links set sort_order=%s,updated_at=%s where id=%s",
                        (item.sort_order, now, item.id),
                    )
            c.commit()
        except Exception:
            c.rollback()
            raise
    log_event(request, "links.reorder", count=len(inp.items))
    return {"ok": True}
|
|
|
|
|
|
@app.get("/api/admin/backup")
def backup(request: Request):
    """Export users and links as a JSON-serialisable backup document (admin only).

    Password hashes are base64-encoded and link timestamps are rendered as
    UTC ISO strings. Only the columns selected below are exported, so
    uploaded icon blobs are not part of the backup.
    """
    require_admin(request)
    with db() as c, c.cursor() as cur:
        cur.execute("select username, role, password_hash from users order by id asc")
        user_rows = cur.fetchall()
        cur.execute(
            "select name,url,description,category,sort_order,icon_url,enabled,created_at,updated_at from links order by id asc"
        )
        link_rows = cur.fetchall()

    def as_utc_iso(value):
        """Render a naive datetime as an ISO string tagged UTC; pass falsy values as None."""
        return value.replace(tzinfo=timezone.utc).isoformat() if value else None

    users_out = []
    for record in user_rows:
        encoded_hash = base64.b64encode(record["password_hash"]).decode("ascii")
        users_out.append(
            {
                "username": record["username"],
                "role": record["role"],
                "password_hash_b64": encoded_hash,
            }
        )

    links_out = [
        {
            "name": record["name"],
            "url": record["url"],
            "description": record["description"],
            "category": record["category"],
            "sort_order": record["sort_order"],
            "icon_url": record["icon_url"],
            "enabled": bool(record["enabled"]),
            "created_at": as_utc_iso(record["created_at"]),
            "updated_at": as_utc_iso(record["updated_at"]),
        }
        for record in link_rows
    ]

    document = {"version": 1, "exported_at": utc_now_iso_z(), "users": users_out, "links": links_out}
    log_event(request, "backup.export", users=len(users_out), links=len(links_out))
    return document
|
|
|
|
|
|
@app.post("/api/admin/restore")
def restore(request: Request, inp: RestoreIn, dry_run: bool = True):
    """Validate a backup and, when asked, replace all users and links with it (admin only).

    With ``dry_run`` (the default) the payload is only validated and counted.
    A real restore also needs ``confirm=true`` in the body. It deletes all
    sessions, users and links inside one transaction and inserts the backup's
    rows; links are inserted without icon blobs and stamped with the current
    time.

    Raises:
        HTTPException: 401/403 from the auth and CSRF checks, 422 for an
            invalid payload, 400 when a real restore lacks ``confirm``.
    """
    require_admin(request)
    require_csrf(request)
    parsed_users, parsed_links = parse_backup_payload(inp.data)
    user_count, link_count = len(parsed_users), len(parsed_links)

    if dry_run:
        log_event(request, "backup.restore_dry_run", users=user_count, links=link_count)
        return {"ok": True, "dry_run": True, "users": user_count, "links": link_count}
    if not inp.confirm:
        raise HTTPException(400, "confirm=true is required to apply restore")

    stamp = utc_now_db()
    insert_user_sql = "insert into users(username,password_hash,role) values (%s,%s,%s)"
    insert_link_sql = """
        insert into links(name,url,description,category,sort_order,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    with db() as c:
        c.begin()
        try:
            with c.cursor() as cur:
                cur.execute("set foreign_key_checks=0")
                for table_wipe in ("delete from sessions", "delete from users", "delete from links"):
                    cur.execute(table_wipe)
                for entry in parsed_users:
                    cur.execute(insert_user_sql, (entry["username"], entry["password_hash"], entry["role"]))
                for entry in parsed_links:
                    link_params = (
                        entry["name"],
                        entry["url"],
                        entry["description"],
                        entry["category"],
                        entry["sort_order"],
                        None,  # icon_blob: not carried by the parsed backup
                        None,  # icon_mime
                        entry["icon_url"],
                        int(entry["enabled"]),
                        stamp,
                        stamp,
                    )
                    cur.execute(insert_link_sql, link_params)
                cur.execute("set foreign_key_checks=1")
            c.commit()
        except Exception:
            c.rollback()
            raise
    log_event(request, "backup.restore_apply", users=user_count, links=link_count)
    return {"ok": True, "dry_run": False, "users": user_count, "links": link_count}
|
|
|
|
|
|
# Static file mounts. StaticFiles raises at construction when its directory
# is missing, so each mount is guarded by the directory it actually serves
# (a dist folder without "assets/" must not abort the import).
_assets_dir = STATIC_DIR / "assets"
if _assets_dir.is_dir():
    app.mount("/assets", StaticFiles(directory=_assets_dir), name="assets")

# "/static" serves the public folder when present, else the built frontend.
if PUBLIC_DIR.is_dir():
    app.mount("/static", StaticFiles(directory=PUBLIC_DIR), name="public")
elif STATIC_DIR.is_dir():
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="public-dist")
|
|
|
|
|
|
@app.get("/jellomator.png")
def root_icon():
    """Serve the application icon from the public folder, else from the built frontend.

    Raises:
        HTTPException: 404 when neither location contains the file.
    """
    for base_dir in (PUBLIC_DIR, STATIC_DIR):
        candidate = base_dir / "jellomator.png"
        if candidate.exists():
            return FileResponse(candidate)
    raise HTTPException(404, "Not found")
|
|
|
|
|
|
@app.get("/{path:path}")
def spa(path: str):
    """Catch-all route that serves the single-page app shell.

    Paths under ``api/`` and paths containing a dot are answered with 404, so
    unknown API calls and missing files do not receive HTML. Everything else
    gets the built ``index.html``, or a minimal placeholder page when the
    frontend has not been built.
    """
    looks_like_api_or_file = path.startswith("api/") or "." in path
    if looks_like_api_or_file:
        raise HTTPException(404)

    index_file = STATIC_DIR / "index.html"
    if not index_file.exists():
        return HTMLResponse("<h1>Jellomator</h1>")
    return FileResponse(index_file)
|