Files
gitea-codex/tests/test_jobs.py
Space-Banane e7c7d82f84
Some checks failed
ci / test (push) Failing after 16s
ci / publish (push) Has been skipped
feat. Review foot note, docker fix, pass message to reviewer , update tests
2026-05-22 22:16:09 +02:00

86 lines
3.0 KiB
Python

from __future__ import annotations
from sqlalchemy.exc import IntegrityError
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event
from gitea_codex_bot.types import ParsedCommand
def test_persist_webhook_dedupe() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = persist_webhook_event(session, delivery_id="d1", event_name="issue_comment", repo="acme/repo", comment_id=1, payload=b"{}")
second = persist_webhook_event(session, delivery_id="d1", event_name="issue_comment", repo="acme/repo", comment_id=1, payload=b"{}")
assert first is True
assert second is False
def test_enqueue_and_cooldown() -> None:
session_factory = get_session_factory()
with session_factory() as session:
cmd = ParsedCommand(name="review", raw="@codex review")
enqueue_job(
session,
repo="acme/repo",
pr_number=42,
head_sha="abc",
trigger_comment_id=100,
trigger_comment_body="@codex review",
requested_by="user",
command=cmd,
)
remaining = cooldown_remaining_seconds(session, "acme/repo", 42, 60)
assert remaining >= 0
def test_trigger_comment_unique() -> None:
session_factory = get_session_factory()
with session_factory() as session:
cmd = ParsedCommand(name="review", raw="@codex review")
enqueue_job(
session,
repo="acme/repo",
pr_number=7,
head_sha="x",
trigger_comment_id=321,
trigger_comment_body="@codex review",
requested_by="user",
command=cmd,
)
try:
enqueue_job(
session,
repo="acme/repo",
pr_number=7,
head_sha="x",
trigger_comment_id=321,
trigger_comment_body="@codex review",
requested_by="user",
command=cmd,
)
duplicate_raised = False
except IntegrityError:
duplicate_raised = True
session.rollback()
assert duplicate_raised is True
def test_enqueue_persists_full_trigger_comment_body() -> None:
session_factory = get_session_factory()
with session_factory() as session:
cmd = ParsedCommand(name="review", raw="@codex review security\nplease focus auth")
job = enqueue_job(
session,
repo="acme/repo",
pr_number=55,
head_sha="abc123",
trigger_comment_id=9191,
trigger_comment_body=cmd.raw,
requested_by="alice",
command=cmd,
)
stored = session.get(ReviewJob, job.id)
assert stored is not None
assert stored.trigger_comment_body == "@codex review security\nplease focus auth"