Compare commits

...

3 Commits

Author SHA1 Message Date
Space-Banane
dde83a2417 Make session cookie security configurable
All checks were successful
docker / build-and-push (push) Successful in 51s
2026-05-20 21:53:51 +02:00
Space-Banane
637cfe967f Add session expiry tracking and enforcement 2026-05-20 21:53:36 +02:00
Space-Banane
94392c2c99 Add health and readiness endpoints 2026-05-20 21:53:12 +02:00
2 changed files with 76 additions and 4 deletions

View File

@@ -32,6 +32,11 @@ docker compose up --build
The app expects a MariaDB instance configured through environment variables. The app expects a MariaDB instance configured through environment variables.
### Session and Cookie Env Vars
- `SESSION_TTL_SECONDS` (default: `86400`)
- `SESSION_COOKIE_SECURE` (default: `false`, set `true` in production HTTPS)
## Gitea CI/CD ## Gitea CI/CD
Add these secrets in Gitea: Add these secrets in Gitea:

View File

@@ -18,6 +18,8 @@ from pydantic import BaseModel
STATIC_DIR = Path("frontend/dist") STATIC_DIR = Path("frontend/dist")
PUBLIC_DIR = Path("public") PUBLIC_DIR = Path("public")
SESSION_COOKIE = "jellomator_session" SESSION_COOKIE = "jellomator_session"
SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", "86400"))
SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "false").lower() in ("1", "true", "yes", "on")
DB_HOST = os.getenv("DB_HOST", "mariadb") DB_HOST = os.getenv("DB_HOST", "mariadb")
DB_PORT = int(os.getenv("DB_PORT", "3306")) DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.getenv("DB_USER", "jellomator") DB_USER = os.getenv("DB_USER", "jellomator")
@@ -28,6 +30,23 @@ app = FastAPI(title="Jellomator")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
@app.get("/healthz")
def healthz():
return {"ok": True}
@app.get("/readyz")
def readyz():
try:
with db() as c:
with c.cursor() as cur:
cur.execute("select 1 as ok")
cur.fetchone()
except Exception:
raise HTTPException(503, "Database not ready")
return {"ok": True}
@contextmanager @contextmanager
def db(): def db():
conn = pymysql.connect( conn = pymysql.connect(
@@ -63,10 +82,18 @@ def init_db():
token varchar(255) primary key, token varchar(255) primary key,
user_id bigint not null, user_id bigint not null,
created_at varchar(64) not null, created_at varchar(64) not null,
expires_at varchar(64) null,
last_seen_at varchar(64) null,
index (user_id), index (user_id),
constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade
) engine=InnoDB default charset=utf8mb4 ) engine=InnoDB default charset=utf8mb4
""") """)
cur.execute("show columns from sessions like 'expires_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column expires_at varchar(64) null after created_at")
cur.execute("show columns from sessions like 'last_seen_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column last_seen_at varchar(64) null after expires_at")
cur.execute(""" cur.execute("""
create table if not exists links( create table if not exists links(
id bigint auto_increment primary key, id bigint auto_increment primary key,
@@ -104,15 +131,43 @@ class LoginIn(BaseModel):
password: str password: str
def utc_now_iso() -> str:
return datetime.utcnow().isoformat()
def expires_at_iso() -> str:
now = datetime.utcnow().timestamp()
return datetime.utcfromtimestamp(now + SESSION_TTL_SECONDS).isoformat()
def current_user(request: Request): def current_user(request: Request):
token = request.cookies.get(SESSION_COOKIE) token = request.cookies.get(SESSION_COOKIE)
if not token: if not token:
return None return None
with db() as c: with db() as c:
with c.cursor() as cur: with c.cursor() as cur:
cur.execute("select u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s", (token,)) cur.execute(
"select s.expires_at,u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s",
(token,),
)
row = cur.fetchone() row = cur.fetchone()
return row if row else None if not row:
return None
expires_at = row.get("expires_at")
now = datetime.utcnow()
if expires_at:
try:
if now >= datetime.fromisoformat(expires_at):
cur.execute("delete from sessions where token=%s", (token,))
return None
except ValueError:
cur.execute("delete from sessions where token=%s", (token,))
return None
cur.execute(
"update sessions set last_seen_at=%s where token=%s",
(utc_now_iso(), token),
)
return {"username": row["username"], "role": row["role"]}
def require_admin(request: Request): def require_admin(request: Request):
@@ -162,9 +217,21 @@ def login(inp: LoginIn):
raise HTTPException(401, "Invalid credentials") raise HTTPException(401, "Invalid credentials")
token = secrets.token_urlsafe(32) token = secrets.token_urlsafe(32)
with c.cursor() as cur: with c.cursor() as cur:
cur.execute("insert into sessions(token,user_id,created_at) values (%s,%s,%s)", (token, row["id"], datetime.utcnow().isoformat())) now = utc_now_iso()
cur.execute(
"insert into sessions(token,user_id,created_at,expires_at,last_seen_at) values (%s,%s,%s,%s,%s)",
(token, row["id"], now, expires_at_iso(), now),
)
response = JSONResponse({"ok": True}) response = JSONResponse({"ok": True})
response.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="lax", secure=False, path="/") response.set_cookie(
SESSION_COOKIE,
token,
httponly=True,
samesite="lax",
secure=SESSION_COOKIE_SECURE,
max_age=SESSION_TTL_SECONDS,
path="/",
)
return response return response