Add stuck-job recovery for running jobs #1

Closed
luna wants to merge 4 commits from feat/stuck-job-recovery into main
5 changed files with 128 additions and 19 deletions
Showing only changes of commit d24b4f4f79 - Show all commits

View File

@@ -46,6 +46,7 @@ MAX_REVIEW_MINUTES=10
CONCURRENCY=1
JOB_LEASE_TIMEOUT_SECONDS=300
STUCK_JOB_RECOVERY_ACTION=requeue
MAX_STUCK_JOB_RETRIES=1
# Image used for ephemeral job containers (Node + npm + Codex CLI install).
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim

View File

@@ -40,6 +40,7 @@ class Settings(BaseSettings):
concurrency: int = Field(default=1, alias="CONCURRENCY")
job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS")
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
@@ -12,6 +13,17 @@ from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand
logger = logging.getLogger(__name__)
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
@dataclass
class RecoveryOutcome:
repo: str
pr_number: int
job_id: int
retries_used: int
failed: bool
message: str
def persist_webhook_event(
@@ -95,8 +107,7 @@ def enqueue_job(
return job
def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None:
recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action)
def claim_next_job(session: Session) -> ReviewJob | None:
job = session.execute(
select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True)
).scalar_one_or_none()
@@ -121,9 +132,9 @@ def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_re
return job
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int:
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]:
if lease_timeout_seconds <= 0:
return 0
return []
now = datetime.now(timezone.utc)
cutoff = now - timedelta(seconds=lease_timeout_seconds)
stale_jobs = session.execute(
@@ -136,10 +147,24 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action:
.order_by(ReviewJob.started_at.asc())
.with_for_update(skip_locked=True)
).scalars()
recovered = 0
outcomes: list[RecoveryOutcome] = []
for job in stale_jobs:
recovered += 1
message = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}."
prior_retries = session.execute(
select(ReviewRun)
.where(
ReviewRun.job_id == job.id,
ReviewRun.status == RunStatus.failed,
ReviewRun.error_message.is_not(None),
)
.order_by(ReviewRun.id.asc())
).scalars()
lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX))
retries_used_after_this_timeout = lease_retries_used + 1
should_fail = action == "fail" or lease_retries_used >= max_retries
message = (
f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; "
f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}."
)
latest_run = (
session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none()
)
@@ -148,14 +173,25 @@ def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action:
latest_run.finished_at = now
latest_run.error_message = message
job.last_error = message
if action == "fail":
if should_fail:
job.status = JobStatus.failed
job.finished_at = now
else:
job.status = JobStatus.queued
job.started_at = None
job.finished_at = None
return recovered
outcomes.append(
RecoveryOutcome(
repo=job.repo,
pr_number=job.pr_number,
job_id=job.id,
retries_used=retries_used_after_this_timeout,
failed=should_fail,
message=message,
)
)
session.commit()
return outcomes
def finish_job(

View File

@@ -12,7 +12,7 @@ from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import claim_next_job, finish_job
from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
from gitea_codex_bot.types import ParsedCommand
@@ -86,17 +86,39 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa
def process_one_job(settings: Settings) -> bool:
session_factory = get_session_factory()
gitea = GiteaClient(settings)
with session_factory() as session:
job = claim_next_job(
recoveries = recover_stuck_jobs(
session,
lease_timeout_seconds=settings.job_lease_timeout_seconds,
stuck_job_recovery_action=settings.stuck_job_recovery_action,
action=settings.stuck_job_recovery_action,
max_retries=settings.max_stuck_job_retries,
)
for recovery in recoveries:
if not recovery.failed:
continue
try:
gitea.post_issue_comment(
recovery.repo,
recovery.pr_number,
(
"⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n"
f"- job_id: `{recovery.job_id}`\n"
f"- retries used: `{recovery.retries_used}`\n"
f"- details: {recovery.message}\n\n"
"Please re-run with `@codex review` after investigating runner/worker stability."
),
)
except Exception:
logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id)
with session_factory() as session:
job = claim_next_job(session)
if not job:
return False
command = _command_from_job(job)
gitea = GiteaClient(settings)
logger.info(
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
job.id,

View File

@@ -6,7 +6,7 @@ from gitea_codex_bot.db import get_session_factory
from datetime import datetime, timedelta, timezone
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.types import ParsedCommand
@@ -25,7 +25,7 @@ def test_claim_and_transition() -> None:
)
with session_factory() as session:
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
assert claimed.status == JobStatus.running
@@ -62,7 +62,7 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None:
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == first.id
@@ -72,7 +72,10 @@ def test_claim_recovers_stuck_running_job_by_requeue() -> None:
session.commit()
with session_factory() as session:
recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is False
recovered = claim_next_job(session)
assert recovered is not None
assert recovered.id == first.id
assert recovered.status == JobStatus.running
@@ -103,7 +106,7 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None:
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == stuck_job.id
@@ -113,10 +116,56 @@ def test_claim_recovers_stuck_running_job_by_fail() -> None:
session.commit()
with session_factory() as session:
no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail")
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is True
no_job = claim_next_job(session)
assert no_job is None
with session_factory() as session:
failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
assert failed.status == JobStatus.failed
assert failed.finished_at is not None
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=4,
head_sha="11112222",
trigger_comment_id=1004,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(first) == 1
assert first[0].failed is False
claimed_again = claim_next_job(session)
assert claimed_again is not None
assert claimed_again.id == job.id
with session_factory() as session:
stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(second) == 1
assert second[0].failed is True
failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert failed.status == JobStatus.failed