Files
jellomator/backend/main.py
Space-Banane 975e0a4a7e
All checks were successful
docker / build-and-push (push) Successful in 48s
Simplify UI and seed default Arr links
2026-05-20 21:03:40 +02:00

341 lines
12 KiB
Python

from __future__ import annotations
import secrets
import os
from datetime import datetime
from contextlib import contextmanager
from pathlib import Path
from typing import Optional
import bcrypt
import pymysql
from fastapi import FastAPI, File, Form, HTTPException, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Directory containing the built frontend; the path is relative, so it is
# resolved against the process working directory.
STATIC_DIR = Path("frontend/dist")
# Name of the cookie that carries the session token issued at login.
SESSION_COOKIE = "jellomator_session"
# Database connection settings. Each value can be overridden through the
# environment variable of the same name; the literals are the fallbacks.
DB_HOST = os.getenv("DB_HOST", "mariadb")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.getenv("DB_USER", "jellomator")
DB_PASSWORD = os.getenv("DB_PASSWORD", "jellomator")
DB_NAME = os.getenv("DB_NAME", "jellomator")
# Links written to the `links` table by seed_default_links(). Every entry
# supplies the four columns the seed statement fills in (name, url,
# description, category); all URLs point at localhost.
DEFAULT_LINKS = [
    {"name": "Jellyfin", "url": "http://localhost:8096", "description": "Media server", "category": "Media"},
    {"name": "Jellyseerr", "url": "http://localhost:5055", "description": "Request management", "category": "Media"},
    {"name": "Sonarr", "url": "http://localhost:8989", "description": "TV automation", "category": "Arr"},
    {"name": "Radarr", "url": "http://localhost:7878", "description": "Movie automation", "category": "Arr"},
    {"name": "Lidarr", "url": "http://localhost:8686", "description": "Music automation", "category": "Arr"},
    {"name": "Readarr", "url": "http://localhost:8787", "description": "Book automation", "category": "Arr"},
    {"name": "Bazarr", "url": "http://localhost:6767", "description": "Subtitle management", "category": "Arr"},
    {"name": "Prowlarr", "url": "http://localhost:9696", "description": "Indexer management", "category": "Arr"},
    {"name": "qBittorrent", "url": "http://localhost:8080", "description": "Torrent client", "category": "Download"},
    {"name": "SABnzbd", "url": "http://localhost:8085", "description": "Usenet client", "category": "Download"},
]
app = FastAPI(title="Jellomator")

# Origins allowed to call the API cross-origin, as a comma-separated list in
# the CORS_ALLOW_ORIGINS environment variable. When the variable is unset or
# blank the previous behaviour (any origin, "*") is kept.
# NOTE(review): a wildcard origin together with allow_credentials=True lets
# any site that can send the session cookie read API responses; deployments
# should set CORS_ALLOW_ORIGINS to the real frontend origin(s).
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@contextmanager
def db():
    """Yield an open PyMySQL connection bound to the application database.

    A new connection is opened on every use with autocommit enabled and
    dictionary-style rows. Before the connection is handed out, the database
    named by ``DB_NAME`` is created if it does not exist and then selected.
    The connection is always closed when the ``with`` block ends, including
    when the body raises.
    """
    connect_kwargs = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": True,
    }
    connection = pymysql.connect(**connect_kwargs)
    try:
        # The connection is opened without a default schema so that the
        # database can be created on first use.
        with connection.cursor() as cursor:
            cursor.execute(f"create database if not exists `{DB_NAME}` default charset=utf8mb4")
        connection.select_db(DB_NAME)
        yield connection
    finally:
        connection.close()
def seed_default_links(conn):
    """Insert or refresh every entry of ``DEFAULT_LINKS`` in the ``links`` table.

    Args:
        conn: An open database connection whose ``cursor()`` is usable as a
            context manager.

    Each default link is written with ``enabled=1`` and the current UTC time
    as both ``created_at`` and ``updated_at``. When the insert hits a
    duplicate key, the existing row's url, description, category and
    updated_at are overwritten with the default values.

    NOTE(review): because of the ``on duplicate key update`` clause, edits
    made to a link that shares a name with a default are reset whenever this
    runs; confirm that is intended.
    """
    upsert_sql = (
        "insert into links(name,url,description,category,enabled,created_at,updated_at)\n"
        "values (%s,%s,%s,%s,1,%s,%s)\n"
        "on duplicate key update\n"
        "url=values(url),\n"
        "description=values(description),\n"
        "category=values(category),\n"
        "updated_at=values(updated_at)\n"
    )
    timestamp = datetime.utcnow().isoformat()
    columns = ("name", "url", "description", "category")
    with conn.cursor() as cursor:
        for link in DEFAULT_LINKS:
            row_values = tuple(link[column] for column in columns)
            cursor.execute(upsert_sql, row_values + (timestamp, timestamp))
def init_db():
    """Create the application tables if they are missing, then seed the links.

    Runs three ``create table if not exists`` statements in order (users,
    sessions, links) on one connection and then calls
    ``seed_default_links`` with that same connection. The sessions table
    references users, so users must be created first.
    """
    create_users = (
        "\n"
        "create table if not exists users(\n"
        "id bigint auto_increment primary key,\n"
        "username varchar(255) not null unique,\n"
        "password_hash varbinary(255) not null,\n"
        "role varchar(32) not null\n"
        ") engine=InnoDB default charset=utf8mb4\n"
    )
    create_sessions = (
        "\n"
        "create table if not exists sessions(\n"
        "token varchar(255) primary key,\n"
        "user_id bigint not null,\n"
        "created_at varchar(64) not null,\n"
        "index (user_id),\n"
        "constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade\n"
        ") engine=InnoDB default charset=utf8mb4\n"
    )
    create_links = (
        "\n"
        "create table if not exists links(\n"
        "id bigint auto_increment primary key,\n"
        "name varchar(255) not null unique,\n"
        "url text not null,\n"
        "description text,\n"
        "category varchar(255),\n"
        "icon_blob longblob,\n"
        "icon_mime varchar(255),\n"
        "icon_url text,\n"
        "enabled tinyint(1) not null default 1,\n"
        "created_at varchar(64) not null,\n"
        "updated_at varchar(64) not null\n"
        ") engine=InnoDB default charset=utf8mb4\n"
    )
    with db() as connection:
        with connection.cursor() as cursor:
            # Order matters: sessions has a foreign key to users.
            for ddl in (create_users, create_sessions, create_links):
                cursor.execute(ddl)
        seed_default_links(connection)
# Runs at import time: the tables are created and the default links are
# seeded before any route below is registered.
init_db()
class SetupIn(BaseModel):
    """JSON body accepted by ``POST /api/setup`` to create the first user."""

    # Login name stored in users.username.
    username: str
    # Plain-text password; the setup handler hashes it with bcrypt before storing.
    password: str
class LinkIn(BaseModel):
    """Schema describing a link's editable fields.

    NOTE(review): no endpoint in the reviewed portion of this file uses this
    model; the link create/update handlers take form fields instead.
    """

    # Display name of the link.
    name: str
    # Target address of the link.
    url: str
    # Optional free-text description; defaults to an empty string.
    description: str = ""
    # Grouping label; defaults to "General".
    category: str = "General"
    # Optional external icon address.
    icon_url: Optional[str] = None
    # Whether the link is enabled; defaults to True.
    enabled: bool = True
class LoginIn(BaseModel):
    """JSON body accepted by ``POST /api/login``."""

    # Login name looked up in users.username.
    username: str
    # Plain-text password checked against the stored bcrypt hash.
    password: str
def current_user(request: Request):
    """Resolve the session cookie on ``request`` to a user record.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        A dict with ``username`` and ``role`` for the user owning the session
        token, or ``None`` when the cookie is absent or matches no session.
    """
    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token:
        lookup_sql = "select u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s"
        with db() as connection, connection.cursor() as cursor:
            cursor.execute(lookup_sql, (session_token,))
            match = cursor.fetchone()
        return match or None
    return None
def require_admin(request: Request):
    """Return the authenticated admin user for ``request`` or abort.

    Args:
        request: Incoming request carrying the session cookie.

    Returns:
        The user record (``username`` and ``role``) of the caller.

    Raises:
        HTTPException: 401 when there is no valid session, 403 when the
            session belongs to a user whose role is not ``"admin"``.
    """
    user = current_user(request)
    if not user:
        raise HTTPException(401, "Unauthorized")
    # Previously any logged-in user passed this check; the role stored at
    # setup time ("admin") is now actually enforced.
    if user.get("role") != "admin":
        raise HTTPException(403, "Forbidden")
    return user
def link_name_exists(conn, name: str, *, exclude_id: int | None = None) -> bool:
    """Report whether a link with this name is already stored.

    The comparison is done in SQL on the lower-cased name.

    Args:
        conn: Open database connection whose ``cursor()`` is a context manager.
        name: Link name to look for.
        exclude_id: When given, the row with this id is ignored, so a link
            being edited does not collide with itself.

    Returns:
        True if at least one matching row exists, otherwise False.
    """
    if exclude_id is None:
        query = "select 1 from links where lower(name)=lower(%s) limit 1"
        params = (name,)
    else:
        query = "select 1 from links where lower(name)=lower(%s) and id<>%s limit 1"
        params = (name, exclude_id)
    with conn.cursor() as cursor:
        cursor.execute(query, params)
        hit = cursor.fetchone()
    return hit is not None
@app.get("/api/me")
def me(request: Request):
    """Report whether first-time setup is pending and who is logged in.

    Returns:
        A dict with ``needs_setup`` (True while the users table is empty) and
        ``current_user`` (the caller's user record, or ``None``).
    """
    with db() as connection, connection.cursor() as cursor:
        cursor.execute("select count(*) as count from users")
        user_total = cursor.fetchone()["count"]
    return {"needs_setup": user_total == 0, "current_user": current_user(request)}
@app.post("/api/setup")
def setup(inp: SetupIn):
    """Create the first (admin) user; allowed only while no user exists.

    Args:
        inp: Requested username and plain-text password.

    Returns:
        ``{"ok": True}`` once the user has been stored.

    Raises:
        HTTPException: 400 when a user already exists, or when the username
            or password is empty or too long to be stored/hashed.
    """
    max_username_chars = 255  # users.username is varchar(255)
    max_password_bytes = 72  # bcrypt only hashes the first 72 bytes
    with db() as c:
        with c.cursor() as cur:
            cur.execute("select count(*) as count from users")
            if cur.fetchone()["count"] > 0:
                raise HTTPException(400, "Setup already complete")
            # Reject credentials that would create an unusable or trivially
            # guessable admin account instead of storing them silently.
            if not inp.username.strip():
                raise HTTPException(400, "Username must not be empty")
            if len(inp.username) > max_username_chars:
                raise HTTPException(400, "Username is too long")
            password_bytes = inp.password.encode()
            if not password_bytes:
                raise HTTPException(400, "Password must not be empty")
            if len(password_bytes) > max_password_bytes:
                raise HTTPException(400, "Password must be at most 72 bytes")
            pw = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
            cur.execute("insert into users(username,password_hash,role) values (%s,%s,%s)", (inp.username, pw, "admin"))
    return {"ok": True}
@app.post("/api/login")
def login(inp: LoginIn):
    """Verify credentials and start a session.

    Args:
        inp: Username and plain-text password.

    Returns:
        A JSON ``{"ok": True}`` response that sets the session cookie
        (HTTP-only, SameSite=Lax, path ``/``).

    Raises:
        HTTPException: 401 when the user is unknown or the password does not
            match the stored bcrypt hash.
    """
    with db() as c:
        with c.cursor() as cur:
            cur.execute("select id,password_hash from users where username=%s", (inp.username,))
            row = cur.fetchone()
        try:
            password_ok = bool(row) and bcrypt.checkpw(inp.password.encode(), row["password_hash"])
        except ValueError:
            # bcrypt raises ValueError for input it cannot check (for example
            # a malformed stored hash); report that as a failed login rather
            # than an internal server error.
            password_ok = False
        if not password_ok:
            raise HTTPException(401, "Invalid credentials")
        token = secrets.token_urlsafe(32)
        with c.cursor() as cur:
            cur.execute(
                "insert into sessions(token,user_id,created_at) values (%s,%s,%s)",
                (token, row["id"], datetime.utcnow().isoformat()),
            )
    response = JSONResponse({"ok": True})
    response.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="lax", secure=False, path="/")
    return response
@app.post("/api/logout")
def logout(request: Request):
    """End the caller's session and clear the session cookie.

    The session row matching the cookie's token is deleted when a cookie is
    present. The response is ``{"ok": True}`` and always removes the cookie,
    whether or not a session existed.
    """
    session_token = request.cookies.get(SESSION_COOKIE)
    with db() as connection:
        if session_token:
            with connection.cursor() as cursor:
                cursor.execute("delete from sessions where token=%s", (session_token,))
    reply = JSONResponse({"ok": True})
    reply.delete_cookie(SESSION_COOKIE, path="/")
    return reply
@app.get("/api/links")
def links():
    """List every stored link, enabled ones first, then by category and name.

    Returns:
        A list of dicts with ``id``, ``name``, ``url``, ``description``,
        ``category``, ``enabled`` and ``icon_url``. ``icon_url`` points at
        this API's icon endpoint when an icon was uploaded, otherwise it is
        the stored external icon address, or ``None`` when neither is set.
    """
    public_fields = ("id", "name", "url", "description", "category", "enabled")

    def resolve_icon(record):
        # An uploaded icon takes precedence over an external icon address.
        if record["icon_blob"]:
            return f"/api/links/{record['id']}/icon"
        return record["icon_url"] or None

    with db() as connection, connection.cursor() as cursor:
        cursor.execute("select * from links order by enabled desc, category, name")
        records = cursor.fetchall()
    return [
        {**{field: record[field] for field in public_fields}, "icon_url": resolve_icon(record)}
        for record in records
    ]
@app.get("/api/links/{link_id}")
def get_link(link_id: int):
    """Return a single link by id.

    Args:
        link_id: Primary key of the link.

    Returns:
        A dict with ``id``, ``name``, ``url``, ``description``, ``category``,
        ``enabled`` and ``icon_url``, shaped like the entries of
        ``GET /api/links``.

    Raises:
        HTTPException: 404 when no link has this id.
    """
    with db() as c:
        with c.cursor() as cur:
            cur.execute("select * from links where id=%s", (link_id,))
            row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Not found")
    # The resolved icon address was previously computed and then dropped, so
    # links with an uploaded icon returned the raw icon_url column. Resolve
    # it the same way the list endpoint does: uploaded icon first, then the
    # external address, else None.
    if row["icon_blob"]:
        icon_url = f"/api/links/{row['id']}/icon"
    else:
        icon_url = row["icon_url"] or None
    return {k: row[k] for k in ["id", "name", "url", "description", "category", "enabled"]} | {"icon_url": icon_url}
@app.post("/api/links")
def create_link(
    request: Request,
    name: str = Form(...),
    url: str = Form(...),
    description: str = Form(""),
    category: str = Form("General"),
    icon_url: str | None = Form(None),
    enabled: bool = Form(True),
    icon: UploadFile | None = File(None),
):
    """Create a link from multipart form fields (admin only).

    Args:
        request: Incoming request, used for the admin check.
        name: Link name; must not already exist (compared case-insensitively).
        url: Target address.
        description: Optional description.
        category: Grouping label.
        icon_url: Optional external icon address.
        enabled: Whether the link is enabled.
        icon: Optional uploaded icon file, stored in the database.

    Returns:
        ``{"ok": True}`` when the link was stored.

    Raises:
        HTTPException: 401/403 from the admin check, 409 when the name is
            already taken.
    """
    require_admin(request)
    duplicate_entry = 1062  # MySQL/MariaDB error code for a unique-key clash
    icon_blob = icon_mime = None
    if icon:
        # An empty upload (e.g. a blank file input) is treated as "no icon",
        # matching how the update handler ignores empty icon data.
        icon_blob = icon.file.read() or None
        icon_mime = icon.content_type if icon_blob else None
    now = datetime.utcnow().isoformat()
    # One connection serves both the name check and the insert.
    with db() as c:
        if link_name_exists(c, name):
            raise HTTPException(409, "Link name already exists")
        try:
            with c.cursor() as cur:
                cur.execute(
                    "insert into links(name,url,description,category,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at)\n"
                    "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                    (name, url, description, category, icon_blob, icon_mime, icon_url, int(enabled), now, now),
                )
        except pymysql.IntegrityError as exc:
            # A concurrent request can insert the same name between the check
            # and the insert; the unique key then rejects this row.
            if exc.args and exc.args[0] == duplicate_entry:
                raise HTTPException(409, "Link name already exists") from exc
            raise
    return {"ok": True}
@app.get("/api/links/{link_id}/icon")
def link_icon(link_id: int):
    """Serve the uploaded icon of a link.

    Args:
        link_id: Primary key of the link.

    Returns:
        The stored icon bytes, sent with the stored MIME type or
        ``image/png`` when none was recorded.

    Raises:
        HTTPException: 404 when the link does not exist or has no uploaded icon.
    """
    with db() as connection, connection.cursor() as cursor:
        cursor.execute("select icon_blob,icon_mime from links where id=%s", (link_id,))
        record = cursor.fetchone()
    blob = record["icon_blob"] if record else None
    if not blob:
        raise HTTPException(404, "Not found")
    return Response(content=blob, media_type=record["icon_mime"] or "image/png")
@app.delete("/api/links/{link_id}")
def delete_link(request: Request, link_id: int):
    """Delete a link by id (requires an authenticated caller).

    Args:
        request: Incoming request, used for the access check.
        link_id: Primary key of the link to remove.

    Returns:
        ``{"ok": True}``; the same response is given when no row matched.
    """
    require_admin(request)
    with db() as connection, connection.cursor() as cursor:
        cursor.execute("delete from links where id=%s", (link_id,))
    return {"ok": True}
@app.patch("/api/links/{link_id}")
def update_link(
    request: Request,
    link_id: int,
    name: str = Form(...),
    url: str = Form(...),
    description: str = Form(""),
    category: str = Form("General"),
    icon_url: str | None = Form(None),
    enabled: bool = Form(True),
    icon: UploadFile | None = File(None),
):
    """Overwrite a link's fields from multipart form data (admin only).

    All text fields are replaced with the submitted values, including
    ``icon_url``. The stored icon image is replaced only when a non-empty
    file is uploaded; otherwise it is left as is.

    Args:
        request: Incoming request, used for the admin check.
        link_id: Primary key of the link to update.
        name: New name; must not clash with another link's name.
        url: New target address.
        description: New description.
        category: New grouping label.
        icon_url: New external icon address (or None).
        enabled: New enabled flag.
        icon: Optional replacement icon file.

    Returns:
        ``{"ok": True}`` when the link was updated.

    Raises:
        HTTPException: 401/403 from the admin check, 404 when no link has
            this id, 409 when the name is used by another link.
    """
    require_admin(request)
    duplicate_entry = 1062  # MySQL/MariaDB error code for a unique-key clash
    icon_blob = icon_mime = None
    if icon:
        icon_blob = icon.file.read()
        icon_mime = icon.content_type
    now = datetime.utcnow().isoformat()
    with db() as c:
        # Updating a missing id used to report success without changing
        # anything; make that case explicit.
        with c.cursor() as cur:
            cur.execute("select 1 from links where id=%s limit 1", (link_id,))
            if cur.fetchone() is None:
                raise HTTPException(404, "Not found")
        if link_name_exists(c, name, exclude_id=link_id):
            raise HTTPException(409, "Link name already exists")
        try:
            with c.cursor() as cur:
                if icon_blob:
                    cur.execute(
                        """update links set name=%s,url=%s,description=%s,category=%s,icon_blob=%s,icon_mime=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
                        (name, url, description, category, icon_blob, icon_mime, icon_url, int(enabled), now, link_id),
                    )
                else:
                    # No new image: keep the stored icon_blob/icon_mime.
                    cur.execute(
                        """update links set name=%s,url=%s,description=%s,category=%s,icon_url=%s,enabled=%s,updated_at=%s where id=%s""",
                        (name, url, description, category, icon_url, int(enabled), now, link_id),
                    )
        except pymysql.IntegrityError as exc:
            # Another request may take the name between the check and the
            # update; the unique key then rejects this change.
            if exc.args and exc.args[0] == duplicate_entry:
                raise HTTPException(409, "Link name already exists") from exc
            raise
    return {"ok": True}
# Serve the built frontend's asset files under /assets, but only when the
# build output directory is present.
if STATIC_DIR.exists():
    app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets")
@app.get("/{path:path}")
def spa(path: str):
    """Catch-all route that serves the single-page frontend.

    Args:
        path: The requested path without the leading slash.

    Returns:
        The frontend's ``index.html`` when it exists, otherwise a minimal
        placeholder HTML page.

    Raises:
        HTTPException: 404 for paths beginning with ``api/``, so unknown API
            routes are not answered with the frontend page.
    """
    if path.startswith("api/"):
        raise HTTPException(404)
    index_file = STATIC_DIR / "index.html"
    return FileResponse(index_file) if index_file.exists() else HTMLResponse("<h1>Jellomator</h1>")