Add stuck running job recovery with lease timeout
This commit is contained in:
@@ -44,6 +44,8 @@ WORKDIR=/var/lib/gitea-codex/worktrees
|
||||
MAX_DIFF_BYTES=200000
|
||||
MAX_REVIEW_MINUTES=10
|
||||
CONCURRENCY=1
|
||||
JOB_LEASE_TIMEOUT_SECONDS=300
|
||||
STUCK_JOB_RECOVERY_ACTION=requeue
|
||||
|
||||
# Image used for ephemeral job containers (Node + npm + Codex CLI install).
|
||||
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim
|
||||
|
||||
@@ -38,6 +38,8 @@ class Settings(BaseSettings):
|
||||
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
|
||||
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
|
||||
concurrency: int = Field(default=1, alias="CONCURRENCY")
|
||||
job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS")
|
||||
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
|
||||
|
||||
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
|
||||
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")
|
||||
|
||||
@@ -82,7 +82,8 @@ def enqueue_job(
|
||||
return job
|
||||
|
||||
|
||||
def claim_next_job(session: Session) -> ReviewJob | None:
|
||||
def claim_next_job(session: Session, *, lease_timeout_seconds: int, stuck_job_recovery_action: str) -> ReviewJob | None:
|
||||
recover_stuck_jobs(session, lease_timeout_seconds=lease_timeout_seconds, action=stuck_job_recovery_action)
|
||||
job = session.execute(
|
||||
select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True)
|
||||
).scalar_one_or_none()
|
||||
@@ -98,6 +99,43 @@ def claim_next_job(session: Session) -> ReviewJob | None:
|
||||
return job
|
||||
|
||||
|
||||
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str) -> int:
    """Recover jobs that have sat in the running state longer than their lease.

    A job counts as stuck when its status is ``running`` and its
    ``started_at`` is set and older than ``lease_timeout_seconds`` ago.
    For every stuck job the most recent ``ReviewRun`` (highest id) is marked
    failed if it is still running, the timeout message is stored in
    ``job.last_error``, and the job is either failed or put back in the queue.

    This function only mutates objects attached to ``session``; it does not
    commit.

    Args:
        session: Session used to select and update the stuck jobs.
        lease_timeout_seconds: Lease length in seconds. A value of zero or
            less disables recovery entirely.
        action: ``"fail"`` marks stuck jobs as failed; any other value puts
            them back in the queued state.

    Returns:
        The number of jobs that were recovered.
    """
    if lease_timeout_seconds <= 0:
        return 0

    current_time = datetime.now(timezone.utc)
    lease_deadline = current_time - timedelta(seconds=lease_timeout_seconds)
    # The text is the same for every job handled in this pass.
    reason = f"Job lease timed out after {lease_timeout_seconds}s while in running state; recovered via action={action}."

    # Oldest leases first; the rows are selected FOR UPDATE with skip_locked.
    stale_query = (
        select(ReviewJob)
        .where(
            ReviewJob.status == JobStatus.running,
            ReviewJob.started_at.is_not(None),
            ReviewJob.started_at < lease_deadline,
        )
        .order_by(ReviewJob.started_at.asc())
        .with_for_update(skip_locked=True)
    )

    handled = 0
    for handled, stuck in enumerate(session.execute(stale_query).scalars(), start=1):
        newest_run_query = (
            select(ReviewRun)
            .where(ReviewRun.job_id == stuck.id)
            .order_by(ReviewRun.id.desc())
            .limit(1)
        )
        newest_run = session.execute(newest_run_query).scalar_one_or_none()
        # Only a run that is still marked running is closed out as failed.
        if newest_run and newest_run.status == RunStatus.running:
            newest_run.error_message = reason
            newest_run.finished_at = current_time
            newest_run.status = RunStatus.failed

        stuck.last_error = reason
        if action != "fail":
            # Requeue: clear the timestamps so the job looks freshly queued.
            stuck.started_at = None
            stuck.finished_at = None
            stuck.status = JobStatus.queued
            continue
        stuck.finished_at = current_time
        stuck.status = JobStatus.failed

    return handled
|
||||
|
||||
|
||||
def finish_job(
|
||||
session: Session,
|
||||
*,
|
||||
|
||||
@@ -78,7 +78,11 @@ def _handle_non_review_command(
|
||||
def process_one_job(settings: Settings) -> bool:
|
||||
session_factory = get_session_factory()
|
||||
with session_factory() as session:
|
||||
job = claim_next_job(session)
|
||||
job = claim_next_job(
|
||||
session,
|
||||
lease_timeout_seconds=settings.job_lease_timeout_seconds,
|
||||
stuck_job_recovery_action=settings.stuck_job_recovery_action,
|
||||
)
|
||||
if not job:
|
||||
return False
|
||||
|
||||
|
||||
@@ -3,7 +3,9 @@ from __future__ import annotations
|
||||
from sqlalchemy import select
|
||||
|
||||
from gitea_codex_bot.db import get_session_factory
|
||||
from gitea_codex_bot.models import JobStatus, ReviewJob
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
|
||||
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
|
||||
from gitea_codex_bot.types import ParsedCommand
|
||||
|
||||
@@ -23,7 +25,7 @@ def test_claim_and_transition() -> None:
|
||||
)
|
||||
|
||||
with session_factory() as session:
|
||||
claimed = claim_next_job(session)
|
||||
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
|
||||
assert claimed is not None
|
||||
assert claimed.id == job.id
|
||||
assert claimed.status == JobStatus.running
|
||||
@@ -35,3 +37,86 @@ def test_claim_and_transition() -> None:
|
||||
loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
|
||||
assert loaded.status == JobStatus.succeeded
|
||||
assert loaded.result_json is not None
|
||||
|
||||
|
||||
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
    """A running job past its lease is requeued and re-claimed by the next claim.

    Two jobs are enqueued and the first is claimed. Its ``started_at`` is then
    backdated beyond the 300s lease, so the following ``claim_next_job`` call
    is expected to requeue it and claim it again, leaving the second job
    untouched in the queue.
    """
    session_factory = get_session_factory()
    with session_factory() as session:
        first = enqueue_job(
            session,
            repo="acme/repo",
            pr_number=1,
            head_sha="aaaabbbb",
            trigger_comment_id=1001,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        second = enqueue_job(
            session,
            repo="acme/repo",
            pr_number=2,
            head_sha="ccccdddd",
            trigger_comment_id=1002,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
        assert claimed is not None
        assert claimed.id == first.id

    # Backdate the lease start so the job is well past the 300s timeout.
    with session_factory() as session:
        stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
        stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        session.commit()

    # The next claim recovers the stuck job and hands the same job out again.
    with session_factory() as session:
        recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
        assert recovered is not None
        assert recovered.id == first.id
        assert recovered.status == JobStatus.running

    with session_factory() as session:
        stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
        assert stale.last_error is not None
        assert "lease timed out" in stale.last_error
        failed_runs = session.execute(
            select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed)
        ).scalars()
        assert len(list(failed_runs)) >= 1

        # Both claims above returned the first job, so the second job was
        # never handed out and must still be waiting in the queue.
        queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one()
        assert queued_later.status == JobStatus.queued
|
||||
|
||||
|
||||
def test_claim_recovers_stuck_running_job_by_fail() -> None:
    """With the ``fail`` action, a job past its lease ends up failed, not re-claimed.

    The only job is claimed, its ``started_at`` is backdated beyond the 300s
    lease, and the next claim (using ``stuck_job_recovery_action="fail"``) is
    expected to return nothing while the job is persisted as failed with a
    ``finished_at`` timestamp.
    """
    open_session = get_session_factory()

    with open_session() as session:
        stuck_job = enqueue_job(
            session,
            repo="acme/repo",
            pr_number=3,
            head_sha="eeeeffff",
            trigger_comment_id=1003,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        first_claim = claim_next_job(
            session,
            lease_timeout_seconds=300,
            stuck_job_recovery_action="requeue",
        )
        assert first_claim is not None
        assert first_claim.id == stuck_job.id

    # Push the lease start far enough into the past to exceed the timeout.
    with open_session() as session:
        lookup = select(ReviewJob).where(ReviewJob.id == stuck_job.id)
        running_row = session.execute(lookup).scalar_one()
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        session.commit()

    # Nothing is left to claim once the stuck job has been failed.
    with open_session() as session:
        second_claim = claim_next_job(
            session,
            lease_timeout_seconds=300,
            stuck_job_recovery_action="fail",
        )
        assert second_claim is None

    with open_session() as session:
        lookup = select(ReviewJob).where(ReviewJob.id == stuck_job.id)
        final_row = session.execute(lookup).scalar_one()
        assert final_row.status == JobStatus.failed
        assert final_row.finished_at is not None
|
||||
|
||||
Reference in New Issue
Block a user