Add stuck-job recovery for running jobs #1
@@ -44,6 +44,9 @@ WORKDIR=/var/lib/gitea-codex/worktrees
|
|||||||
MAX_DIFF_BYTES=200000
|
MAX_DIFF_BYTES=200000
|
||||||
MAX_REVIEW_MINUTES=10
|
MAX_REVIEW_MINUTES=10
|
||||||
CONCURRENCY=1
|
CONCURRENCY=1
|
||||||
|
JOB_LEASE_TIMEOUT_SECONDS=660
|
||||||
|
STUCK_JOB_RECOVERY_ACTION=requeue
|
||||||
|
MAX_STUCK_JOB_RETRIES=1
|
||||||
|
|
||||||
# Image used for ephemeral job containers (Node + npm + Codex CLI install).
|
# Image used for ephemeral job containers (Node + npm + Codex CLI install).
|
||||||
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim
|
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
|||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
|
|
||||||
from pydantic import Field, SecretStr, field_validator
|
from pydantic import Field, SecretStr, field_validator, model_validator
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
@@ -38,6 +38,9 @@ class Settings(BaseSettings):
|
|||||||
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
|
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
|
||||||
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
|
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
|
||||||
concurrency: int = Field(default=1, alias="CONCURRENCY")
|
concurrency: int = Field(default=1, alias="CONCURRENCY")
|
||||||
|
job_lease_timeout_seconds: int = Field(default=660, alias="JOB_LEASE_TIMEOUT_SECONDS")
|
||||||
|
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
|
||||||
|
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
|
||||||
|
|
||||||
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
|
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
|
||||||
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")
|
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")
|
||||||
@@ -60,6 +63,18 @@ class Settings(BaseSettings):
|
|||||||
values = [item.strip() for item in self.allowed_repos.split(",")]
|
values = [item.strip() for item in self.allowed_repos.split(",")]
|
||||||
return {value for value in values if value}
|
return {value for value in values if value}
|
||||||
|
|
||||||
|
@model_validator(mode="after")
def validate_job_lease_timeout(self) -> "Settings":
    """Cross-check the stuck-job recovery settings after all fields are loaded.

    Two constraints are enforced:

    * ``max_stuck_job_retries`` must not be negative.
    * ``job_lease_timeout_seconds`` must be at least ``max_review_minutes``
      expressed in seconds plus a fixed 60-second margin.

    Returns:
        The same settings instance, unmodified.

    Raises:
        ValueError: If either constraint is violated.
    """
    if self.max_stuck_job_retries < 0:
        raise ValueError("MAX_STUCK_JOB_RETRIES must be >= 0.")

    review_budget_seconds = self.max_review_minutes * 60
    lease_floor = review_budget_seconds + 60
    if self.job_lease_timeout_seconds >= lease_floor:
        return self

    problem = (
        "JOB_LEASE_TIMEOUT_SECONDS must be at least MAX_REVIEW_MINUTES*60 + 60 "
        f"(minimum {lease_floor}s for MAX_REVIEW_MINUTES={self.max_review_minutes})."
    )
    raise ValueError(problem)
|
||||||
|
|
||||||
|
|
||||||
@lru_cache(maxsize=1)
|
@lru_cache(maxsize=1)
|
||||||
def get_settings() -> Settings:
|
def get_settings() -> Settings:
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
@@ -12,6 +13,17 @@ from gitea_codex_bot.services.security import payload_digest
|
|||||||
from gitea_codex_bot.types import ParsedCommand
|
from gitea_codex_bot.types import ParsedCommand
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class RecoveryOutcome:
    """Immutable record of what stuck-job recovery did to one job.

    Instances are plain values: they are built once per recovered job and
    only read afterwards, so the class is frozen to prevent accidental
    mutation (which also makes instances hashable).
    """

    # Repository the recovered job belongs to.
    repo: str
    # Pull request number of the recovered job.
    pr_number: int
    # Primary key of the recovered job.
    job_id: int
    # Number of lease timeouts recorded for the job, including the one
    # that triggered this recovery.
    retries_used: int
    # True when the job was moved to the failed state instead of requeued.
    failed: bool
    # Human-readable description of the timeout that was recorded.
    message: str
|
||||||
|
|
||||||
|
|
||||||
def persist_webhook_event(
|
def persist_webhook_event(
|
||||||
@@ -120,6 +132,77 @@ def claim_next_job(session: Session) -> ReviewJob | None:
|
|||||||
return job
|
return job
|
||||||
|
|
||||||
|
|
||||||
|
def recover_stuck_jobs(
    session: Session,
    *,
    lease_timeout_seconds: int,
    action: str,
    max_retries: int,
) -> list[RecoveryOutcome]:
    """Fail or requeue ``running`` jobs whose lease has expired.

    A job is considered stuck when its status is ``running`` and its
    ``started_at`` is older than ``lease_timeout_seconds``. Matching rows are
    selected ``FOR UPDATE SKIP LOCKED``, oldest first. For every stuck job a
    failed run carrying the timeout message is recorded, ``last_error`` is
    set, and the job is either marked failed or put back in the queue.

    Args:
        session: Open database session; it is committed before returning.
        lease_timeout_seconds: Maximum age of ``started_at`` for a running
            job. A value ``<= 0`` disables recovery entirely.
        action: ``"fail"`` fails every stuck job immediately; any other value
            requeues the job until ``max_retries`` is exhausted.
        max_retries: Number of earlier lease timeouts tolerated before the
            job is failed instead of requeued.

    Returns:
        One ``RecoveryOutcome`` per recovered job, in processing order.
        ``retries_used`` counts lease timeouts including the current one.
    """
    if lease_timeout_seconds <= 0:
        return []

    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=lease_timeout_seconds)

    # Materialise the stale jobs up front: the loop below issues further
    # statements and mutations on the same session, so the result must not
    # be consumed lazily while that happens.
    stale_jobs = (
        session.execute(
            select(ReviewJob)
            .where(
                ReviewJob.status == JobStatus.running,
                ReviewJob.started_at.is_not(None),
                ReviewJob.started_at < cutoff,
            )
            .order_by(ReviewJob.started_at.asc())
            .with_for_update(skip_locked=True)
        )
        .scalars()
        .all()
    )

    outcomes: list[RecoveryOutcome] = []
    for job in stale_jobs:
        # Count earlier timeouts BEFORE recording the current one, so the
        # retry budget is compared against past attempts only.
        prior_timeouts = _count_lease_timeout_runs(session, job.id)
        timeouts_including_this_one = prior_timeouts + 1
        should_fail = action == "fail" or prior_timeouts >= max_retries

        message = (
            f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; "
            f"retries_used={timeouts_including_this_one}, max_retries={max_retries}."
        )
        _record_lease_timeout_run(session, job.id, message=message, finished_at=now)

        job.last_error = message
        if should_fail:
            job.status = JobStatus.failed
            job.finished_at = now
        else:
            # Back to the queue: clear the timestamps so the job can be
            # claimed again like a fresh one.
            job.status = JobStatus.queued
            job.started_at = None
            job.finished_at = None

        outcomes.append(
            RecoveryOutcome(
                repo=job.repo,
                pr_number=job.pr_number,
                job_id=job.id,
                retries_used=timeouts_including_this_one,
                failed=should_fail,
                message=message,
            )
        )

    session.commit()
    return outcomes


def _count_lease_timeout_runs(session: Session, job_id: int) -> int:
    """Count failed runs of ``job_id`` whose error message marks a lease timeout."""
    # Only the message column is needed; the prefix match stays in Python so
    # it remains an exact, case-sensitive comparison on every backend.
    error_messages = session.execute(
        select(ReviewRun.error_message).where(
            ReviewRun.job_id == job_id,
            ReviewRun.status == RunStatus.failed,
            ReviewRun.error_message.is_not(None),
        )
    ).scalars()
    return sum(1 for text in error_messages if (text or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX))


def _record_lease_timeout_run(session: Session, job_id: int, *, message: str, finished_at: datetime) -> None:
    """Persist a failed run carrying ``message`` for ``job_id``.

    The newest run is reused when it is still ``running``; otherwise a new
    failed run is added so the timeout is always visible to later retry counts.
    """
    latest_run = session.execute(
        select(ReviewRun).where(ReviewRun.job_id == job_id).order_by(ReviewRun.id.desc()).limit(1)
    ).scalar_one_or_none()

    if latest_run is not None and latest_run.status == RunStatus.running:
        latest_run.status = RunStatus.failed
        latest_run.finished_at = finished_at
        latest_run.error_message = message
        return

    session.add(
        ReviewRun(
            job_id=job_id,
            status=RunStatus.failed,
            finished_at=finished_at,
            error_message=message,
        )
    )
|
||||||
|
|
||||||
|
|
||||||
def finish_job(
|
def finish_job(
|
||||||
session: Session,
|
session: Session,
|
||||||
*,
|
*,
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from gitea_codex_bot.db import get_session_factory
|
|||||||
from gitea_codex_bot.models import ReviewJob
|
from gitea_codex_bot.models import ReviewJob
|
||||||
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id
|
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id
|
||||||
from gitea_codex_bot.services.gitea import GiteaClient
|
from gitea_codex_bot.services.gitea import GiteaClient
|
||||||
from gitea_codex_bot.services.jobs import claim_next_job, finish_job
|
from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
|
||||||
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
|
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
|
||||||
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
|
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
|
||||||
from gitea_codex_bot.types import ParsedCommand
|
from gitea_codex_bot.types import ParsedCommand
|
||||||
@@ -86,13 +86,39 @@ def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_messa
|
|||||||
|
|
||||||
def process_one_job(settings: Settings) -> bool:
|
def process_one_job(settings: Settings) -> bool:
|
||||||
session_factory = get_session_factory()
|
session_factory = get_session_factory()
|
||||||
|
gitea = GiteaClient(settings)
|
||||||
|
|
||||||
|
with session_factory() as session:
|
||||||
|
recoveries = recover_stuck_jobs(
|
||||||
|
session,
|
||||||
|
lease_timeout_seconds=settings.job_lease_timeout_seconds,
|
||||||
|
action=settings.stuck_job_recovery_action,
|
||||||
|
max_retries=settings.max_stuck_job_retries,
|
||||||
|
)
|
||||||
|
for recovery in recoveries:
|
||||||
|
if not recovery.failed:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
gitea.post_issue_comment(
|
||||||
|
recovery.repo,
|
||||||
|
recovery.pr_number,
|
||||||
|
(
|
||||||
|
"⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n"
|
||||||
|
f"- job_id: `{recovery.job_id}`\n"
|
||||||
|
f"- retries used: `{recovery.retries_used}`\n"
|
||||||
|
f"- details: {recovery.message}\n\n"
|
||||||
|
"Please re-run with `@codex review` after investigating runner/worker stability."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id)
|
||||||
|
|
||||||
with session_factory() as session:
|
with session_factory() as session:
|
||||||
job = claim_next_job(session)
|
job = claim_next_job(session)
|
||||||
if not job:
|
if not job:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
command = _command_from_job(job)
|
command = _command_from_job(job)
|
||||||
gitea = GiteaClient(settings)
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
|
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
|
||||||
job.id,
|
job.id,
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
import pytest
|
||||||
|
from pydantic import ValidationError
|
||||||
|
|
||||||
from gitea_codex_bot.config import get_settings
|
from gitea_codex_bot.config import get_settings
|
||||||
|
|
||||||
|
|
||||||
@@ -11,3 +14,20 @@ def test_codex_auth_defaults_to_api_key_mode() -> None:
|
|||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
assert settings.codex_auth_mode == "api_key"
|
assert settings.codex_auth_mode == "api_key"
|
||||||
assert settings.codex_auth_json_path is None
|
assert settings.codex_auth_json_path is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_job_lease_timeout_must_cover_max_review_runtime(monkeypatch: pytest.MonkeyPatch) -> None:
    """Loading settings fails when the lease is shorter than the review budget."""
    # With MAX_REVIEW_MINUTES=10 the validator requires 10*60 + 60 = 660s,
    # so a 300s lease must be rejected.
    overrides = {
        "MAX_REVIEW_MINUTES": "10",
        "JOB_LEASE_TIMEOUT_SECONDS": "300",
    }
    for name, value in overrides.items():
        monkeypatch.setenv(name, value)

    # get_settings is lru_cache'd; clear it so the overrides are read.
    get_settings.cache_clear()

    expected_error = "JOB_LEASE_TIMEOUT_SECONDS must be at least"
    with pytest.raises(ValidationError, match=expected_error):
        get_settings()
|
||||||
|
|
||||||
|
|
||||||
|
def test_max_stuck_job_retries_must_be_non_negative(monkeypatch: pytest.MonkeyPatch) -> None:
    """Loading settings fails when MAX_STUCK_JOB_RETRIES is negative."""
    monkeypatch.setenv("MAX_STUCK_JOB_RETRIES", "-1")

    # get_settings is lru_cache'd; clear it so the override is read.
    get_settings.cache_clear()

    expected_error = "MAX_STUCK_JOB_RETRIES must be >= 0"
    with pytest.raises(ValidationError, match=expected_error):
        get_settings()
|
||||||
|
|||||||
@@ -3,8 +3,10 @@ from __future__ import annotations
|
|||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from gitea_codex_bot.db import get_session_factory
|
from gitea_codex_bot.db import get_session_factory
|
||||||
from gitea_codex_bot.models import JobStatus, ReviewJob
|
from datetime import datetime, timedelta, timezone
|
||||||
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
|
|
||||||
|
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
|
||||||
|
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
|
||||||
from gitea_codex_bot.types import ParsedCommand
|
from gitea_codex_bot.types import ParsedCommand
|
||||||
|
|
||||||
|
|
||||||
@@ -35,3 +37,181 @@ def test_claim_and_transition() -> None:
|
|||||||
loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
|
loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
|
||||||
assert loaded.status == JobStatus.succeeded
|
assert loaded.status == JobStatus.succeeded
|
||||||
assert loaded.result_json is not None
|
assert loaded.result_json is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
    """A timed-out running job is requeued, can be claimed again, and records a failed run."""
    open_session = get_session_factory()

    def fetch_job(db, job_id):
        # Re-read the job row through the given session.
        return db.execute(select(ReviewJob).where(ReviewJob.id == job_id)).scalar_one()

    shared_fields = {
        "repo": "acme/repo",
        "trigger_comment_body": "@codex review",
        "requested_by": "alice",
    }

    # Enqueue two jobs and claim the first one so it enters the running state.
    with open_session() as db:
        first = enqueue_job(
            db,
            pr_number=1,
            head_sha="aaaabbbb",
            trigger_comment_id=1001,
            command=ParsedCommand(name="review", raw="@codex review"),
            **shared_fields,
        )
        second = enqueue_job(
            db,
            pr_number=2,
            head_sha="ccccdddd",
            trigger_comment_id=1002,
            command=ParsedCommand(name="review", raw="@codex review"),
            **shared_fields,
        )
        claimed = claim_next_job(db)
        assert claimed is not None
        assert claimed.id == first.id

    # Backdate started_at beyond the 300s lease used below.
    with open_session() as db:
        running_row = fetch_job(db, first.id)
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    # Recovery requeues the job, and the next claim picks it up again.
    with open_session() as db:
        outcomes = recover_stuck_jobs(db, lease_timeout_seconds=300, action="requeue", max_retries=1)
        assert len(outcomes) == 1
        assert outcomes[0].failed is False
        recovered = claim_next_job(db)
        assert recovered is not None
        assert recovered.id == first.id
        assert recovered.status == JobStatus.running

    # The timeout is visible on the job and as at least one failed run.
    with open_session() as db:
        first_row = fetch_job(db, first.id)
        assert first_row.last_error is not None
        assert "lease timed out" in first_row.last_error

        failed_runs = db.execute(
            select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed)
        ).scalars().all()
        assert len(failed_runs) >= 1

        second_row = fetch_job(db, second.id)
        assert second_row.status in (JobStatus.queued, JobStatus.running)
|
||||||
|
|
||||||
|
|
||||||
|
def test_claim_recovers_stuck_running_job_by_fail() -> None:
    """With action="fail", a timed-out running job is failed and nothing is left to claim."""
    open_session = get_session_factory()

    def fetch_job(db, job_id):
        # Re-read the job row through the given session.
        return db.execute(select(ReviewJob).where(ReviewJob.id == job_id)).scalar_one()

    # Enqueue one job and claim it so it enters the running state.
    with open_session() as db:
        stuck_job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=3,
            head_sha="eeeeffff",
            trigger_comment_id=1003,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        claimed = claim_next_job(db)
        assert claimed is not None
        assert claimed.id == stuck_job.id

    # Backdate started_at beyond the 300s lease used below.
    with open_session() as db:
        running_row = fetch_job(db, stuck_job.id)
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    # Recovery fails the job outright; the queue is then empty.
    with open_session() as db:
        outcomes = recover_stuck_jobs(db, lease_timeout_seconds=300, action="fail", max_retries=1)
        assert len(outcomes) == 1
        assert outcomes[0].failed is True
        no_job = claim_next_job(db)
        assert no_job is None

    # The failure is persisted on the job row.
    with open_session() as db:
        final_row = fetch_job(db, stuck_job.id)
        assert final_row.status == JobStatus.failed
        assert final_row.finished_at is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
    """With max_retries=1 the first timeout requeues the job and the second one fails it."""
    open_session = get_session_factory()

    def fetch_job(db, job_id):
        # Re-read the job row through the given session.
        return db.execute(select(ReviewJob).where(ReviewJob.id == job_id)).scalar_one()

    def backdate_start(job_id):
        # Push started_at beyond the 300s lease used by the recovery calls.
        with open_session() as db:
            row = fetch_job(db, job_id)
            row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
            db.commit()

    # Enqueue one job and claim it so it enters the running state.
    with open_session() as db:
        job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=4,
            head_sha="11112222",
            trigger_comment_id=1004,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        claimed = claim_next_job(db)
        assert claimed is not None
        assert claimed.id == job.id

    backdate_start(job.id)

    # First timeout: requeued, then claimed again.
    with open_session() as db:
        first = recover_stuck_jobs(db, lease_timeout_seconds=300, action="requeue", max_retries=1)
        assert len(first) == 1
        assert first[0].failed is False
        claimed_again = claim_next_job(db)
        assert claimed_again is not None
        assert claimed_again.id == job.id

    backdate_start(job.id)

    # Second timeout: the single retry is spent, so the job is failed.
    with open_session() as db:
        second = recover_stuck_jobs(db, lease_timeout_seconds=300, action="requeue", max_retries=1)
        assert len(second) == 1
        assert second[0].failed is True
        final_row = fetch_job(db, job.id)
        assert final_row.status == JobStatus.failed
|
||||||
|
|
||||||
|
|
||||||
|
def test_recovery_persists_timeout_retry_even_if_latest_run_not_running() -> None:
    """A timeout still counts as a retry when the job's latest run is not in the running state."""
    open_session = get_session_factory()

    def fetch_job(db, job_id):
        # Re-read the job row through the given session.
        return db.execute(select(ReviewJob).where(ReviewJob.id == job_id)).scalar_one()

    # Enqueue one job and claim it so it enters the running state.
    with open_session() as db:
        job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=5,
            head_sha="33334444",
            trigger_comment_id=1005,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=ParsedCommand(name="review", raw="@codex review"),
        )
        claimed = claim_next_job(db)
        assert claimed is not None
        assert claimed.id == job.id

    # Mark the latest run as succeeded while the job itself stays running,
    # then backdate the job beyond the 300s lease used below.
    with open_session() as db:
        newest_run_query = select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)
        newest_run = db.execute(newest_run_query).scalar_one()
        newest_run.status = RunStatus.succeeded
        newest_run.finished_at = datetime.now(timezone.utc)
        running_row = fetch_job(db, job.id)
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    # First timeout: requeued, then claimed again.
    with open_session() as db:
        first = recover_stuck_jobs(db, lease_timeout_seconds=300, action="requeue", max_retries=1)
        assert len(first) == 1
        assert first[0].failed is False
        claimed_again = claim_next_job(db)
        assert claimed_again is not None
        assert claimed_again.id == job.id

    # Backdate the job again to trigger a second timeout.
    with open_session() as db:
        running_again = fetch_job(db, job.id)
        running_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    # Second timeout: the first one was persisted, so the job is now failed.
    with open_session() as db:
        second = recover_stuck_jobs(db, lease_timeout_seconds=300, action="requeue", max_retries=1)
        assert len(second) == 1
        assert second[0].failed is True
        final = fetch_job(db, job.id)
        assert final.status == JobStatus.failed
|
||||||
|
|||||||
Reference in New Issue
Block a user