From dc48df1aab416733d050b04cda7d5836001e2583 Mon Sep 17 00:00:00 2001 From: Luna Date: Fri, 22 May 2026 21:10:11 +0000 Subject: [PATCH] Add stuck running job recovery with lease timeout --- .env.example | 2 + src/gitea_codex_bot/config.py | 2 + src/gitea_codex_bot/services/jobs.py | 40 +++++++++- src/gitea_codex_bot/workers/dispatcher.py | 6 +- tests/test_transitions.py | 89 ++++++++++++++++++++++- 5 files changed, 135 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index 9348323..3322a5d 100644 --- a/.env.example +++ b/.env.example @@ -44,6 +44,8 @@ WORKDIR=/var/lib/gitea-codex/worktrees MAX_DIFF_BYTES=200000 MAX_REVIEW_MINUTES=10 CONCURRENCY=1 +JOB_LEASE_TIMEOUT_SECONDS=300 +STUCK_JOB_RECOVERY_ACTION=requeue # Image used for ephemeral job containers (Node + npm + Codex CLI install). REVIEW_RUNNER_IMAGE=node:22-bookworm-slim diff --git a/src/gitea_codex_bot/config.py b/src/gitea_codex_bot/config.py index b6d3481..cecf5b4 100644 --- a/src/gitea_codex_bot/config.py +++ b/src/gitea_codex_bot/config.py @@ -38,6 +38,8 @@ class Settings(BaseSettings): max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES") max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES") concurrency: int = Field(default=1, alias="CONCURRENCY") + job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") + stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") diff --git a/src/gitea_codex_bot/services/jobs.py b/src/gitea_codex_bot/services/jobs.py index 28f1e8a..15c19f7 100644 --- a/src/gitea_codex_bot/services/jobs.py +++ b/src/gitea_codex_bot/services/jobs.py @@ -82,7 +82,8 @@ def enqueue_job( return job -def claim_next_job(session: Session) -> ReviewJob | None: +def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None: + recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action) job = session.execute( select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) ).scalar_one_or_none() @@ -98,6 +99,43 @@ def claim_next_job(session: Session) -> ReviewJob | None: return job +def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int: + if lease_timeout_seconds <= 0: + return 0 + now = datetime.now(timezone.utc) + cutoff = now - timedelta(seconds=lease_timeout_seconds) + stale_jobs = session.execute( + select(ReviewJob) + .where( + ReviewJob.status == JobStatus.running, + ReviewJob.started_at.is_not(None), + ReviewJob.started_at < cutoff, + ) + .order_by(ReviewJob.started_at.asc()) + .with_for_update(skip_locked=True) + ).scalars() + recovered = 0 + for job in stale_jobs: + recovered += 1 + message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}." + latest_run = ( + session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() + ) + if latest_run and latest_run.status == RunStatus.running: + latest_run.status = RunStatus.failed + latest_run.finished_at = now + latest_run.error_message = message + job.last_error = message + if action == "fail": + job.status = JobStatus.failed + job.finished_at = now + else: + job.status = JobStatus.queued + job.started_at = None + job.finished_at = None + return recovered + + def finish_job( session: Session, *, diff --git a/src/gitea_codex_bot/workers/dispatcher.py b/src/gitea_codex_bot/workers/dispatcher.py index 84b38f1..898e063 100644 --- a/src/gitea_codex_bot/workers/dispatcher.py +++ b/src/gitea_codex_bot/workers/dispatcher.py @@ -78,7 +78,11 @@ def _handle_non_review_command( def process_one_job(settings: Settings) -> bool: session_factory = get_session_factory() with session_factory() as session: - job = claim_next_job(session) + job = claim_next_job( + session, + lease_timeout_seconds=settings.job_lease_timeout_seconds, + stuck_job_recovery_action=settings.stuck_job_recovery_action, + ) if not job: return False diff --git a/tests/test_transitions.py b/tests/test_transitions.py index 5e653e9..48466ed 100644 --- a/tests/test_transitions.py +++ b/tests/test_transitions.py @@ -3,7 +3,9 @@ from __future__ import annotations from sqlalchemy import select from gitea_codex_bot.db import get_session_factory -from gitea_codex_bot.models import JobStatus, ReviewJob +from datetime import datetime, timedelta, timezone + +from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job from gitea_codex_bot.types import ParsedCommand @@ -23,7 +25,7 @@ def test_claim_and_transition() -> None: ) with session_factory() as session: - claimed = claim_next_job(session) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") assert claimed is not None assert claimed.id == job.id assert claimed.status == JobStatus.running @@ -35,3 +37,86 @@ def test_claim_and_transition() -> None: loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() assert loaded.status == JobStatus.succeeded assert loaded.result_json is not None + + +def test_claim_recovers_stuck_running_job_by_requeue() -> None: + session_factory = get_session_factory() + with session_factory() as session: + first = enqueue_job( + session, + repo="acme/repo", + pr_number=1, + head_sha="aaaabbbb", + trigger_comment_id=1001, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + second = enqueue_job( + session, + repo="acme/repo", + pr_number=2, + head_sha="ccccdddd", + trigger_comment_id=1002, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert claimed is not None + assert claimed.id == first.id + + with session_factory() as session: + stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one() + stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert recovered is not None + assert recovered.id == first.id + assert recovered.status == JobStatus.running + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one() + assert stale.last_error is not None + assert "lease timed out" in stale.last_error + failed_runs = session.execute( + select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed) + ).scalars() + assert len(list(failed_runs)) >= 1 + + queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one() + assert queued_later.status in (JobStatus.queued, JobStatus.running) + + +def test_claim_recovers_stuck_running_job_by_fail() -> None: + session_factory = get_session_factory() + with session_factory() as session: + stuck_job = enqueue_job( + session, + repo="acme/repo", + pr_number=3, + head_sha="eeeeffff", + trigger_comment_id=1003, + trigger_comment_body="@codex review", + requested_by="alice", + command=ParsedCommand(name="review", raw="@codex review"), + ) + claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") + assert claimed is not None + assert claimed.id == stuck_job.id + + with session_factory() as session: + stale = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() + stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601) + session.commit() + + with session_factory() as session: + no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail") + assert no_job is None + + with session_factory() as session: + failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() + assert failed.status == JobStatus.failed + assert failed.finished_at is not None