diff --git a/.env.example b/.env.example index 3322a5d..1f3acae 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,7 @@ MAX_REVIEW_MINUTES=10 CONCURRENCY=1 JOB_LEASE_TIMEOUT_SECONDS=300 STUCK_JOB_RECOVERY_ACTION=requeue +MAX_STUCK_JOB_RETRIES=1 # Image used for ephemeral job containers (Node + npm + Codex CLI install). REVIEW_RUNNER_IMAGE=node:22-bookworm-slim diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index cecf5b4..9e2a19f 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): concurrency: int = Field(default=1, alias="CONCURRENCY") job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") + max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index c366cac..03ca2ef 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone +from dataclasses import dataclass from sqlalchemy import select from sqlalchemy.exc import IntegrityError @@ -12,6 +13,17 @@ from gitea_codex_bot.services.security import payload_digest from gitea_codex_bot.types import ParsedCommand logger = logging.getLogger(__name__) +LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out" + + +@dataclass +class RecoveryOutcome: + repo: str + pr_number: int + job_id: int + retries_used: int + failed: bool + message: str def persist_webhook_event( @@ -95,8 +107,7 @@ def enqueue_job( return job -def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None: - recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action) +def claim_next_job(session: Session) -> ReviewJob | None: job = session.execute( select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) ).scalar_one_or_none() @@ -121,9 +132,9 @@ def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_re return job -def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int: +def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]: if lease_timeout_seconds <= 0: - return 0 + return [] now = datetime.now(timezone.utc) cutoff = now - timedelta(seconds=lease_timeout_seconds) stale_jobs = session.execute( @@ -136,10 +147,24 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: .order_by(ReviewJob.started_at.asc()) .with_for_update(skip_locked=True) ).scalars() - recovered = 0 + outcomes: list[RecoveryOutcome] = [] for job in stale_jobs: - recovered += 1 - message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}." + prior_retries = session.execute( + select(ReviewRun) + .where( + ReviewRun.job_id == job.id, + ReviewRun.status == RunStatus.failed, + ReviewRun.error_message.is_not(None), + ) + .order_by(ReviewRun.id.asc()) + ).scalars() + lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX)) + retries_used_after_this_timeout = lease_retries_used + 1 + should_fail = action == "fail" or lease_retries_used >= max_retries + message = ( + f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; " + f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}." + ) latest_run = ( session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() ) @@ -148,14 +173,25 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: latest_run.finished_at = now latest_run.error_message = message job.last_error = message - if action == "fail": + if should_fail: job.status = JobStatus.failed job.finished_at = now else: job.status = JobStatus.queued job.started_at = None job.finished_at = None - return recovered + outcomes.append( + RecoveryOutcome( + repo=job.repo, + pr_number=job.pr_number, + job_id=job.id, + retries_used=retries_used_after_this_timeout, + failed=should_fail, + message=message, + ) + ) + session.commit() + return outcomes def finish_job( diff --git a/src/gitea_codex_bot/workers/dispatcher.py b/src/gitea_codex_bot/workers/dispatcher.py index 746b080..8b57404 100644 --- a/src/gitea_codex_bot/workers/dispatcher.py +++ b/src/gitea_codex_bot/workers/dispatcher.py @@ -12,7 +12,7 @@ from gitea_codex_bot.db import get_session_factory from gitea_codex_bot.models import ReviewJob from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id from gitea_codex_bot.services.gitea import GiteaClient -from gitea_codex_bot.services.jobs import claim_next_job, finish_job +from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note from gitea_codex_bot.types import ParsedCommand @@ -86,17 +86,39 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa def process_one_job(settings: Settings) -> bool: session_factory = get_session_factory() + gitea = GiteaClient(settings) + with session_factory() as session: - job = claim_next_job( + recoveries = recover_stuck_jobs( session, lease_timeout_seconds=settings.job_lease_timeout_seconds, - stuck_job_recovery_action=settings.stuck_job_recovery_action, + action=settings.stuck_job_recovery_action, + max_retries=settings.max_stuck_job_retries, ) + for recovery in recoveries: + if not recovery.failed: + continue + try: + gitea.post_issue_comment( + recovery.repo, + recovery.pr_number, + ( + "⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n" + f"- job_id: `{recovery.job_id}`\n" + f"- retries used: `{recovery.retries_used}`\n" + f"- details: {recovery.message}\n\n" + "Please re-run with `@codex review` after investigating runner/worker stability." + ), + ) + except Exception: + logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id) + + with session_factory() as session: + job = claim_next_job(session) if not job: return False command = _command_from_job(job) - gitea = GiteaClient(settings) logger.info( "Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s", job.id, diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 48466ed..8166f14 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -6,7 +6,7 @@ from gitea_codex_bot.db import get_session_factory from datetime import datetime, timedelta, timezone from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus -from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job +from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs from gitea_codex_bot.types import ParsedCommand @@ -25,7 +25,7 @@ def test_claim_and_transition() -> None: ) with session_factory() as session: - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == job.id assert claimed.status == JobStatus.running @@ -62,7 +62,7 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None: requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == first.id @@ -72,7 +72,10 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None: session.commit() with session_factory() as session: - recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(outcomes) == 1 + assert outcomes[0].failed is False + recovered = claim_next_job(session) assert recovered is not None assert recovered.id == first.id assert recovered.status == JobStatus.running @@ -103,7 +106,7 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None: requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == stuck_job.id @@ -113,10 +116,56 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None: session.commit() with session_factory() as session: - no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail") + outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1) + assert len(outcomes) == 1 + assert outcomes[0].failed is True + no_job = claim_next_job(session) assert no_job is None with session_factory() as session: failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() assert failed.status == JobStatus.failed assert failed.finished_at is not None + + +def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=4, + head_sha="11112222", + trigger_comment_id=1004, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session) + assert claimed is not None + assert claimed.id == job.id + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(first) == 1 + assert first[0].failed is False + claimed_again = claim_next_job(session) + assert claimed_again is not None + assert claimed_again.id == job.id + + with session_factory() as session: + stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(second) == 1 + assert second[0].failed is True + failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + assert failed.status == JobStatus.failed