From dfd67c1cce2ffee3fc5a7b63b2081f4908083f67 Mon Sep 17 00:00:00 2001 From: Luna Date: Fri, 22 May 2026 21:10:11 +0000 Subject: [PATCH 1/4] Add stuck running job recovery with lease timeout --- .env.example | 2 + src/gitea_codex_bot/config.py | 2 + src/gitea_codex_bot/services/jobs.py | 40 +++++++++- src/gitea_codex_bot/workers/dispatcher.py | 6 +- tests/test_transitions.py | 89 ++++++++++++++++++++++- 5 files changed, 135 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index 9348323..3322a5d 100644 --- a/.env.example +++ b/.env.example @@ -44,6 +44,8 @@ WORKDIR=/var/lib/gitea-codex/worktrees MAX_DIFF_BYTES=200000 MAX_REVIEW_MINUTES=10 CONCURRENCY=1 +JOB_LEASE_TIMEOUT_SECONDS=300 +STUCK_JOB_RECOVERY_ACTION=requeue # Image used for ephemeral job containers (Node + npm + Codex CLI install). REVIEW_RUNNER_IMAGE=node:22-bookworm-slim diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index b6d3481..cecf5b4 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -38,6 +38,8 @@ class Settings(BaseSettings): max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES") max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES") concurrency: int = Field(default=1, alias="CONCURRENCY") + job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") + stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index 488cda6..c366cac 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -95,7 +95,8 @@ def enqueue_job( return job -def claim_next_job(session: Session) -> ReviewJob | None: +def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None: + recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action) job = session.execute( select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) ).scalar_one_or_none() @@ -120,6 +121,43 @@ def claim_next_job(session: Session) -> ReviewJob | None: return job +def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int: + if lease_timeout_seconds <= 0: + return 0 + now = datetime.now(timezone.utc) + cutoff = now - timedelta(seconds=lease_timeout_seconds) + stale_jobs = session.execute( + select(ReviewJob) + .where( + ReviewJob.status == JobStatus.running, + ReviewJob.started_at.is_not(None), + ReviewJob.started_at < cutoff, + ) + .order_by(ReviewJob.started_at.asc()) + .with_for_update(skip_locked=True) + ).scalars() + recovered = 0 + for job in stale_jobs: + recovered += 1 + message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}." + latest_run = ( + session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() + ) + if latest_run and latest_run.status == RunStatus.running: + latest_run.status = RunStatus.failed + latest_run.finished_at = now + latest_run.error_message = message + job.last_error = message + if action == "fail": + job.status = JobStatus.failed + job.finished_at = now + else: + job.status = JobStatus.queued + job.started_at = None + job.finished_at = None + return recovered + + def finish_job( session: Session, *, diff --git a/src/gitea_codex_bot/workers/dispatcher.py b/src/gitea_codex_bot/workers/dispatcher.py index 899ff9d..746b080 100644 --- a/src/gitea_codex_bot/workers/dispatcher.py +++ b/src/gitea_codex_bot/workers/dispatcher.py @@ -87,7 +87,11 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa def process_one_job(settings: Settings) -> bool: session_factory = get_session_factory() with session_factory() as session: - job = claim_next_job(session) + job = claim_next_job( + session, + lease_timeout_seconds=settings.job_lease_timeout_seconds, + stuck_job_recovery_action=settings.stuck_job_recovery_action, + ) if not job: return False diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 5e653e9..48466ed 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -3,7 +3,9 @@ from __future__ import annotations from sqlalchemy import select from gitea_codex_bot.db import get_session_factory -from gitea_codex_bot.models import JobStatus, ReviewJob +from datetime import datetime, timedelta, timezone + +from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job from gitea_codex_bot.types import ParsedCommand @@ -23,7 +25,7 @@ def test_claim_and_transition() -> None: ) with session_factory() as session: - claimed = claim_next_job(session) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") assert claimed is not None assert claimed.id == job.id assert claimed.status == JobStatus.running @@ -35,3 +37,86 @@ def test_claim_and_transition() -> None: loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() assert loaded.status == JobStatus.succeeded assert loaded.result_json is not None + + +def test_claim_recovers_stuck_running_job_by_requeue() -> None: + session_factory = get_session_factory() + with session_factory() as session: + first = enqueue_job( + session, + repo="acme/repo", + pr_number=1, + head_sha="aaaabbbb", + trigger_comment_id=1001, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + second = enqueue_job( + session, + repo="acme/repo", + pr_number=2, + head_sha="ccccdddd", + trigger_comment_id=1002, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert claimed is not None + assert claimed.id == first.id + + with session_factory() as session: + stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one() + stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert recovered is not None + assert recovered.id == first.id + assert recovered.status == JobStatus.running + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one() + assert stale.last_error is not None + assert "lease timed out" in stale.last_error + failed_runs = session.execute( + select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed) + ).scalars() + assert len(list(failed_runs)) >= 1 + + queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one() + assert queued_later.status in (JobStatus.queued, JobStatus.running) + + +def test_claim_recovers_stuck_running_job_by_fail() -> None: + session_factory = get_session_factory() + with session_factory() as session: + stuck_job = enqueue_job( + session, + repo="acme/repo", + pr_number=3, + head_sha="eeeeffff", + trigger_comment_id=1003, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert claimed is not None + assert claimed.id == stuck_job.id + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() + stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail") + assert no_job is None + + with session_factory() as session: + failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() + assert failed.status == JobStatus.failed + assert failed.finished_at is not None -- 2.39.5 From d24b4f4f7914e68829c679dcc78403682c010b8d Mon Sep 17 00:00:00 2001 From: Luna Date: Fri, 22 May 2026 21:25:53 +0000 Subject: [PATCH 2/4] Cap stuck-job requeue retries and notify on exhaustion --- .env.example | 1 + src/gitea_codex_bot/config.py | 1 + src/gitea_codex_bot/services/jobs.py | 54 ++++++++++++++++---- src/gitea_codex_bot/workers/dispatcher.py | 30 +++++++++-- tests/test_transitions.py | 61 ++++++++++++++++++++--- 5 files changed, 128 insertions(+), 19 deletions(-) diff --git a/.env.example b/.env.example index 3322a5d..1f3acae 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,7 @@ MAX_REVIEW_MINUTES=10 CONCURRENCY=1 JOB_LEASE_TIMEOUT_SECONDS=300 STUCK_JOB_RECOVERY_ACTION=requeue +MAX_STUCK_JOB_RETRIES=1 # Image used for ephemeral job containers (Node + npm + Codex CLI install). REVIEW_RUNNER_IMAGE=node:22-bookworm-slim diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index cecf5b4..9e2a19f 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): concurrency: int = Field(default=1, alias="CONCURRENCY") job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") + max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index c366cac..03ca2ef 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone +from dataclasses import dataclass from sqlalchemy import select from sqlalchemy.exc import IntegrityError @@ -12,6 +13,17 @@ from gitea_codex_bot.services.security import payload_digest from gitea_codex_bot.types import ParsedCommand logger = logging.getLogger(__name__) +LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out" + + +@dataclass +class RecoveryOutcome: + repo: str + pr_number: int + job_id: int + retries_used: int + failed: bool + message: str def persist_webhook_event( @@ -95,8 +107,7 @@ def enqueue_job( return job -def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None: - recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action) +def claim_next_job(session: Session) -> ReviewJob | None: job = session.execute( select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) ).scalar_one_or_none() @@ -121,9 +132,9 @@ def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_re return job -def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int: +def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]: if lease_timeout_seconds <= 0: - return 0 + return [] now = datetime.now(timezone.utc) cutoff = now - timedelta(seconds=lease_timeout_seconds) stale_jobs = session.execute( @@ -136,10 +147,24 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: .order_by(ReviewJob.started_at.asc()) .with_for_update(skip_locked=True) ).scalars() - recovered = 0 + outcomes: list[RecoveryOutcome] = [] for job in stale_jobs: - recovered += 1 - message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}." + prior_retries = session.execute( + select(ReviewRun) + .where( + ReviewRun.job_id == job.id, + ReviewRun.status == RunStatus.failed, + ReviewRun.error_message.is_not(None), + ) + .order_by(ReviewRun.id.asc()) + ).scalars() + lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX)) + retries_used_after_this_timeout = lease_retries_used + 1 + should_fail = action == "fail" or lease_retries_used >= max_retries + message = ( + f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; " + f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}." + ) latest_run = ( session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() ) @@ -148,14 +173,25 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: latest_run.finished_at = now latest_run.error_message = message job.last_error = message - if action == "fail": + if should_fail: job.status = JobStatus.failed job.finished_at = now else: job.status = JobStatus.queued job.started_at = None job.finished_at = None - return recovered + outcomes.append( + RecoveryOutcome( + repo=job.repo, + pr_number=job.pr_number, + job_id=job.id, + retries_used=retries_used_after_this_timeout, + failed=should_fail, + message=message, + ) + ) + session.commit() + return outcomes def finish_job( diff --git a/src/gitea_codex_bot/workers/dispatcher.py b/src/gitea_codex_bot/workers/dispatcher.py index 746b080..8b57404 100644 --- a/src/gitea_codex_bot/workers/dispatcher.py +++ b/src/gitea_codex_bot/workers/dispatcher.py @@ -12,7 +12,7 @@ from gitea_codex_bot.db import get_session_factory from gitea_codex_bot.models import ReviewJob from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id from gitea_codex_bot.services.gitea import GiteaClient -from gitea_codex_bot.services.jobs import claim_next_job, finish_job +from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note from gitea_codex_bot.types import ParsedCommand @@ -86,17 +86,39 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa def process_one_job(settings: Settings) -> bool: session_factory = get_session_factory() + gitea = GiteaClient(settings) + with session_factory() as session: - job = claim_next_job( + recoveries = recover_stuck_jobs( session, lease_timeout_seconds=settings.job_lease_timeout_seconds, - stuck_job_recovery_action=settings.stuck_job_recovery_action, + action=settings.stuck_job_recovery_action, + max_retries=settings.max_stuck_job_retries, ) + for recovery in recoveries: + if not recovery.failed: + continue + try: + gitea.post_issue_comment( + recovery.repo, + recovery.pr_number, + ( + "⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n" + f"- job_id: `{recovery.job_id}`\n" + f"- retries used: `{recovery.retries_used}`\n" + f"- details: {recovery.message}\n\n" + "Please re-run with `@codex review` after investigating runner/worker stability." + ), + ) + except Exception: + logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id) + + with session_factory() as session: + job = claim_next_job(session) if not job: return False command = _command_from_job(job) - gitea = GiteaClient(settings) logger.info( "Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s", job.id, diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 48466ed..8166f14 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -6,7 +6,7 @@ from gitea_codex_bot.db import get_session_factory from datetime import datetime, timedelta, timezone from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus -from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job +from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs from gitea_codex_bot.types import ParsedCommand @@ -25,7 +25,7 @@ def test_claim_and_transition() -> None: ) with session_factory() as session: - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == job.id assert claimed.status == JobStatus.running @@ -62,7 +62,7 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None: requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == first.id @@ -72,7 +72,10 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None: session.commit() with session_factory() as session: - recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(outcomes) == 1 + assert outcomes[0].failed is False + recovered = claim_next_job(session) assert recovered is not None assert recovered.id == first.id assert recovered.status == JobStatus.running @@ -103,7 +106,7 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None: requested_by="alice", command=ParsedCommand(name="review", raw="@codex review"), ) - claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + claimed = claim_next_job(session) assert claimed is not None assert claimed.id == stuck_job.id @@ -113,10 +116,56 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None: session.commit() with session_factory() as session: - no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail") + outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1) + assert len(outcomes) == 1 + assert outcomes[0].failed is True + no_job = claim_next_job(session) assert no_job is None with session_factory() as session: failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() assert failed.status == JobStatus.failed assert failed.finished_at is not None + + +def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=4, + head_sha="11112222", + trigger_comment_id=1004, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session) + assert claimed is not None + assert claimed.id == job.id + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(first) == 1 + assert first[0].failed is False + claimed_again = claim_next_job(session) + assert claimed_again is not None + assert claimed_again.id == job.id + + with session_factory() as session: + stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(second) == 1 + assert second[0].failed is True + failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + assert failed.status == JobStatus.failed -- 2.39.5 From c73aadc6603907901c3f32c8ff4652376df8afe7 Mon Sep 17 00:00:00 2001 From: Luna Date: Fri, 22 May 2026 21:50:06 +0000 Subject: [PATCH 3/4] Validate lease timeout against max review runtime --- .env.example | 2 +- src/gitea_codex_bot/config.py | 14 ++++++++++++-- tests/test_config.py | 12 ++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index 1f3acae..2c020db 100644 --- a/.env.example +++ b/.env.example @@ -44,7 +44,7 @@ WORKDIR=/var/lib/gitea-codex/worktrees MAX_DIFF_BYTES=200000 MAX_REVIEW_MINUTES=10 CONCURRENCY=1 -JOB_LEASE_TIMEOUT_SECONDS=300 +JOB_LEASE_TIMEOUT_SECONDS=660 STUCK_JOB_RECOVERY_ACTION=requeue MAX_STUCK_JOB_RETRIES=1 diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index 9e2a19f..c010ac9 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -3,7 +3,7 @@ from __future__ import annotations from functools import lru_cache from typing import Literal -from pydantic import Field, SecretStr, field_validator +from pydantic import Field, SecretStr, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -38,7 +38,7 @@ class Settings(BaseSettings): max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES") max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES") concurrency: int = Field(default=1, alias="CONCURRENCY") - job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") + job_lease_timeout_seconds: int = Field(default=660, alias="JOB_LEASE_TIMEOUT_SECONDS") stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES") @@ -63,6 +63,16 @@ class Settings(BaseSettings): values = [item.strip() for item in self.allowed_repos.split(",")] return {value for value in values if value} + @model_validator(mode="after") + def validate_job_lease_timeout(self) -> "Settings": + minimum_lease_timeout = (self.max_review_minutes * 60) + 60 + if self.job_lease_timeout_seconds < minimum_lease_timeout: + raise ValueError( + "JOB_LEASE_TIMEOUT_SECONDS must be at least MAX_REVIEW_MINUTES*60 + 60 " + f"(minimum {minimum_lease_timeout}s for MAX_REVIEW_MINUTES={self.max_review_minutes})." + ) + return self + @lru_cache(maxsize=1) def get_settings() -> Settings: diff --git a/tests/test_config.py b/tests/test_config.py index e1a5a6b..3b3a903 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,3 +1,6 @@ +import pytest +from pydantic import ValidationError + from gitea_codex_bot.config import get_settings @@ -11,3 +14,12 @@ def test_codex_auth_defaults_to_api_key_mode() -> None: settings = get_settings() assert settings.codex_auth_mode == "api_key" assert settings.codex_auth_json_path is None + + +def test_job_lease_timeout_must_cover_max_review_runtime(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MAX_REVIEW_MINUTES", "10") + monkeypatch.setenv("JOB_LEASE_TIMEOUT_SECONDS", "300") + get_settings.cache_clear() + + with pytest.raises(ValidationError, match="JOB_LEASE_TIMEOUT_SECONDS must be at least"): + get_settings() -- 2.39.5 From 9028d5b56c24f06b0d9bb2a9f6564d91e293a132 Mon Sep 17 00:00:00 2001 From: Luna Date: Fri, 22 May 2026 21:54:56 +0000 Subject: [PATCH 4/4] Harden stuck-job retry accounting and retry config validation --- src/gitea_codex_bot/config.py | 2 ++ src/gitea_codex_bot/services/jobs.py | 9 ++++++ tests/test_config.py | 8 +++++ tests/test_transitions.py | 46 ++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+) diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index c010ac9..0bd792e 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -65,6 +65,8 @@ class Settings(BaseSettings): @model_validator(mode="after") def validate_job_lease_timeout(self) -> "Settings": + if self.max_stuck_job_retries < 0: + raise ValueError("MAX_STUCK_JOB_RETRIES must be >= 0.") minimum_lease_timeout = (self.max_review_minutes * 60) + 60 if self.job_lease_timeout_seconds < minimum_lease_timeout: raise ValueError( diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index 03ca2ef..6964d72 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -172,6 +172,15 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: latest_run.status = RunStatus.failed latest_run.finished_at = now latest_run.error_message = message + else: + session.add( + ReviewRun( + job_id=job.id, + status=RunStatus.failed, + finished_at=now, + error_message=message, + ) + ) job.last_error = message if should_fail: job.status = JobStatus.failed diff --git a/tests/test_config.py b/tests/test_config.py index 3b3a903..b879836 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -23,3 +23,11 @@ def test_job_lease_timeout_must_cover_max_review_runtime(monkeypatch: pytest.Mon with pytest.raises(ValidationError, match="JOB_LEASE_TIMEOUT_SECONDS must be at least"): get_settings() + + +def test_max_stuck_job_retries_must_be_non_negative(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MAX_STUCK_JOB_RETRIES", "-1") + get_settings.cache_clear() + + with pytest.raises(ValidationError, match="MAX_STUCK_JOB_RETRIES must be >= 0"): + get_settings() diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 8166f14..03277fb 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -169,3 +169,49 @@ def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None: assert second[0].failed is True failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() assert failed.status == JobStatus.failed + + +def test_recovery_persists_timeout_retry_even_if_latest_run_not_running() -> None: + session_factory = get_session_factory() + with session_factory() as session: + job = enqueue_job( + session, + repo="acme/repo", + pr_number=5, + head_sha="33334444", + trigger_comment_id=1005, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session) + assert claimed is not None + assert claimed.id == job.id + + with session_factory() as session: + latest_run = session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one() + latest_run.status = RunStatus.succeeded + latest_run.finished_at = datetime.now(timezone.utc) + stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(first) == 1 + assert first[0].failed is False + claimed_again = claim_next_job(session) + assert claimed_again is not None + assert claimed_again.id == job.id + + with session_factory() as session: + stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1) + assert len(second) == 1 + assert second[0].failed is True + final = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() + assert final.status == JobStatus.failed -- 2.39.5