From 637cfe967f45ef9ea96e7e18b937554fbb0aab81 Mon Sep 17 00:00:00 2001 From: Space-Banane Date: Wed, 20 May 2026 21:53:36 +0200 Subject: [PATCH] Add session expiry tracking and enforcement --- backend/main.py | 47 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/backend/main.py b/backend/main.py index 775ac51..297711c 100644 --- a/backend/main.py +++ b/backend/main.py @@ -18,6 +18,7 @@ from pydantic import BaseModel STATIC_DIR = Path("frontend/dist") PUBLIC_DIR = Path("public") SESSION_COOKIE = "jellomator_session" +SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", "86400")) DB_HOST = os.getenv("DB_HOST", "mariadb") DB_PORT = int(os.getenv("DB_PORT", "3306")) DB_USER = os.getenv("DB_USER", "jellomator") @@ -80,10 +81,18 @@ def init_db(): token varchar(255) primary key, user_id bigint not null, created_at varchar(64) not null, + expires_at varchar(64) null, + last_seen_at varchar(64) null, index (user_id), constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade ) engine=InnoDB default charset=utf8mb4 """) + cur.execute("show columns from sessions like 'expires_at'") + if cur.fetchone() is None: + cur.execute("alter table sessions add column expires_at varchar(64) null after created_at") + cur.execute("show columns from sessions like 'last_seen_at'") + if cur.fetchone() is None: + cur.execute("alter table sessions add column last_seen_at varchar(64) null after expires_at") cur.execute(""" create table if not exists links( id bigint auto_increment primary key, @@ -121,15 +130,43 @@ class LoginIn(BaseModel): password: str +def utc_now_iso() -> str: + return datetime.utcnow().isoformat() + + +def expires_at_iso() -> str: + now = datetime.utcnow().timestamp() + return datetime.utcfromtimestamp(now + SESSION_TTL_SECONDS).isoformat() + + def current_user(request: Request): token = request.cookies.get(SESSION_COOKIE) if not token: return None with db() as c: with c.cursor() as cur: - cur.execute("select u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s", (token,)) + cur.execute( + "select s.expires_at,u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s", + (token,), + ) row = cur.fetchone() - return row if row else None + if not row: + return None + expires_at = row.get("expires_at") + now = datetime.utcnow() + if expires_at: + try: + if now >= datetime.fromisoformat(expires_at): + cur.execute("delete from sessions where token=%s", (token,)) + return None + except ValueError: + cur.execute("delete from sessions where token=%s", (token,)) + return None + cur.execute( + "update sessions set last_seen_at=%s where token=%s", + (utc_now_iso(), token), + ) + return {"username": row["username"], "role": row["role"]} def require_admin(request: Request): @@ -179,7 +216,11 @@ def login(inp: LoginIn): raise HTTPException(401, "Invalid credentials") token = secrets.token_urlsafe(32) with c.cursor() as cur: - cur.execute("insert into sessions(token,user_id,created_at) values (%s,%s,%s)", (token, row["id"], datetime.utcnow().isoformat())) + now = utc_now_iso() + cur.execute( + "insert into sessions(token,user_id,created_at,expires_at,last_seen_at) values (%s,%s,%s,%s,%s)", + (token, row["id"], now, expires_at_iso(), now), + ) response = JSONResponse({"ok": True}) response.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="lax", secure=False, path="/") return response