From 82436f90d150be5d6a60f58e98cd24b2ede77f6f Mon Sep 17 00:00:00 2001 From: Space-Banane Date: Sat, 23 May 2026 00:04:43 +0200 Subject: [PATCH] [bug]. Add stuck-job retry recovery --- TODO.md | 2 +- src/gitea_codex_bot/services/jobs.py | 81 ++++++++++++++++- tests/test_transitions.py | 125 ++++++++++++++++++++++++++- 3 files changed, 202 insertions(+), 6 deletions(-) diff --git a/TODO.md b/TODO.md index 71b0d97..ea267c4 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,7 @@ - [ ] `BUG`: Remove host-side fallback path for review execution, or gate it behind explicit `ALLOW_HOST_FALLBACK=false` by default so isolation cannot be bypassed silently. - [x] `BUG`: Enforce `.codex-review.yml` `enabled=false` at runtime (currently loaded but not enforced). - [x] `BUG`: Remove `.codex-review.yml` fix policy (`commands.allow_fix`) and rely on global `ENABLE_FIX_COMMANDS`. -- [ ] `BUG`: Add stuck-job recovery for `running` jobs (lease timeout + requeue/fail) so one crashed worker does not deadlock the queue. +- [x] `BUG`: Add stuck-job recovery for `running` jobs (lease timeout + requeue/fail) so one crashed worker does not deadlock the queue. - [ ] `BUG`: Validate required secrets/settings are non-empty at startup (`GITEA_WEBHOOK_SECRET`, `GITEA_TOKEN`, `ALLOWED_REPOS`) and fail fast if blank. - [ ] `TEST`: Add integration test proving the runner executes the exact PR head SHA in isolated mode and does not rely on host checkout. diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index 488cda6..0adbece 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session @@ -96,6 +96,7 @@ def enqueue_job( def claim_next_job(session: Session) -> ReviewJob | None: + recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2) job = session.execute( select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) ).scalar_one_or_none() @@ -120,6 +121,66 @@ def claim_next_job(session: Session) -> ReviewJob | None: return job +def recover_stuck_running_jobs(session: Session, *, lease_timeout_seconds: int, max_retries: int) -> int: + now = datetime.now(timezone.utc) + lease_cutoff = now - timedelta(seconds=lease_timeout_seconds) + stale_running_jobs = session.execute( + select(ReviewJob) + .where( + ReviewJob.status == JobStatus.running, + ReviewJob.started_at.is_not(None), + ReviewJob.started_at <= lease_cutoff, + ) + .with_for_update(skip_locked=True) + ).scalars().all() + if not stale_running_jobs: + return 0 + + recovered = 0 + for job in stale_running_jobs: + attempt_count = _count_job_attempts(session, job.id) + timeout_error = ( + f"Job lease timed out after {lease_timeout_seconds}s on attempt {attempt_count}. " + "Recovered by queue watchdog." + ) + latest_run = ( + session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() + ) + if latest_run and latest_run.status == RunStatus.running: + latest_run.status = RunStatus.failed + latest_run.finished_at = now + latest_run.error_message = timeout_error + + retries_used = max(attempt_count - 1, 0) + if retries_used < max_retries: + job.status = JobStatus.queued + job.started_at = None + job.finished_at = None + job.last_error = timeout_error + logger.warning( + "Recovered timed-out running job id=%s by requeueing attempt=%s retries_used=%s/%s", + job.id, + attempt_count, + retries_used, + max_retries, + ) + else: + job.status = JobStatus.failed + job.finished_at = now + job.last_error = timeout_error + logger.error( + "Recovered timed-out running job id=%s by failing permanently attempt=%s retries_used=%s/%s", + job.id, + attempt_count, + retries_used, + max_retries, + ) + recovered += 1 + + session.commit() + return recovered + + def finish_job( session: Session, *, @@ -142,11 +203,20 @@ def finish_job( job.status = JobStatus.succeeded run_status = RunStatus.succeeded else: - job.status = JobStatus.failed + attempt_count = _count_job_attempts(session, job_id) + retries_used = max(attempt_count - 1, 0) + if retries_used < 2: + job.status = JobStatus.queued + else: + job.status = JobStatus.failed run_status = RunStatus.failed now = datetime.now(timezone.utc) - job.finished_at = now + if job.status == JobStatus.queued: + job.started_at = None + job.finished_at = None + else: + job.finished_at = now job.last_error = error_message if result is not None: job.result_json = result @@ -168,3 +238,8 @@ def finish_job( skipped, bool(error_message), ) + + +def _count_job_attempts(session: Session, job_id: int) -> int: + attempts = session.execute(select(func.count(ReviewRun.id)).where(ReviewRun.job_id == job_id)).scalar_one() + return int(attempts or 0) diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 5e653e9..b2d1328 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -1,10 +1,12 @@ from __future__ import annotations +from datetime import datetime, timedelta, timezone + from sqlalchemy import select from gitea_codex_bot.db import get_session_factory -from gitea_codex_bot.models import JobStatus, ReviewJob -from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job +from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun +from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_running_jobs from gitea_codex_bot.types import ParsedCommand @@ -35,3 +37,122 @@ def test_claim_and_transition() -> None: loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() assert loaded.status == JobStatus.succeeded assert loaded.result_json is not None + + +def test_failed_job_retries_then_fails_terminally() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=271, + head_sha="f00dbabe", + trigger_comment_id=9902, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + + # First attempt fails => requeue. + with session_factory() as session: + claimed = claim_next_job(session) + assert claimed is not None + finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-1") + with session_factory() as session: + loaded = session.get(ReviewJob, job.id) + assert loaded is not None + assert loaded.status == JobStatus.queued + assert loaded.started_at is None + assert loaded.finished_at is None + + # Second attempt fails => requeue. + with session_factory() as session: + claimed = claim_next_job(session) + assert claimed is not None + finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-2") + with session_factory() as session: + loaded = session.get(ReviewJob, job.id) + assert loaded is not None + assert loaded.status == JobStatus.queued + + # Third attempt fails => terminal failed (max 2 retries exhausted). + with session_factory() as session: + claimed = claim_next_job(session) + assert claimed is not None + finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-3") + with session_factory() as session: + loaded = session.get(ReviewJob, job.id) + assert loaded is not None + assert loaded.status == JobStatus.failed + assert loaded.finished_at is not None + + +def test_recover_stuck_running_job_requeues_before_retry_limit() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=272, + head_sha="feedface", + trigger_comment_id=9903, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + + with session_factory() as session: + claimed = claim_next_job(session) + assert claimed is not None + stale_start = datetime.now(timezone.utc) - timedelta(minutes=6) + db_job = session.get(ReviewJob, job.id) + assert db_job is not None + db_job.started_at = stale_start + session.commit() + + with session_factory() as session: + recovered = recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2) + assert recovered == 1 + + with session_factory() as session: + db_job = session.get(ReviewJob, job.id) + assert db_job is not None + assert db_job.status == JobStatus.queued + assert db_job.started_at is None + latest_run = session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one() + assert latest_run.status.value == "failed" + assert latest_run.error_message is not None + assert "timed out" in latest_run.error_message + + +def test_recover_stuck_running_job_fails_after_retry_limit() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=273, + head_sha="deadc0de", + trigger_comment_id=9904, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + + # Build up to third running attempt to hit retry limit when it times out. + for _ in range(3): + with session_factory() as session: + claimed = claim_next_job(session) + assert claimed is not None + db_job = session.get(ReviewJob, job.id) + assert db_job is not None + db_job.started_at = datetime.now(timezone.utc) - timedelta(minutes=6) + session.commit() + with session_factory() as session: + recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2) + + with session_factory() as session: + db_job = session.get(ReviewJob, job.id) + assert db_job is not None + assert db_job.status == JobStatus.failed + assert db_job.finished_at is not None