test: add pytest verification suite and gitea ci workflow
All checks were successful
CI / test (push) Successful in 48s
All checks were successful
CI / test (push) Successful in 48s
This commit is contained in:
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import secrets
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -73,20 +74,22 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
|
||||
if not app_config.screenjob_token:
|
||||
raise RuntimeError("SCREENJOB_TOKEN is required in environment or .env.")
|
||||
|
||||
app = FastAPI(title="ScreenJob API", version="1.0.0")
|
||||
db = HistoryDB(app_config.db_path)
|
||||
ws_hub = _WebSocketHub()
|
||||
manager = JobManager(config=app_config, db=db, broadcast=ws_hub.broadcast_from_thread)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastAPI):
|
||||
ws_hub.set_loop(asyncio.get_running_loop())
|
||||
yield
|
||||
|
||||
app = FastAPI(title="ScreenJob API", version="1.0.0", lifespan=lifespan)
|
||||
|
||||
app.state.config = app_config
|
||||
app.state.db = db
|
||||
app.state.ws_hub = ws_hub
|
||||
app.state.manager = manager
|
||||
|
||||
@app.on_event("startup")
|
||||
async def _on_startup() -> None:
|
||||
ws_hub.set_loop(asyncio.get_running_loop())
|
||||
|
||||
def _extract_token(
|
||||
authorization: str | None,
|
||||
x_screenjob_token: str | None,
|
||||
|
||||
Reference in New Issue
Block a user