from __future__ import annotations

from sqlalchemy import select

from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import JobStatus, ReviewJob
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
from gitea_codex_bot.types import ParsedCommand


def test_claim_and_transition() -> None:
    """Exercise a review job through enqueue, claim, and successful finish.

    Each step runs in its own session. The test asserts that the claimed
    job is the one that was enqueued and is marked ``running``, and that
    after ``finish_job(success=True)`` the reloaded row is ``succeeded``
    with a non-null ``result_json``.
    """
    session_factory = get_session_factory()

    with session_factory() as session:
        job = enqueue_job(
            session,
            repo="acme/repo",
            pr_number=314,
            head_sha="deadbeef",
            trigger_comment_id=9901,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        # Capture the primary key while the owning session is still open.
        # Reading attributes from the instance after the session closes can
        # raise DetachedInstanceError if they were expired on commit, which
        # depends on how the session factory is configured.
        job_id = job.id

    with session_factory() as session:
        claimed = claim_next_job(session)
        assert claimed is not None
        assert claimed.id == job_id
        assert claimed.status == JobStatus.running

    with session_factory() as session:
        finish_job(
            session,
            job_id=job_id,
            success=True,
            skipped=False,
            result={"summary": "ok"},
            error_message=None,
        )

    with session_factory() as session:
        # scalar_one() raises unless exactly one row matches, so this also
        # checks that the job row exists.
        loaded = session.execute(
            select(ReviewJob).where(ReviewJob.id == job_id)
        ).scalar_one()
        assert loaded.status == JobStatus.succeeded
        assert loaded.result_json is not None