Files
gitea-codex/tests/test_jobs.py
Space-Banane d8956b309d
Some checks failed
ci / test (push) Failing after 12s
ci / publish (push) Has been skipped
First MVP
2026-05-22 19:25:57 +02:00

39 lines
1.8 KiB
Python

from __future__ import annotations
from sqlalchemy.exc import IntegrityError
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event
from gitea_codex_bot.types import ParsedCommand
def test_persist_webhook_dedupe() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = persist_webhook_event(session, delivery_id="d1", event_name="issue_comment", repo="acme/repo", comment_id=1, payload=b"{}")
second = persist_webhook_event(session, delivery_id="d1", event_name="issue_comment", repo="acme/repo", comment_id=1, payload=b"{}")
assert first is True
assert second is False
def test_enqueue_and_cooldown() -> None:
session_factory = get_session_factory()
with session_factory() as session:
cmd = ParsedCommand(name="review", raw="@codex review")
enqueue_job(session, repo="acme/repo", pr_number=42, head_sha="abc", trigger_comment_id=100, requested_by="user", command=cmd)
remaining = cooldown_remaining_seconds(session, "acme/repo", 42, 60)
assert remaining >= 0
def test_trigger_comment_unique() -> None:
session_factory = get_session_factory()
with session_factory() as session:
cmd = ParsedCommand(name="review", raw="@codex review")
enqueue_job(session, repo="acme/repo", pr_number=7, head_sha="x", trigger_comment_id=321, requested_by="user", command=cmd)
try:
enqueue_job(session, repo="acme/repo", pr_number=7, head_sha="x", trigger_comment_id=321, requested_by="user", command=cmd)
duplicate_raised = False
except IntegrityError:
duplicate_raised = True
session.rollback()
assert duplicate_raised is True