[bug]. Add stuck-job retry recovery
This commit is contained in:
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -96,6 +96,7 @@ def enqueue_job(
|
||||
|
||||
|
||||
def claim_next_job(session: Session) -> ReviewJob | None:
|
||||
recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2)
|
||||
job = session.execute(
|
||||
select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True)
|
||||
).scalar_one_or_none()
|
||||
@@ -120,6 +121,66 @@ def claim_next_job(session: Session) -> ReviewJob | None:
|
||||
return job
|
||||
|
||||
|
||||
def recover_stuck_running_jobs(session: Session, *, lease_timeout_seconds: int, max_retries: int) -> int:
    """Requeue or permanently fail running jobs whose lease has expired.

    A job is treated as stale when its status is ``running`` and its
    ``started_at`` is set and no later than ``now - lease_timeout_seconds``.
    Stale rows are selected with ``FOR UPDATE SKIP LOCKED``, so rows locked
    by another transaction are skipped rather than waited on.

    For every stale job:

    * the most recent ``ReviewRun`` (highest id) is marked failed if it is
      still in the running state;
    * the job goes back to ``queued`` while the retries already used
      (recorded attempts minus one, floored at zero) are below
      ``max_retries``; otherwise it is marked ``failed``.

    The session is committed only when at least one stale job was found.

    Args:
        session: SQLAlchemy session used for the queries and the commit.
        lease_timeout_seconds: Age, in seconds, after which a running job
            is considered stuck.
        max_retries: Number of retries permitted before a job is failed
            permanently.

    Returns:
        The number of stale jobs that were processed (requeued or failed).
    """
    current_time = datetime.now(timezone.utc)
    expiry_threshold = current_time - timedelta(seconds=lease_timeout_seconds)

    stale_jobs_stmt = (
        select(ReviewJob)
        .where(
            ReviewJob.status == JobStatus.running,
            ReviewJob.started_at.is_not(None),
            ReviewJob.started_at <= expiry_threshold,
        )
        .with_for_update(skip_locked=True)
    )
    expired_jobs = session.execute(stale_jobs_stmt).scalars().all()
    if not expired_jobs:
        # Nothing to recover; leave the transaction untouched.
        return 0

    for job in expired_jobs:
        attempts_so_far = _count_job_attempts(session, job.id)
        lease_error = (
            f"Job lease timed out after {lease_timeout_seconds}s on attempt {attempts_so_far}. "
            "Recovered by queue watchdog."
        )

        # Only the newest run can still be in flight; close it out if so.
        newest_run_stmt = (
            select(ReviewRun)
            .where(ReviewRun.job_id == job.id)
            .order_by(ReviewRun.id.desc())
            .limit(1)
        )
        newest_run = session.execute(newest_run_stmt).scalar_one_or_none()
        if newest_run and newest_run.status == RunStatus.running:
            newest_run.status = RunStatus.failed
            newest_run.finished_at = current_time
            newest_run.error_message = lease_error

        # The first attempt is not a retry, hence the "- 1".
        retries_used = max(attempts_so_far - 1, 0)

        if retries_used >= max_retries:
            # Retry budget exhausted: fail the job for good.
            job.status = JobStatus.failed
            job.finished_at = current_time
            job.last_error = lease_error
            logger.error(
                "Recovered timed-out running job id=%s by failing permanently attempt=%s retries_used=%s/%s",
                job.id,
                attempts_so_far,
                retries_used,
                max_retries,
            )
            continue

        # Budget remains: reset timing fields and put the job back in the queue.
        job.status = JobStatus.queued
        job.started_at = None
        job.finished_at = None
        job.last_error = lease_error
        logger.warning(
            "Recovered timed-out running job id=%s by requeueing attempt=%s retries_used=%s/%s",
            job.id,
            attempts_so_far,
            retries_used,
            max_retries,
        )

    session.commit()
    # Every stale job is handled exactly once, so the count is the list length.
    return len(expired_jobs)
|
||||
|
||||
|
||||
def finish_job(
|
||||
session: Session,
|
||||
*,
|
||||
@@ -142,11 +203,20 @@ def finish_job(
|
||||
job.status = JobStatus.succeeded
|
||||
run_status = RunStatus.succeeded
|
||||
else:
|
||||
job.status = JobStatus.failed
|
||||
attempt_count = _count_job_attempts(session, job_id)
|
||||
retries_used = max(attempt_count - 1, 0)
|
||||
if retries_used < 2:
|
||||
job.status = JobStatus.queued
|
||||
else:
|
||||
job.status = JobStatus.failed
|
||||
run_status = RunStatus.failed
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
job.finished_at = now
|
||||
if job.status == JobStatus.queued:
|
||||
job.started_at = None
|
||||
job.finished_at = None
|
||||
else:
|
||||
job.finished_at = now
|
||||
job.last_error = error_message
|
||||
if result is not None:
|
||||
job.result_json = result
|
||||
@@ -168,3 +238,8 @@ def finish_job(
|
||||
skipped,
|
||||
bool(error_message),
|
||||
)
|
||||
|
||||
|
||||
def _count_job_attempts(session: Session, job_id: int) -> int:
    """Return the number of ``ReviewRun`` rows recorded for ``job_id``.

    Args:
        session: SQLAlchemy session used to run the count query.
        job_id: Identifier matched against ``ReviewRun.job_id``.

    Returns:
        The row count as an ``int``; a falsy scalar result is reported as 0.
    """
    count_stmt = select(func.count(ReviewRun.id)).where(ReviewRun.job_id == job_id)
    run_total = session.execute(count_stmt).scalar_one()
    return int(run_total) if run_total else 0
|
||||
|
||||
Reference in New Issue
Block a user