from __future__ import annotations import secrets import sqlite3 from datetime import datetime from pathlib import Path from typing import Optional import bcrypt from fastapi import FastAPI, File, Form, HTTPException, Request, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel DB_PATH = Path("/app/data/jellomator.sqlite") STATIC_DIR = Path("frontend/dist") SESSION_COOKIE = "jellomator_session" app = FastAPI(title="Jellomator") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) PRESET_LINKS = [ {"name": "Sonarr", "url": "http://sonarr:8989", "description": "TV library automation", "category": "Arr*", "enabled": True}, {"name": "Radarr", "url": "http://radarr:7878", "description": "Movie library automation", "category": "Arr*", "enabled": True}, {"name": "Lidarr", "url": "http://lidarr:8686", "description": "Music library automation", "category": "Arr*", "enabled": True}, {"name": "Readarr", "url": "http://readarr:8787", "description": "Book library automation", "category": "Arr*", "enabled": True}, {"name": "Prowlarr", "url": "http://prowlarr:9696", "description": "Indexer management", "category": "Arr*", "enabled": True}, {"name": "Bazarr", "url": "http://bazarr:6767", "description": "Subtitle management", "category": "Arr*", "enabled": True}, {"name": "qBittorrent", "url": "http://qbittorrent:8080", "description": "Torrent client", "category": "Downloads", "enabled": True}, {"name": "Jellyfin", "url": "http://jellyfin:8096", "description": "Media server", "category": "Media", "enabled": True}, {"name": "Jellyseerr", "url": "http://jellyseerr:5055", "description": "Request management", "category": "Requests", "enabled": True}, {"name": "Overseerr", "url": "http://overseerr:5055", "description": "Request management", "category": "Requests", "enabled": True}, ] def db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): DB_PATH.parent.mkdir(parents=True, exist_ok=True) with db() as c: c.executescript(""" create table if not exists users(id integer primary key, username text unique, password_hash blob, role text not null); create table if not exists sessions(token text primary key, user_id integer not null, created_at text not null); create table if not exists links(id integer primary key, name text not null, url text not null, description text, category text, icon_blob blob, icon_mime text, icon_url text, enabled integer not null default 1, created_at text not null, updated_at text not null); """) init_db() class SetupIn(BaseModel): username: str password: str class LinkIn(BaseModel): name: str url: str description: str = "" category: str = "General" icon_url: Optional[str] = None enabled: bool = True class LoginIn(BaseModel): username: str password: str def current_user(request: Request): token = request.cookies.get(SESSION_COOKIE) if not token: return None with db() as c: row = c.execute("select u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=?", (token,)).fetchone() return dict(row) if row else None def require_admin(request: Request): user = current_user(request) if not user: raise HTTPException(401, "Unauthorized") return user def seed_preset_links(conn: sqlite3.Connection): now = datetime.utcnow().isoformat() for preset in PRESET_LINKS: conn.execute( """insert into links(name,url,description,category,enabled,created_at,updated_at) values (?,?,?,?,?,?,?)""", (preset["name"], preset["url"], preset["description"], preset["category"], int(preset["enabled"]), now, now), ) @app.get("/api/me") def me(request: Request): with db() as c: needs_setup = c.execute("select count(*) from users").fetchone()[0] == 0 return {"needs_setup": needs_setup, "current_user": current_user(request)} @app.post("/api/setup") def setup(inp: SetupIn): with db() as c: if c.execute("select count(*) from users").fetchone()[0] > 0: raise HTTPException(400, "Setup already complete") pw = bcrypt.hashpw(inp.password.encode(), bcrypt.gensalt()) c.execute("insert into users(username,password_hash,role) values (?,?,?)", (inp.username, pw, "admin")) seed_preset_links(c) return {"ok": True} @app.post("/api/login") def login(inp: LoginIn): with db() as c: row = c.execute("select id,password_hash from users where username=?", (inp.username,)).fetchone() if not row or not bcrypt.checkpw(inp.password.encode(), row["password_hash"]): raise HTTPException(401, "Invalid credentials") token = secrets.token_urlsafe(32) c.execute("insert into sessions(token,user_id,created_at) values (?,?,?)", (token, row["id"], datetime.utcnow().isoformat())) response = JSONResponse({"ok": True}) response.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="lax", secure=False, path="/") return response @app.post("/api/logout") def logout(request: Request): token = request.cookies.get(SESSION_COOKIE) with db() as c: if token: c.execute("delete from sessions where token=?", (token,)) resp = JSONResponse({"ok": True}) resp.delete_cookie(SESSION_COOKIE, path="/") return resp @app.get("/api/links") def links(): with db() as c: rows = c.execute("select * from links order by enabled desc, category, name").fetchall() out = [] for r in rows: icon_url = None if r["icon_blob"]: icon_url = f"/api/links/{r['id']}/icon" elif r["icon_url"]: icon_url = r["icon_url"] out.append({k: r[k] for k in ["id","name","url","description","category","enabled"]} | {"icon_url": icon_url}) return out @app.get("/api/links/{link_id}") def get_link(link_id: int): with db() as c: row = c.execute("select * from links where id=?", (link_id,)).fetchone() if not row: raise HTTPException(404, "Not found") icon_url = f"/api/links/{row['id']}/icon" if row["icon_blob"] else row["icon_url"] return {k: row[k] for k in ["id", "name", "url", "description", "category", "enabled", "icon_url"]} @app.post("/api/links") def create_link( request: Request, name: str = Form(...), url: str = Form(...), description: str = Form(""), category: str = Form("General"), icon_url: str | None = Form(None), enabled: bool = Form(True), icon: UploadFile | None = File(None), ): require_admin(request) icon_blob = icon_mime = None if icon: icon_blob = icon.file.read() icon_mime = icon.content_type now = datetime.utcnow().isoformat() with db() as c: c.execute("""insert into links(name,url,description,category,icon_blob,icon_mime,icon_url,enabled,created_at,updated_at) values (?,?,?,?,?,?,?,?,?,?)""", (name, url, description, category, icon_blob, icon_mime, icon_url, int(enabled), now, now)) return {"ok": True} @app.get("/api/links/{link_id}/icon") def link_icon(link_id: int): with db() as c: row = c.execute("select icon_blob,icon_mime from links where id=?", (link_id,)).fetchone() if not row or not row["icon_blob"]: raise HTTPException(404, "Not found") return Response(content=row["icon_blob"], media_type=row["icon_mime"] or "image/png") @app.delete("/api/links/{link_id}") def delete_link(request: Request, link_id: int): require_admin(request) with db() as c: c.execute("delete from links where id=?", (link_id,)) return {"ok": True} @app.patch("/api/links/{link_id}") def update_link( request: Request, link_id: int, name: str = Form(...), url: str = Form(...), description: str = Form(""), category: str = Form("General"), icon_url: str | None = Form(None), enabled: bool = Form(True), icon: UploadFile | None = File(None), ): require_admin(request) icon_blob = icon_mime = None if icon: icon_blob = icon.file.read() icon_mime = icon.content_type now = datetime.utcnow().isoformat() with db() as c: if icon_blob: c.execute("""update links set name=?,url=?,description=?,category=?,icon_blob=?,icon_mime=?,icon_url=?,enabled=?,updated_at=? where id=?""", (name,url,description,category,icon_blob,icon_mime,icon_url,int(enabled),now,link_id)) else: c.execute("""update links set name=?,url=?,description=?,category=?,icon_url=?,enabled=?,updated_at=? where id=?""", (name,url,description,category,icon_url,int(enabled),now,link_id)) return {"ok": True} if STATIC_DIR.exists(): app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets") @app.get("/{path:path}") def spa(path: str): if path.startswith("api/"): raise HTTPException(404) index = STATIC_DIR / "index.html" if index.exists(): return FileResponse(index) return HTMLResponse("