from __future__ import annotations from datetime import datetime, timedelta, timezone from sqlalchemy import select from gitea_codex_bot.db import get_session_factory from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_running_jobs from gitea_codex_bot.types import ParsedCommand def test_claim_and_transition() -> None: session_factory = get_session_factory() with session_factory() as session: job = enqueue_job( session, repo="acme/repo", pr_number=314, head_sha="deadbeef", trigger_comment_id=9901, trigger_comment_body="@codex review", requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None assert claimed.id == job.id assert claimed.status == JobStatus.running with session_factory() as session: finish_job(session, job_id=job.id, success=True, skipped=False, result={"summary": "ok"}, error_message=None) with session_factory() as session: loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() assert loaded.status == JobStatus.succeeded assert loaded.result_json is not None def test_failed_job_retries_then_fails_terminally() -> None: session_factory = get_session_factory() with session_factory() as session: job = enqueue_job( session, repo="acme/repo", pr_number=271, head_sha="f00dbabe", trigger_comment_id=9902, trigger_comment_body="@codex review", requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) # First attempt fails => requeue. with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-1") with session_factory() as session: loaded = session.get(ReviewJob, job.id) assert loaded is not None assert loaded.status == JobStatus.queued assert loaded.started_at is None assert loaded.finished_at is None # Second attempt fails => requeue. with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-2") with session_factory() as session: loaded = session.get(ReviewJob, job.id) assert loaded is not None assert loaded.status == JobStatus.queued # Third attempt fails => terminal failed (max 2 retries exhausted). with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-3") with session_factory() as session: loaded = session.get(ReviewJob, job.id) assert loaded is not None assert loaded.status == JobStatus.failed assert loaded.finished_at is not None def test_recover_stuck_running_job_requeues_before_retry_limit() -> None: session_factory = get_session_factory() with session_factory() as session: job = enqueue_job( session, repo="acme/repo", pr_number=272, head_sha="feedface", trigger_comment_id=9903, trigger_comment_body="@codex review", requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None stale_start = datetime.now(timezone.utc) - timedelta(minutes=6) db_job = session.get(ReviewJob, job.id) assert db_job is not None db_job.started_at = stale_start session.commit() with session_factory() as session: recovered = recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2) assert recovered == 1 with session_factory() as session: db_job = session.get(ReviewJob, job.id) assert db_job is not None assert db_job.status == JobStatus.queued assert db_job.started_at is None latest_run = session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one() assert latest_run.status.value == "failed" assert latest_run.error_message is not None assert "timed out" in latest_run.error_message def test_recover_stuck_running_job_fails_after_retry_limit() -> None: session_factory = get_session_factory() with session_factory() as session: job = enqueue_job( session, repo="acme/repo", pr_number=273, head_sha="deadc0de", trigger_comment_id=9904, trigger_comment_body="@codex review", requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) # Build up to third running attempt to hit retry limit when it times out. for _ in range(3): with session_factory() as session: claimed = claim_next_job(session) assert claimed is not None db_job = session.get(ReviewJob, job.id) assert db_job is not None db_job.started_at = datetime.now(timezone.utc) - timedelta(minutes=6) session.commit() with session_factory() as session: recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2) with session_factory() as session: db_job = session.get(ReviewJob, job.id) assert db_job is not None assert db_job.status == JobStatus.failed assert db_job.finished_at is not None