Add session expiry tracking and enforcement

This commit is contained in:
Space-Banane
2026-05-20 21:53:36 +02:00
parent 94392c2c99
commit 637cfe967f

View File

@@ -18,6 +18,7 @@ from pydantic import BaseModel
STATIC_DIR = Path("frontend/dist")
PUBLIC_DIR = Path("public")
SESSION_COOKIE = "jellomator_session"
SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", "86400"))
DB_HOST = os.getenv("DB_HOST", "mariadb")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.getenv("DB_USER", "jellomator")
@@ -80,10 +81,18 @@ def init_db():
token varchar(255) primary key,
user_id bigint not null,
created_at varchar(64) not null,
expires_at varchar(64) null,
last_seen_at varchar(64) null,
index (user_id),
constraint sessions_user_fk foreign key (user_id) references users(id) on delete cascade
) engine=InnoDB default charset=utf8mb4
""")
cur.execute("show columns from sessions like 'expires_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column expires_at varchar(64) null after created_at")
cur.execute("show columns from sessions like 'last_seen_at'")
if cur.fetchone() is None:
cur.execute("alter table sessions add column last_seen_at varchar(64) null after expires_at")
cur.execute("""
create table if not exists links(
id bigint auto_increment primary key,
@@ -121,15 +130,43 @@ class LoginIn(BaseModel):
password: str
def utc_now_iso() -> str:
return datetime.utcnow().isoformat()
def expires_at_iso() -> str:
now = datetime.utcnow().timestamp()
return datetime.utcfromtimestamp(now + SESSION_TTL_SECONDS).isoformat()
def current_user(request: Request):
token = request.cookies.get(SESSION_COOKIE)
if not token:
return None
with db() as c:
with c.cursor() as cur:
cur.execute("select u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s", (token,))
cur.execute(
"select s.expires_at,u.username,u.role from sessions s join users u on u.id=s.user_id where s.token=%s",
(token,),
)
row = cur.fetchone()
return row if row else None
if not row:
return None
expires_at = row.get("expires_at")
now = datetime.utcnow()
if expires_at:
try:
if now >= datetime.fromisoformat(expires_at):
cur.execute("delete from sessions where token=%s", (token,))
return None
except ValueError:
cur.execute("delete from sessions where token=%s", (token,))
return None
cur.execute(
"update sessions set last_seen_at=%s where token=%s",
(utc_now_iso(), token),
)
return {"username": row["username"], "role": row["role"]}
def require_admin(request: Request):
@@ -179,7 +216,11 @@ def login(inp: LoginIn):
raise HTTPException(401, "Invalid credentials")
token = secrets.token_urlsafe(32)
with c.cursor() as cur:
cur.execute("insert into sessions(token,user_id,created_at) values (%s,%s,%s)", (token, row["id"], datetime.utcnow().isoformat()))
now = utc_now_iso()
cur.execute(
"insert into sessions(token,user_id,created_at,expires_at,last_seen_at) values (%s,%s,%s,%s,%s)",
(token, row["id"], now, expires_at_iso(), now),
)
response = JSONResponse({"ok": True})
response.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="lax", secure=False, path="/")
return response