Files
gitea-codex/tests/test_transitions.py

123 lines
4.9 KiB
Python

from __future__ import annotations
from sqlalchemy import select
from gitea_codex_bot.db import get_session_factory
from datetime import datetime, timedelta, timezone
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job
from gitea_codex_bot.types import ParsedCommand
def test_claim_and_transition() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=314,
head_sha="deadbeef",
trigger_comment_id=9901,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
with session_factory() as session:
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
assert claimed is not None
assert claimed.id == job.id
assert claimed.status == JobStatus.running
with session_factory() as session:
finish_job(session, job_id=job.id, success=True, skipped=False, result={"summary": "ok"}, error_message=None)
with session_factory() as session:
loaded = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert loaded.status == JobStatus.succeeded
assert loaded.result_json is not None
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = enqueue_job(
session,
repo="acme/repo",
pr_number=1,
head_sha="aaaabbbb",
trigger_comment_id=1001,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
second = enqueue_job(
session,
repo="acme/repo",
pr_number=2,
head_sha="ccccdddd",
trigger_comment_id=1002,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
assert claimed is not None
assert claimed.id == first.id
with session_factory() as session:
stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
recovered = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
assert recovered is not None
assert recovered.id == first.id
assert recovered.status == JobStatus.running
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
assert stale.last_error is not None
assert "lease timed out" in stale.last_error
failed_runs = session.execute(
select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed)
).scalars()
assert len(list(failed_runs)) >= 1
queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one()
assert queued_later.status in (JobStatus.queued, JobStatus.running)
def test_claim_recovers_stuck_running_job_by_fail() -> None:
session_factory = get_session_factory()
with session_factory() as session:
stuck_job = enqueue_job(
session,
repo="acme/repo",
pr_number=3,
head_sha="eeeeffff",
trigger_comment_id=1003,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="requeue")
assert claimed is not None
assert claimed.id == stuck_job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
no_job = claim_next_job(session, lease_timeout_seconds=300, stuck_job_recovery_action="fail")
assert no_job is None
with session_factory() as session:
failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
assert failed.status == JobStatus.failed
assert failed.finished_at is not None