Add stuck-job recovery for running jobs #1

Closed
luna wants to merge 4 commits from feat/stuck-job-recovery into main
6 changed files with 332 additions and 5 deletions

View File

@@ -44,6 +44,9 @@ WORKDIR=/var/lib/gitea-codex/worktrees
MAX_DIFF_BYTES=200000 MAX_DIFF_BYTES=200000
MAX_REVIEW_MINUTES=10 MAX_REVIEW_MINUTES=10
CONCURRENCY=1 CONCURRENCY=1
JOB_LEASE_TIMEOUT_SECONDS=660
STUCK_JOB_RECOVERY_ACTION=requeue
MAX_STUCK_JOB_RETRIES=1
# Image used for ephemeral job containers (Node + npm + Codex CLI install). # Image used for ephemeral job containers (Node + npm + Codex CLI install).
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim REVIEW_RUNNER_IMAGE=node:22-bookworm-slim

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from functools import lru_cache from functools import lru_cache
from typing import Literal from typing import Literal
from pydantic import Field, SecretStr, field_validator from pydantic import Field, SecretStr, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -38,6 +38,9 @@ class Settings(BaseSettings):
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES") max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES") max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
concurrency: int = Field(default=1, alias="CONCURRENCY") concurrency: int = Field(default=1, alias="CONCURRENCY")
job_lease_timeout_seconds: int = Field(default=660, alias="JOB_LEASE_TIMEOUT_SECONDS")
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE") review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS") enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")
@@ -60,6 +63,18 @@ class Settings(BaseSettings):
values = [item.strip() for item in self.allowed_repos.split(",")] values = [item.strip() for item in self.allowed_repos.split(",")]
return {value for value in values if value} return {value for value in values if value}
@model_validator(mode="after")
def validate_job_lease_timeout(self) -> "Settings":
if self.max_stuck_job_retries < 0:
raise ValueError("MAX_STUCK_JOB_RETRIES must be >= 0.")
minimum_lease_timeout = (self.max_review_minutes * 60) + 60
if self.job_lease_timeout_seconds < minimum_lease_timeout:
raise ValueError(
"JOB_LEASE_TIMEOUT_SECONDS must be at least MAX_REVIEW_MINUTES*60 + 60 "
f"(minimum {minimum_lease_timeout}s for MAX_REVIEW_MINUTES={self.max_review_minutes})."
)
return self
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_settings() -> Settings: def get_settings() -> Settings:

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import logging import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@@ -12,6 +13,17 @@ from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
@dataclass
class RecoveryOutcome:
repo: str
pr_number: int
job_id: int
retries_used: int
failed: bool
message: str
def persist_webhook_event( def persist_webhook_event(
@@ -120,6 +132,77 @@ def claim_next_job(session: Session) -> ReviewJob | None:
return job return job
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]:
if lease_timeout_seconds <= 0:
return []
now = datetime.now(timezone.utc)
cutoff = now - timedelta(seconds=lease_timeout_seconds)
stale_jobs = session.execute(
select(ReviewJob)
.where(
ReviewJob.status == JobStatus.running,
ReviewJob.started_at.is_not(None),
ReviewJob.started_at < cutoff,
)
.order_by(ReviewJob.started_at.asc())
.with_for_update(skip_locked=True)
).scalars()
outcomes: list[RecoveryOutcome] = []
for job in stale_jobs:
prior_retries = session.execute(
select(ReviewRun)
.where(
ReviewRun.job_id == job.id,
ReviewRun.status == RunStatus.failed,
ReviewRun.error_message.is_not(None),
)
.order_by(ReviewRun.id.asc())
).scalars()
lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX))
retries_used_after_this_timeout = lease_retries_used + 1
should_fail = action == "fail" or lease_retries_used >= max_retries
message = (
f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; "
f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}."
)
latest_run = (
session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none()
)
if latest_run and latest_run.status == RunStatus.running:
latest_run.status = RunStatus.failed
latest_run.finished_at = now
latest_run.error_message = message
else:
session.add(
ReviewRun(
job_id=job.id,
status=RunStatus.failed,
finished_at=now,
error_message=message,
)
)
job.last_error = message
if should_fail:
job.status = JobStatus.failed
job.finished_at = now
else:
job.status = JobStatus.queued
job.started_at = None
job.finished_at = None
outcomes.append(
RecoveryOutcome(
repo=job.repo,
pr_number=job.pr_number,
job_id=job.id,
retries_used=retries_used_after_this_timeout,
failed=should_fail,
message=message,
)
)
session.commit()
return outcomes
def finish_job( def finish_job(
session: Session, session: Session,
*, *,

View File

@@ -12,7 +12,7 @@ from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import ReviewJob from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id
from gitea_codex_bot.services.gitea import GiteaClient from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import claim_next_job, finish_job from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
@@ -86,13 +86,39 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa
def process_one_job(settings: Settings) -> bool: def process_one_job(settings: Settings) -> bool:
session_factory = get_session_factory() session_factory = get_session_factory()
gitea = GiteaClient(settings)
with session_factory() as session:
recoveries = recover_stuck_jobs(
session,
lease_timeout_seconds=settings.job_lease_timeout_seconds,
action=settings.stuck_job_recovery_action,
max_retries=settings.max_stuck_job_retries,
)
for recovery in recoveries:
if not recovery.failed:
continue
try:
gitea.post_issue_comment(
recovery.repo,
recovery.pr_number,
(
"⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n"
f"- job_id: `{recovery.job_id}`\n"
f"- retries used: `{recovery.retries_used}`\n"
f"- details: {recovery.message}\n\n"
"Please re-run with `@codex review` after investigating runner/worker stability."
),
)
except Exception:
logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id)
with session_factory() as session: with session_factory() as session:
job = claim_next_job(session) job = claim_next_job(session)
if not job: if not job:
return False return False
command = _command_from_job(job) command = _command_from_job(job)
gitea = GiteaClient(settings)
logger.info( logger.info(
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s", "Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
job.id, job.id,

View File

@@ -1,3 +1,6 @@
import pytest
from pydantic import ValidationError
from gitea_codex_bot.config import get_settings from gitea_codex_bot.config import get_settings
@@ -11,3 +14,20 @@ def test_codex_auth_defaults_to_api_key_mode() -> None:
settings = get_settings() settings = get_settings()
assert settings.codex_auth_mode == "api_key" assert settings.codex_auth_mode == "api_key"
assert settings.codex_auth_json_path is None assert settings.codex_auth_json_path is None
def test_job_lease_timeout_must_cover_max_review_runtime(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MAX_REVIEW_MINUTES", "10")
monkeypatch.setenv("JOB_LEASE_TIMEOUT_SECONDS", "300")
get_settings.cache_clear()
with pytest.raises(ValidationError, match="JOB_LEASE_TIMEOUT_SECONDS must be at least"):
get_settings()
def test_max_stuck_job_retries_must_be_non_negative(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MAX_STUCK_JOB_RETRIES", "-1")
get_settings.cache_clear()
with pytest.raises(ValidationError, match="MAX_STUCK_JOB_RETRIES must be >= 0"):
get_settings()

View File

@@ -3,8 +3,10 @@ from __future__ import annotations
from sqlalchemy import select from sqlalchemy import select
from gitea_codex_bot.db import get_session_factory from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import JobStatus, ReviewJob from datetime import datetime, timedelta, timezone
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
@@ -35,3 +37,181 @@ def test_claim_and_transition() -> None:
loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert loaded.status == JobStatus.succeeded assert loaded.status == JobStatus.succeeded
assert loaded.result_json is not None assert loaded.result_json is not None
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = enqueue_job(
session,
repo="acme/repo",
pr_number=1,
head_sha="aaaabbbb",
trigger_comment_id=1001,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
second = enqueue_job(
session,
repo="acme/repo",
pr_number=2,
head_sha="ccccdddd",
trigger_comment_id=1002,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == first.id
with session_factory() as session:
stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is False
recovered = claim_next_job(session)
assert recovered is not None
assert recovered.id == first.id
assert recovered.status == JobStatus.running
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
assert stale.last_error is not None
assert "lease timed out" in stale.last_error
failed_runs = session.execute(
select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed)
).scalars()
assert len(list(failed_runs)) >= 1
queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one()
assert queued_later.status in (JobStatus.queued, JobStatus.running)
def test_claim_recovers_stuck_running_job_by_fail() -> None:
session_factory = get_session_factory()
with session_factory() as session:
stuck_job = enqueue_job(
session,
repo="acme/repo",
pr_number=3,
head_sha="eeeeffff",
trigger_comment_id=1003,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == stuck_job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is True
no_job = claim_next_job(session)
assert no_job is None
with session_factory() as session:
failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
assert failed.status == JobStatus.failed
assert failed.finished_at is not None
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=4,
head_sha="11112222",
trigger_comment_id=1004,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(first) == 1
assert first[0].failed is False
claimed_again = claim_next_job(session)
assert claimed_again is not None
assert claimed_again.id == job.id
with session_factory() as session:
stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(second) == 1
assert second[0].failed is True
failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert failed.status == JobStatus.failed
def test_recovery_persists_timeout_retry_even_if_latest_run_not_running() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=5,
head_sha="33334444",
trigger_comment_id=1005,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
with session_factory() as session:
latest_run = session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one()
latest_run.status = RunStatus.succeeded
latest_run.finished_at = datetime.now(timezone.utc)
stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(first) == 1
assert first[0].failed is False
claimed_again = claim_next_job(session)
assert claimed_again is not None
assert claimed_again.id == job.id
with session_factory() as session:
stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(second) == 1
assert second[0].failed is True
final = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert final.status == JobStatus.failed