Cap stuck-job requeue retries and notify on exhaustion
All checks were successful
ci / test (pull_request) Successful in 29s
ci / publish (pull_request) Has been skipped

This commit is contained in:
2026-05-22 21:25:53 +00:00
parent dc48df1aab
commit 1fb67b630d
5 changed files with 129 additions and 19 deletions

View File

@@ -46,6 +46,7 @@ MAX_REVIEW_MINUTES=10
CONCURRENCY=1 CONCURRENCY=1
JOB_LEASE_TIMEOUT_SECONDS=300 JOB_LEASE_TIMEOUT_SECONDS=300
STUCK_JOB_RECOVERY_ACTION=requeue STUCK_JOB_RECOVERY_ACTION=requeue
MAX_STUCK_JOB_RETRIES=1
# Image used for ephemeral job containers (Node + npm + Codex CLI install). # Image used for ephemeral job containers (Node + npm + Codex CLI install).
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim REVIEW_RUNNER_IMAGE=node:22-bookworm-slim

View File

@@ -40,6 +40,7 @@ class Settings(BaseSettings):
concurrency: int = Field(default=1, alias="CONCURRENCY") concurrency: int = Field(default=1, alias="CONCURRENCY")
job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS") job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS")
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@@ -10,6 +11,18 @@ from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus, W
from gitea_codex_bot.services.security import payload_digest from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
@dataclass
class RecoveryOutcome:
repo: str
pr_number: int
job_id: int
retries_used: int
failed: bool
message: str
def persist_webhook_event( def persist_webhook_event(
session: Session, session: Session,
@@ -82,8 +95,7 @@ def enqueue_job(
return job return job
def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None: def claim_next_job(session: Session) -> ReviewJob | None:
recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action)
job = session.execute( job = session.execute(
select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True) select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True)
).scalar_one_or_none() ).scalar_one_or_none()
@@ -99,9 +111,9 @@ def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_re
return job return job
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int: def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]:
if lease_timeout_seconds <= 0: if lease_timeout_seconds <= 0:
return 0 return []
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
cutoff = now - timedelta(seconds=lease_timeout_seconds) cutoff = now - timedelta(seconds=lease_timeout_seconds)
stale_jobs = session.execute( stale_jobs = session.execute(
@@ -114,10 +126,24 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action:
.order_by(ReviewJob.started_at.asc()) .order_by(ReviewJob.started_at.asc())
.with_for_update(skip_locked=True) .with_for_update(skip_locked=True)
).scalars() ).scalars()
recovered = 0 outcomes: list[RecoveryOutcome] = []
for job in stale_jobs: for job in stale_jobs:
recovered += 1 prior_retries = session.execute(
message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}." select(ReviewRun)
.where(
ReviewRun.job_id == job.id,
ReviewRun.status == RunStatus.failed,
ReviewRun.error_message.is_not(None),
)
.order_by(ReviewRun.id.asc())
).scalars()
lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX))
retries_used_after_this_timeout = lease_retries_used + 1
should_fail = action == "fail" or lease_retries_used >= max_retries
message = (
f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; "
f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}."
)
latest_run = ( latest_run = (
session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none() session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none()
) )
@@ -126,14 +152,25 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action:
latest_run.finished_at = now latest_run.finished_at = now
latest_run.error_message = message latest_run.error_message = message
job.last_error = message job.last_error = message
if action == "fail": if should_fail:
job.status = JobStatus.failed job.status = JobStatus.failed
job.finished_at = now job.finished_at = now
else: else:
job.status = JobStatus.queued job.status = JobStatus.queued
job.started_at = None job.started_at = None
job.finished_at = None job.finished_at = None
return recovered outcomes.append(
RecoveryOutcome(
repo=job.repo,
pr_number=job.pr_number,
job_id=job.id,
retries_used=retries_used_after_this_timeout,
failed=should_fail,
message=message,
)
)
session.commit()
return outcomes
def finish_job( def finish_job(

View File

@@ -13,7 +13,7 @@ from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import ReviewJob from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.comments import get_persistent_review_comment_id, upsert_persistent_review_comment_id from gitea_codex_bot.services.comments import get_persistent_review_comment_id, upsert_persistent_review_comment_id
from gitea_codex_bot.services.gitea import GiteaClient from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import claim_next_job, finish_job from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
@@ -77,17 +77,39 @@ def _handle_non_review_command(
def process_one_job(settings: Settings) -> bool: def process_one_job(settings: Settings) -> bool:
session_factory = get_session_factory() session_factory = get_session_factory()
gitea = GiteaClient(settings)
with session_factory() as session: with session_factory() as session:
job = claim_next_job( recoveries = recover_stuck_jobs(
session, session,
lease_timeout_seconds=settings.job_lease_timeout_seconds, lease_timeout_seconds=settings.job_lease_timeout_seconds,
stuck_job_recovery_action=settings.stuck_job_recovery_action, action=settings.stuck_job_recovery_action,
max_retries=settings.max_stuck_job_retries,
) )
for recovery in recoveries:
if not recovery.failed:
continue
try:
gitea.post_issue_comment(
recovery.repo,
recovery.pr_number,
(
"⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n"
f"- job_id: `{recovery.job_id}`\n"
f"- retries used: `{recovery.retries_used}`\n"
f"- details: {recovery.message}\n\n"
"Please re-run with `@codex review` after investigating runner/worker stability."
),
)
except Exception:
logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id)
with session_factory() as session:
job = claim_next_job(session)
if not job: if not job:
return False return False
command = _command_from_job(job) command = _command_from_job(job)
gitea = GiteaClient(settings)
with session_factory() as session: with session_factory() as session:
db_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() db_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()

View File

@@ -6,7 +6,7 @@ from gitea_codex_bot.db import get_session_factory
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
@@ -25,7 +25,7 @@ def test_claim_and_transition() -> None:
) )
with session_factory() as session: with session_factory() as session:
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") claimed = claim_next_job(session)
assert claimed is not None assert claimed is not None
assert claimed.id == job.id assert claimed.id == job.id
assert claimed.status == JobStatus.running assert claimed.status == JobStatus.running
@@ -62,7 +62,7 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None:
requested_by="alice", requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"), command=ParsedCommand(name="review", raw="@codex review"),
) )
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") claimed = claim_next_job(session)
assert claimed is not None assert claimed is not None
assert claimed.id == first.id assert claimed.id == first.id
@@ -72,7 +72,10 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None:
session.commit() session.commit()
with session_factory() as session: with session_factory() as session:
recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is False
recovered = claim_next_job(session)
assert recovered is not None assert recovered is not None
assert recovered.id == first.id assert recovered.id == first.id
assert recovered.status == JobStatus.running assert recovered.status == JobStatus.running
@@ -103,7 +106,7 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None:
requested_by="alice", requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"), command=ParsedCommand(name="review", raw="@codex review"),
) )
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue") claimed = claim_next_job(session)
assert claimed is not None assert claimed is not None
assert claimed.id == stuck_job.id assert claimed.id == stuck_job.id
@@ -113,10 +116,56 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None:
session.commit() session.commit()
with session_factory() as session: with session_factory() as session:
no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail") outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is True
no_job = claim_next_job(session)
assert no_job is None assert no_job is None
with session_factory() as session: with session_factory() as session:
failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one() failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
assert failed.status == JobStatus.failed assert failed.status == JobStatus.failed
assert failed.finished_at is not None assert failed.finished_at is not None
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=4,
head_sha="11112222",
trigger_comment_id=1004,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(first) == 1
assert first[0].failed is False
claimed_again = claim_next_job(session)
assert claimed_again is not None
assert claimed_again.id == job.id
with session_factory() as session:
stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(second) == 1
assert second[0].failed is True
failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert failed.status == JobStatus.failed