218 lines
8.7 KiB
Python
218 lines
8.7 KiB
Python
from __future__ import annotations
|
|
|
|
from sqlalchemy import select
|
|
|
|
from gitea_codex_bot.db import get_session_factory
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
|
|
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
|
|
from gitea_codex_bot.types import ParsedCommand
|
|
|
|
|
|
def test_claim_and_transition() -> None:
    """Walk one job through enqueue, claim and finish, checking each status.

    Each step runs in its own session obtained from ``get_session_factory()``.
    The test asserts that the claimed job is the one just enqueued and is
    ``running``, and that after ``finish_job(success=True)`` the reloaded row
    is ``succeeded`` with a non-null ``result_json``.
    """
    make_session = get_session_factory()

    # Step 1: enqueue a single review job.
    with make_session() as db:
        review_command = ParsedCommand(name="review", raw="@codex review")
        job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=314,
            head_sha="deadbeef",
            trigger_comment_id=9901,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=review_command,
        )

    # Step 2: claiming must hand back that job, now marked as running.
    with make_session() as db:
        picked = claim_next_job(db)
        assert picked is not None
        assert picked.id == job.id
        assert picked.status == JobStatus.running

    # Step 3: report a successful, non-skipped completion with a result payload.
    with make_session() as db:
        payload = {"summary": "ok"}
        finish_job(
            db,
            job_id=job.id,
            success=True,
            skipped=False,
            result=payload,
            error_message=None,
        )

    # Step 4: re-read the row in a fresh session and check the final state.
    with make_session() as db:
        by_id = select(ReviewJob).where(ReviewJob.id == job.id)
        persisted = db.execute(by_id).scalar_one()
        assert persisted.status == JobStatus.succeeded
        assert persisted.result_json is not None
|
|
|
|
|
|
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
    """Check that a timed-out running job is requeued and can be claimed again.

    Two jobs are enqueued and the first is claimed. Its ``started_at`` is then
    backdated by 601 seconds and ``recover_stuck_jobs`` is called with
    ``action="requeue"`` and a 300-second lease timeout. The test asserts that:

    * exactly one outcome is returned and it is not marked failed;
    * the next claim returns the first job again, in ``running`` status;
    * the first job's ``last_error`` contains ``"lease timed out"``;
    * at least one ``ReviewRun`` with ``RunStatus.failed`` exists for it;
    * the second job is still either ``queued`` or ``running``.
    """
    make_session = get_session_factory()

    # (pr_number, head_sha, trigger_comment_id) for the two jobs, in enqueue order.
    job_specs = (
        (1, "aaaabbbb", 1001),
        (2, "ccccdddd", 1002),
    )

    with make_session() as db:
        first, second = [
            enqueue_job(
                db,
                repo="acme/repo",
                pr_number=pr_number,
                head_sha=head_sha,
                trigger_comment_id=comment_id,
                trigger_comment_body="@codex review",
                requested_by="alice",
                command=ParsedCommand(name="review", raw="@codex review"),
            )
            for pr_number, head_sha, comment_id in job_specs
        ]
        picked = claim_next_job(db)
        assert picked is not None
        assert picked.id == first.id

    # Backdate the claimed job so it is older than the 300 s lease used below.
    with make_session() as db:
        first_by_id = select(ReviewJob).where(ReviewJob.id == first.id)
        running_row = db.execute(first_by_id).scalar_one()
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    with make_session() as db:
        results = recover_stuck_jobs(
            db,
            lease_timeout_seconds=300,
            action="requeue",
            max_retries=1,
        )
        assert len(results) == 1
        assert results[0].failed is False

        reclaimed = claim_next_job(db)
        assert reclaimed is not None
        assert reclaimed.id == first.id
        assert reclaimed.status == JobStatus.running

    with make_session() as db:
        first_by_id = select(ReviewJob).where(ReviewJob.id == first.id)
        recovered_row = db.execute(first_by_id).scalar_one()
        assert recovered_row.last_error is not None
        assert "lease timed out" in recovered_row.last_error

        failed_run_query = select(ReviewRun).where(
            ReviewRun.job_id == first.id,
            ReviewRun.status == RunStatus.failed,
        )
        failed_run_rows = list(db.execute(failed_run_query).scalars())
        assert len(failed_run_rows) >= 1

        second_by_id = select(ReviewJob).where(ReviewJob.id == second.id)
        other_row = db.execute(second_by_id).scalar_one()
        assert other_row.status in (JobStatus.queued, JobStatus.running)
|
|
|
|
|
|
def test_claim_recovers_stuck_running_job_by_fail() -> None:
    """Check that ``action="fail"`` recovery fails a timed-out running job.

    One job is enqueued and claimed, and its ``started_at`` is backdated by
    601 seconds. ``recover_stuck_jobs`` is then called with a 300-second lease
    timeout and ``action="fail"``. The test asserts that one outcome is
    returned and flagged failed, that nothing is left to claim, and that the
    reloaded row is ``failed`` with ``finished_at`` set.
    """
    open_session = get_session_factory()

    with open_session() as db:
        review_command = ParsedCommand(name="review", raw="@codex review")
        stuck_job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=3,
            head_sha="eeeeffff",
            trigger_comment_id=1003,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=review_command,
        )
        in_flight = claim_next_job(db)
        assert in_flight is not None
        assert in_flight.id == stuck_job.id

    # Push the start time past the 300 s lease timeout used in the next step.
    with open_session() as db:
        by_id = select(ReviewJob).where(ReviewJob.id == stuck_job.id)
        running_row = db.execute(by_id).scalar_one()
        running_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    with open_session() as db:
        results = recover_stuck_jobs(
            db,
            lease_timeout_seconds=300,
            action="fail",
            max_retries=1,
        )
        assert len(results) == 1
        assert results[0].failed is True
        # The only job was failed rather than requeued, so the queue is empty.
        assert claim_next_job(db) is None

    with open_session() as db:
        by_id = select(ReviewJob).where(ReviewJob.id == stuck_job.id)
        final_row = db.execute(by_id).scalar_one()
        assert final_row.status == JobStatus.failed
        assert final_row.finished_at is not None
|
|
|
|
|
|
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
    """Check that ``max_retries=1`` gives one requeue, then a failure.

    A job is claimed and its lease is expired twice (``started_at`` backdated
    by 601 seconds against a 300-second timeout). The test asserts that the
    first ``recover_stuck_jobs(action="requeue")`` call returns one non-failed
    outcome and the job can be claimed again. The second call must return one
    failed outcome and leave the job in ``JobStatus.failed``.
    """
    make_session = get_session_factory()

    # Both recovery passes use exactly the same settings.
    recovery_settings = {
        "lease_timeout_seconds": 300,
        "action": "requeue",
        "max_retries": 1,
    }

    def expire_lease(job_id) -> None:
        # Backdate started_at in a dedicated session so the job looks stuck.
        with make_session() as db:
            row = db.execute(select(ReviewJob).where(ReviewJob.id == job_id)).scalar_one()
            row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
            db.commit()

    with make_session() as db:
        review_command = ParsedCommand(name="review", raw="@codex review")
        job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=4,
            head_sha="11112222",
            trigger_comment_id=1004,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=review_command,
        )
        picked = claim_next_job(db)
        assert picked is not None
        assert picked.id == job.id

    # First timeout: expected to be requeued, not failed.
    expire_lease(job.id)

    with make_session() as db:
        first_pass = recover_stuck_jobs(db, **recovery_settings)
        assert len(first_pass) == 1
        assert first_pass[0].failed is False

        picked_again = claim_next_job(db)
        assert picked_again is not None
        assert picked_again.id == job.id

    # Second timeout: the single retry is used up, so the job must fail.
    expire_lease(job.id)

    with make_session() as db:
        second_pass = recover_stuck_jobs(db, **recovery_settings)
        assert len(second_pass) == 1
        assert second_pass[0].failed is True

        by_id = select(ReviewJob).where(ReviewJob.id == job.id)
        final_row = db.execute(by_id).scalar_one()
        assert final_row.status == JobStatus.failed
|
|
|
|
|
|
def test_recovery_persists_timeout_retry_even_if_latest_run_not_running() -> None:
    """Check the retry is still counted when the latest run is not running.

    After claiming, the newest ``ReviewRun`` for the job is manually set to
    ``RunStatus.succeeded`` with a ``finished_at``, while the job's own
    ``started_at`` is backdated by 601 seconds. The test asserts that the
    first ``recover_stuck_jobs(action="requeue", max_retries=1)`` call requeues
    the job (one non-failed outcome, claimable again). After a second backdate,
    the second call must return one failed outcome and leave the job in
    ``JobStatus.failed``.
    """
    make_session = get_session_factory()

    # Both recovery passes use exactly the same settings.
    recovery_settings = {
        "lease_timeout_seconds": 300,
        "action": "requeue",
        "max_retries": 1,
    }

    with make_session() as db:
        review_command = ParsedCommand(name="review", raw="@codex review")
        job = enqueue_job(
            db,
            repo="acme/repo",
            pr_number=5,
            head_sha="33334444",
            trigger_comment_id=1005,
            trigger_comment_body="@codex review",
            requested_by="alice",
            command=review_command,
        )
        picked = claim_next_job(db)
        assert picked is not None
        assert picked.id == job.id

    with make_session() as db:
        # Mark the newest run for this job as already finished successfully...
        newest_run_query = (
            select(ReviewRun)
            .where(ReviewRun.job_id == job.id)
            .order_by(ReviewRun.id.desc())
            .limit(1)
        )
        newest_run = db.execute(newest_run_query).scalar_one()
        newest_run.status = RunStatus.succeeded
        newest_run.finished_at = datetime.now(timezone.utc)

        # ...while the job itself still looks stuck past the 300 s lease.
        job_by_id = select(ReviewJob).where(ReviewJob.id == job.id)
        job_row = db.execute(job_by_id).scalar_one()
        job_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    with make_session() as db:
        first_pass = recover_stuck_jobs(db, **recovery_settings)
        assert len(first_pass) == 1
        assert first_pass[0].failed is False

        picked_again = claim_next_job(db)
        assert picked_again is not None
        assert picked_again.id == job.id

    # Expire the lease a second time.
    with make_session() as db:
        job_by_id = select(ReviewJob).where(ReviewJob.id == job.id)
        job_row = db.execute(job_by_id).scalar_one()
        job_row.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
        db.commit()

    with make_session() as db:
        second_pass = recover_stuck_jobs(db, **recovery_settings)
        assert len(second_pass) == 1
        assert second_pass[0].failed is True

        job_by_id = select(ReviewJob).where(ReviewJob.id == job.id)
        final_row = db.execute(job_by_id).scalar_one()
        assert final_row.status == JobStatus.failed
|