Compare commits

..

2 Commits

Author SHA1 Message Date
1fb67b630d Cap stuck-job requeue retries and notify on exhaustion
All checks were successful
ci / test (pull_request) Successful in 29s
ci / publish (pull_request) Has been skipped
2026-05-22 21:25:53 +00:00
dc48df1aab Add stuck running job recovery with lease timeout
Some checks failed
ci / test (pull_request) Failing after 30s
ci / publish (pull_request) Has been skipped
2026-05-22 21:10:11 +00:00
13 changed files with 57 additions and 487 deletions

View File

@@ -44,7 +44,7 @@ WORKDIR=/var/lib/gitea-codex/worktrees
MAX_DIFF_BYTES=200000 MAX_DIFF_BYTES=200000
MAX_REVIEW_MINUTES=10 MAX_REVIEW_MINUTES=10
CONCURRENCY=1 CONCURRENCY=1
JOB_LEASE_TIMEOUT_SECONDS=660 JOB_LEASE_TIMEOUT_SECONDS=300
STUCK_JOB_RECOVERY_ACTION=requeue STUCK_JOB_RECOVERY_ACTION=requeue
MAX_STUCK_JOB_RETRIES=1 MAX_STUCK_JOB_RETRIES=1

View File

@@ -11,10 +11,8 @@ COPY pyproject.toml README.md /app/
COPY src /app/src COPY src /app/src
COPY alembic.ini /app/ COPY alembic.ini /app/
COPY alembic /app/alembic COPY alembic /app/alembic
COPY docker/entrypoint.sh /app/docker/entrypoint.sh
RUN pip install --no-cache-dir . RUN pip install --no-cache-dir .
RUN chmod +x /app/docker/entrypoint.sh
EXPOSE 8000 EXPOSE 8000
CMD ["/app/docker/entrypoint.sh"] CMD ["uvicorn", "gitea_codex_bot.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -12,7 +12,6 @@
- [ ] `TEST`: Add integration test proving the runner executes the exact PR head SHA in isolated mode and does not rely on host checkout. - [ ] `TEST`: Add integration test proving the runner executes the exact PR head SHA in isolated mode and does not rely on host checkout.
### P1 (Important) ### P1 (Important)
- [x] `BUG`: Log webhook events rejected because repo is not listed in `ALLOWED_REPOS`.
- [ ] `FEATURE`: Full control UI to update the bots settings. Password in env variable protected login page. No more env variables. - [ ] `FEATURE`: Full control UI to update the bots settings. Password in env variable protected login page. No more env variables.
- [ ] `FEATURE`: Automatic Trigger on new PRs and or commits on PRs with context that its a change that needs review not the whole PR again. GITEA_ALLOW_PR_AUTO_REVIEW=true would be needed - [ ] `FEATURE`: Automatic Trigger on new PRs and or commits on PRs with context that its a change that needs review not the whole PR again. GITEA_ALLOW_PR_AUTO_REVIEW=true would be needed
- [ ] `BUG`: Container runner hardcodes `codex exec --json -m gpt-5`; use `OPENAI_REVIEW_MODEL` and `OPENAI_REASONING_EFFORT` consistently across runner paths. - [ ] `BUG`: Container runner hardcodes `codex exec --json -m gpt-5`; use `OPENAI_REVIEW_MODEL` and `OPENAI_REASONING_EFFORT` consistently across runner paths.

View File

@@ -1,44 +0,0 @@
#!/bin/sh
set -eu
echo "Checking migration baseline..."
python - <<'PY'
from sqlalchemy import create_engine, inspect, text
from gitea_codex_bot.config import get_settings
settings = get_settings()
engine = create_engine(settings.sqlalchemy_url)
with engine.connect() as conn:
inspector = inspect(conn)
tables = set(inspector.get_table_names())
has_alembic_version = "alembic_version" in tables
has_review_jobs = "review_jobs" in tables
has_webhook_events = "webhook_events" in tables
stamped_revision = None
if has_alembic_version:
row = conn.execute(text("SELECT version_num FROM alembic_version LIMIT 1")).fetchone()
if row and row[0]:
stamped_revision = row[0]
if (not has_alembic_version or not stamped_revision) and (has_review_jobs or has_webhook_events):
revision = "0001_initial"
if has_review_jobs:
columns = {c["name"] for c in inspector.get_columns("review_jobs")}
if "trigger_comment_body" in columns:
revision = "0002_trigger_comment_body"
conn.execute(text("CREATE TABLE IF NOT EXISTS alembic_version (version_num VARCHAR(32) NOT NULL, PRIMARY KEY (version_num))"))
conn.execute(text("DELETE FROM alembic_version"))
conn.execute(text("INSERT INTO alembic_version (version_num) VALUES (:revision)"), {"revision": revision})
conn.commit()
print(f"Stamped legacy database at revision {revision}")
PY
echo "Running database migrations..."
alembic upgrade head
echo "Starting API server..."
exec uvicorn gitea_codex_bot.main:app --host 0.0.0.0 --port 8000

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from functools import lru_cache from functools import lru_cache
from typing import Literal from typing import Literal
from pydantic import Field, SecretStr, field_validator, model_validator from pydantic import Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -38,7 +38,7 @@ class Settings(BaseSettings):
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES") max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES") max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
concurrency: int = Field(default=1, alias="CONCURRENCY") concurrency: int = Field(default=1, alias="CONCURRENCY")
job_lease_timeout_seconds: int = Field(default=660, alias="JOB_LEASE_TIMEOUT_SECONDS") job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS")
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION") stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES") max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
@@ -63,16 +63,6 @@ class Settings(BaseSettings):
values = [item.strip() for item in self.allowed_repos.split(",")] values = [item.strip() for item in self.allowed_repos.split(",")]
return {value for value in values if value} return {value for value in values if value}
@model_validator(mode="after")
def validate_job_lease_timeout(self) -> "Settings":
minimum_lease_timeout = (self.max_review_minutes * 60) + 60
if self.job_lease_timeout_seconds < minimum_lease_timeout:
raise ValueError(
"JOB_LEASE_TIMEOUT_SECONDS must be at least MAX_REVIEW_MINUTES*60 + 60 "
f"(minimum {minimum_lease_timeout}s for MAX_REVIEW_MINUTES={self.max_review_minutes})."
)
return self
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_settings() -> Settings: def get_settings() -> Settings:

View File

@@ -11,12 +11,10 @@ from fastapi import Depends, FastAPI, Header, HTTPException, Request, status
from fastapi.exception_handlers import http_exception_handler from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.exceptions import HTTPException as StarletteHTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from gitea_codex_bot.config import Settings, get_settings from gitea_codex_bot.config import Settings, get_settings
from gitea_codex_bot.db import get_session from gitea_codex_bot.db import Base, get_engine, get_session
from gitea_codex_bot.models import JobStatus, ReviewJob
from gitea_codex_bot.services.commands import parse_command from gitea_codex_bot.services.commands import parse_command
from gitea_codex_bot.services.gitea import GiteaClient from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event
@@ -125,6 +123,7 @@ async def lifespan(app: FastAPI):
_validate_required_env(settings) _validate_required_env(settings)
_log_startup_identity(settings) _log_startup_identity(settings)
_log_startup_auth_json_status(settings) _log_startup_auth_json_status(settings)
Base.metadata.create_all(bind=get_engine())
stop_event = asyncio.Event() stop_event = asyncio.Event()
task = asyncio.create_task(worker_loop(settings, stop_event)) task = asyncio.create_task(worker_loop(settings, stop_event))
@@ -149,13 +148,6 @@ def _load_repo_review_config_for_pr(gitea: GiteaClient, repo: str, pr_number: in
return parse_repo_review_config_text(cfg_text, configured=True), head_sha return parse_repo_review_config_text(cfg_text, configured=True), head_sha
def _resolve_pr_head_sha(gitea: GiteaClient, repo: str, pr_number: int, fallback: str) -> str:
try:
return gitea.get_pull_request(repo, pr_number).head_sha
except Exception:
return fallback
def _render_landing_page() -> str: def _render_landing_page() -> str:
return """<!doctype html> return """<!doctype html>
<html lang="en"> <html lang="en">
@@ -173,8 +165,6 @@ def _render_landing_page() -> str:
<p class="mt-4 text-base leading-7 text-slate-300">This endpoint powers automated pull request review workflows for Gitea. It validates signed webhook events, queues review jobs, and posts structured feedback back to pull requests.</p> <p class="mt-4 text-base leading-7 text-slate-300">This endpoint powers automated pull request review workflows for Gitea. It validates signed webhook events, queues review jobs, and posts structured feedback back to pull requests.</p>
<div class="mt-8 flex flex-wrap gap-3 text-sm"> <div class="mt-8 flex flex-wrap gap-3 text-sm">
<button id="health-button" type="button" class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200 transition hover:border-slate-500 hover:bg-slate-700">Health: <code>/healthz</code></button> <button id="health-button" type="button" class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200 transition hover:border-slate-500 hover:bg-slate-700">Health: <code>/healthz</code></button>
<button id="failure-button" type="button" class="rounded-lg border border-amber-500/40 bg-amber-500/10 px-3 py-2 text-amber-200 transition hover:border-amber-400 hover:bg-amber-500/20">Latest failure: <code>/healthz/latest-failure</code></button>
<button id="job-button" type="button" class="rounded-lg border border-cyan-500/40 bg-cyan-500/10 px-3 py-2 text-cyan-200 transition hover:border-cyan-400 hover:bg-cyan-500/20">Latest job: <code>/healthz/latest-job</code></button>
<span class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200">Webhook: <code>POST /webhook/gitea</code></span> <span class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200">Webhook: <code>POST /webhook/gitea</code></span>
</div> </div>
</section> </section>
@@ -190,8 +180,6 @@ def _render_landing_page() -> str:
</div> </div>
<script> <script>
const healthButton = document.getElementById("health-button"); const healthButton = document.getElementById("health-button");
const failureButton = document.getElementById("failure-button");
const jobButton = document.getElementById("job-button");
const healthModal = document.getElementById("health-modal"); const healthModal = document.getElementById("health-modal");
const closeModal = document.getElementById("close-modal"); const closeModal = document.getElementById("close-modal");
const healthResult = document.getElementById("health-result"); const healthResult = document.getElementById("health-result");
@@ -209,57 +197,6 @@ def _render_landing_page() -> str:
} }
} }
async function loadLatestFailure() {
healthResult.textContent = "Loading...";
try {
const response = await fetch("/healthz/latest-failure", { headers: { Accept: "application/json" } });
const payload = await response.json();
if (!payload.has_failed_job) {
healthResult.textContent = "No failed jobs found.";
return;
}
const failedAt = payload.failed_at ? payload.failed_at : "unknown";
const errorText = payload.error ? payload.error : "unknown";
healthResult.textContent =
"Latest failed job #" + payload.job_id +
" | " + payload.repo + "#" + payload.pr_number +
" | command=" + payload.command +
" | commit=" + payload.head_sha.slice(0, 7) +
" | failed_at=" + failedAt +
" | error=" + errorText;
} catch (_error) {
healthResult.textContent = "Could not load latest failure output.";
}
}
async function loadLatestJob() {
healthResult.textContent = "Loading...";
try {
const response = await fetch("/healthz/latest-job", { headers: { Accept: "application/json" } });
const payload = await response.json();
if (!payload.has_job) {
healthResult.textContent = "No jobs found yet.";
return;
}
const startedAt = payload.started_at ? payload.started_at : "not started";
const finishedAt = payload.finished_at ? payload.finished_at : "not finished";
const errorText = payload.error ? payload.error : "none";
const summary = payload.result_summary ? payload.result_summary : "none";
healthResult.textContent =
"Latest job #" + payload.job_id +
" | " + payload.repo + "#" + payload.pr_number +
" | command=" + payload.command +
" | status=" + payload.job_status +
" | commit=" + payload.head_sha.slice(0, 7) +
" | started_at=" + startedAt +
" | finished_at=" + finishedAt +
" | error=" + errorText +
" | summary=" + summary;
} catch (_error) {
healthResult.textContent = "Could not load latest job output.";
}
}
function showModal() { function showModal() {
healthModal.classList.remove("hidden"); healthModal.classList.remove("hidden");
healthModal.classList.add("flex"); healthModal.classList.add("flex");
@@ -274,14 +211,6 @@ def _render_landing_page() -> str:
showModal(); showModal();
await loadHealth(); await loadHealth();
}); });
failureButton.addEventListener("click", async function () {
showModal();
await loadLatestFailure();
});
jobButton.addEventListener("click", async function () {
showModal();
await loadLatestJob();
});
closeModal.addEventListener("click", hideModal); closeModal.addEventListener("click", hideModal);
healthModal.addEventListener("click", function (event) { healthModal.addEventListener("click", function (event) {
@@ -336,54 +265,6 @@ def healthz(settings: Settings = Depends(get_settings)) -> dict[str, str]:
return {"status": "ok"} return {"status": "ok"}
@app.get("/healthz/latest-failure")
def healthz_latest_failure(session: Session = Depends(get_session)) -> dict[str, Any]:
failed_job = session.execute(
select(ReviewJob).where(ReviewJob.status == JobStatus.failed).order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc()).limit(1)
).scalar_one_or_none()
if not failed_job:
return {"status": "ok", "has_failed_job": False}
return {
"status": "ok",
"has_failed_job": True,
"job_id": failed_job.id,
"repo": failed_job.repo,
"pr_number": failed_job.pr_number,
"command": failed_job.command,
"head_sha": failed_job.head_sha,
"error": failed_job.last_error or "",
"failed_at": failed_job.finished_at.isoformat() if failed_job.finished_at else None,
}
@app.get("/healthz/latest-job")
def healthz_latest_job(session: Session = Depends(get_session)) -> dict[str, Any]:
latest_job = session.execute(select(ReviewJob).order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc()).limit(1)).scalar_one_or_none()
if not latest_job:
return {"status": "ok", "has_job": False}
result_summary = ""
if isinstance(latest_job.result_json, dict):
summary = latest_job.result_json.get("summary")
if isinstance(summary, str):
result_summary = summary
return {
"status": "ok",
"has_job": True,
"job_id": latest_job.id,
"repo": latest_job.repo,
"pr_number": latest_job.pr_number,
"command": latest_job.command,
"head_sha": latest_job.head_sha,
"job_status": latest_job.status.value if hasattr(latest_job.status, "value") else str(latest_job.status),
"error": latest_job.last_error or "",
"result_summary": result_summary,
"created_at": latest_job.created_at.isoformat() if latest_job.created_at else None,
"started_at": latest_job.started_at.isoformat() if latest_job.started_at else None,
"finished_at": latest_job.finished_at.isoformat() if latest_job.finished_at else None,
}
@app.post("/webhook/gitea") @app.post("/webhook/gitea")
async def gitea_webhook( async def gitea_webhook(
request: Request, request: Request,
@@ -432,13 +313,6 @@ async def gitea_webhook(
) )
if repo not in settings.allowed_repo_set: if repo not in settings.allowed_repo_set:
logger.info(
"Webhook ignored: repo not in ALLOWED_REPOS repo=%s pr=%s comment_id=%s sender=%s",
repo,
pr_number,
comment_id,
sender_username,
)
return {"accepted": False, "reason": "repo not allowed"} return {"accepted": False, "reason": "repo not allowed"}
inserted = persist_webhook_event( inserted = persist_webhook_event(
@@ -454,7 +328,6 @@ async def gitea_webhook(
gitea = GiteaClient(settings) gitea = GiteaClient(settings)
if parsed_command.name in {"review", "rerun"}: if parsed_command.name in {"review", "rerun"}:
head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha)
repo_cfg: RepoReviewConfig | None = None repo_cfg: RepoReviewConfig | None = None
try: try:
repo_cfg, resolved_head_sha = _load_repo_review_config_for_pr(gitea, repo, pr_number) repo_cfg, resolved_head_sha = _load_repo_review_config_for_pr(gitea, repo, pr_number)
@@ -462,7 +335,10 @@ async def gitea_webhook(
except Exception: except Exception:
repo_cfg = None repo_cfg = None
if head_sha == "unknown": if head_sha == "unknown":
head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha) try:
head_sha = gitea.get_pull_request(repo, pr_number).head_sha
except Exception:
pass
if repo_cfg and not repo_cfg.enabled: if repo_cfg and not repo_cfg.enabled:
gitea.post_issue_comment(repo, pr_number, format_disabled_ack()) gitea.post_issue_comment(repo, pr_number, format_disabled_ack())
return {"accepted": True, "reason": "review disabled by repo config"} return {"accepted": True, "reason": "review disabled by repo config"}

View File

@@ -90,16 +90,6 @@ class GiteaClient:
) )
return int(payload["id"]) return int(payload["id"])
def get_issue_comment(self, repo: str, comment_id: int) -> dict[str, Any]:
owner, name = self.split_repo(repo)
encoded_owner = quote(owner, safe="")
encoded_name = quote(name, safe="")
payload = self._request(
"GET",
f"/api/v1/repos/{encoded_owner}/{encoded_name}/issues/comments/{comment_id}",
)
return dict(payload)
def list_issue_comments(self, repo: str, pr_number: int) -> list[dict[str, Any]]: def list_issue_comments(self, repo: str, pr_number: int) -> list[dict[str, Any]]:
owner, name = self.split_repo(repo) owner, name = self.split_repo(repo)
encoded_owner = quote(owner, safe="") encoded_owner = quote(owner, safe="")

View File

@@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from dataclasses import dataclass from dataclasses import dataclass
@@ -12,7 +11,6 @@ from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus, W
from gitea_codex_bot.services.security import payload_digest from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand from gitea_codex_bot.types import ParsedCommand
logger = logging.getLogger(__name__)
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out" LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
@@ -94,16 +92,6 @@ def enqueue_job(
session.add(job) session.add(job)
session.commit() session.commit()
session.refresh(job) session.refresh(job)
logger.info(
"Job enqueued id=%s repo=%s pr=%s command=%s head_sha=%s trigger_comment_id=%s requested_by=%s",
job.id,
job.repo,
job.pr_number,
job.command,
job.head_sha,
job.trigger_comment_id,
job.requested_by,
)
return job return job
@@ -120,15 +108,6 @@ def claim_next_job(session: Session) -> ReviewJob | None:
session.add(run) session.add(run)
session.commit() session.commit()
session.refresh(job) session.refresh(job)
logger.info(
"Job claimed id=%s repo=%s pr=%s command=%s head_sha=%s status=%s",
job.id,
job.repo,
job.pr_number,
job.command,
job.head_sha,
job.status.value if hasattr(job.status, "value") else job.status,
)
return job return job
@@ -232,13 +211,3 @@ def finish_job(
latest_run.error_message = error_message latest_run.error_message = error_message
session.commit() session.commit()
logger.info(
"Job finished id=%s repo=%s pr=%s status=%s run_status=%s skipped=%s error_present=%s",
job.id,
job.repo,
job.pr_number,
job.status.value if hasattr(job.status, "value") else job.status,
run_status.value if hasattr(run_status, "value") else run_status,
skipped,
bool(error_message),
)

View File

@@ -4,13 +4,14 @@ import asyncio
import logging import logging
from typing import Any from typing import Any
import httpx
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from gitea_codex_bot.config import Settings from gitea_codex_bot.config import Settings
from gitea_codex_bot.db import get_session_factory from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import ReviewJob from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id from gitea_codex_bot.services.comments import get_persistent_review_comment_id, upsert_persistent_review_comment_id
from gitea_codex_bot.services.gitea import GiteaClient from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
@@ -74,16 +75,6 @@ def _handle_non_review_command(
return False, False, None, None return False, False, None, None
def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_message: str) -> None:
message = (
"⚠️ Codex review run failed after queueing.\n\n"
f"- Commit: `{job.head_sha[:7]}`\n"
f"- Error: `{error_message[:500]}`\n\n"
"Please rerun `@codex rerun` after checking worker logs."
)
gitea.post_issue_comment(job.repo, job.pr_number, message)
def process_one_job(settings: Settings) -> bool: def process_one_job(settings: Settings) -> bool:
session_factory = get_session_factory() session_factory = get_session_factory()
gitea = GiteaClient(settings) gitea = GiteaClient(settings)
@@ -119,27 +110,11 @@ def process_one_job(settings: Settings) -> bool:
return False return False
command = _command_from_job(job) command = _command_from_job(job)
logger.info(
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
job.id,
job.repo,
job.pr_number,
command.name,
command.arguments,
job.head_sha,
)
with session_factory() as session: with session_factory() as session:
db_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one() db_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
handled, skipped, result, error = _handle_non_review_command(settings, session, gitea, db_job, command) handled, skipped, result, error = _handle_non_review_command(settings, session, gitea, db_job, command)
if handled: if handled:
logger.info(
"Non-review command handled job id=%s command=%s skipped=%s error_present=%s",
db_job.id,
command.name,
skipped,
bool(error),
)
finish_job(session, job_id=db_job.id, success=error is None, skipped=skipped, result=result, error_message=error) finish_job(session, job_id=db_job.id, success=error is None, skipped=skipped, result=result, error_message=error)
return True return True
@@ -159,15 +134,6 @@ def process_one_job(settings: Settings) -> bool:
) )
return True return True
result, repo_cfg = run_review_ephemeral(settings, repo=job.repo, pr_number=job.pr_number, command=command) result, repo_cfg = run_review_ephemeral(settings, repo=job.repo, pr_number=job.pr_number, command=command)
logger.info(
"Runner returned job id=%s repo=%s pr=%s repo_cfg_enabled=%s repo_cfg_configured=%s result_keys=%s",
job.id,
job.repo,
job.pr_number,
repo_cfg.enabled,
repo_cfg.configured,
sorted(result.keys()),
)
if not repo_cfg.enabled: if not repo_cfg.enabled:
with session_factory() as session: with session_factory() as session:
gitea.post_issue_comment(job.repo, job.pr_number, format_disabled_ack()) gitea.post_issue_comment(job.repo, job.pr_number, format_disabled_ack())
@@ -182,14 +148,22 @@ def process_one_job(settings: Settings) -> bool:
return True return True
comment_body = format_result_comment(job.head_sha, result, repo_configured=repo_cfg.configured) comment_body = format_result_comment(job.head_sha, result, repo_configured=repo_cfg.configured)
with session_factory() as session: with session_factory() as session:
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body) comment_id = get_persistent_review_comment_id(session, job.repo, job.pr_number)
logger.info( if comment_id:
"Posted review comment job id=%s repo=%s pr=%s comment_id=%s", try:
job.id, gitea.edit_issue_comment(job.repo, comment_id, comment_body)
job.repo, except httpx.HTTPStatusError as exc:
job.pr_number, if exc.response.status_code != 404:
comment_id, raise
) logger.warning(
"Persistent review comment not found; posting a new one repo=%s pr=%s old_comment_id=%s",
job.repo,
job.pr_number,
comment_id,
)
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body)
else:
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body)
upsert_persistent_review_comment_id( upsert_persistent_review_comment_id(
session, session,
repo=job.repo, repo=job.repo,
@@ -197,24 +171,11 @@ def process_one_job(settings: Settings) -> bool:
head_sha=job.head_sha, head_sha=job.head_sha,
comment_id=comment_id, comment_id=comment_id,
) )
logger.info(
"Persistent comment mapping upserted job id=%s repo=%s pr=%s comment_id=%s head_sha=%s",
job.id,
job.repo,
job.pr_number,
comment_id,
job.head_sha,
)
finish_job(session, job_id=job.id, success=True, skipped=False, result=result, error_message=None) finish_job(session, job_id=job.id, success=True, skipped=False, result=result, error_message=None)
except Exception as exc: except Exception as exc:
logger.exception("Review job failed id=%s", job.id) logger.exception("Review job failed id=%s", job.id)
error_text = str(exc).strip() or exc.__class__.__name__
try:
_post_review_failure_comment(gitea, job, error_text)
except Exception:
logger.exception("Failed to post review failure comment id=%s", job.id)
with session_factory() as session: with session_factory() as session:
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message=error_text) finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message=str(exc))
return True return True

View File

@@ -1,6 +1,3 @@
import pytest
from pydantic import ValidationError
from gitea_codex_bot.config import get_settings from gitea_codex_bot.config import get_settings
@@ -14,12 +11,3 @@ def test_codex_auth_defaults_to_api_key_mode() -> None:
settings = get_settings() settings = get_settings()
assert settings.codex_auth_mode == "api_key" assert settings.codex_auth_mode == "api_key"
assert settings.codex_auth_json_path is None assert settings.codex_auth_json_path is None
def test_job_lease_timeout_must_cover_max_review_runtime(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MAX_REVIEW_MINUTES", "10")
monkeypatch.setenv("JOB_LEASE_TIMEOUT_SECONDS", "300")
get_settings.cache_clear()
with pytest.raises(ValidationError, match="JOB_LEASE_TIMEOUT_SECONDS must be at least"):
get_settings()

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from types import SimpleNamespace from types import SimpleNamespace
import httpx
from sqlalchemy import select from sqlalchemy import select
from gitea_codex_bot.config import get_settings from gitea_codex_bot.config import get_settings
@@ -14,8 +15,7 @@ from gitea_codex_bot.types import ParsedCommand
from gitea_codex_bot.workers.dispatcher import process_one_job from gitea_codex_bot.workers.dispatcher import process_one_job
def test_process_one_job_always_posts_new_review_comment(monkeypatch) -> None: def test_process_one_job_recreates_persistent_comment_when_edit_returns_404(monkeypatch) -> None:
posted_ids: list[int] = []
session_factory = get_session_factory() session_factory = get_session_factory()
with session_factory() as session: with session_factory() as session:
job = enqueue_job( job = enqueue_job(
@@ -39,27 +39,37 @@ def test_process_one_job_always_posts_new_review_comment(monkeypatch) -> None:
monkeypatch.setattr( monkeypatch.setattr(
"gitea_codex_bot.workers.dispatcher.run_review_ephemeral", "gitea_codex_bot.workers.dispatcher.run_review_ephemeral",
lambda *_args, **_kwargs: ( lambda *_args, **_kwargs: (
{"verdict": "has_issues", "confidence": 0.7, "summary": "runner error", "findings": []}, {
"verdict": "has_issues",
"confidence": 0.7,
"summary": "runner error",
"findings": [],
},
RepoReviewConfig(configured=True, enabled=True), RepoReviewConfig(configured=True, enabled=True),
), ),
) )
class _FakeGiteaClient: class _FakeGiteaClient:
def __init__(self, _settings) -> None: def __init__(self, _settings) -> None:
pass self.posted_comment_id = 0
def get_pull_request(self, _repo: str, _pr_number: int): def get_pull_request(self, _repo: str, _pr_number: int):
return SimpleNamespace(is_fork=False) return SimpleNamespace(is_fork=False)
def edit_issue_comment(self, _repo: str, _comment_id: int, _body: str) -> int:
request = httpx.Request("PATCH", "https://gitea.test/api/v1/repos/acme/repo/issues/comments/289")
response = httpx.Response(404, request=request, text='{"message":"not found"}')
raise httpx.HTTPStatusError("not found", request=request, response=response)
def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int: def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int:
new_id = 990 self.posted_comment_id = 990
posted_ids.append(new_id) return self.posted_comment_id
return new_id
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient) monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True settings = get_settings()
assert posted_ids == [990] processed = process_one_job(settings)
assert processed is True
with session_factory() as session: with session_factory() as session:
persisted_comment_id = get_persistent_review_comment_id(session, "acme/repo", 9) persisted_comment_id = get_persistent_review_comment_id(session, "acme/repo", 9)
@@ -97,10 +107,15 @@ def test_process_one_job_passes_full_trigger_message_to_runner(monkeypatch) -> N
def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int: def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int:
return 901 return 901
def edit_issue_comment(self, _repo: str, _comment_id: int, _body: str) -> int:
return _comment_id
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.run_review_ephemeral", _fake_run_review_ephemeral) monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.run_review_ephemeral", _fake_run_review_ephemeral)
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient) monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True settings = get_settings()
processed = process_one_job(settings)
assert processed is True
assert captured["raw"] == "@codex review security --full\nFocus auth/session handling." assert captured["raw"] == "@codex review security --full\nFocus auth/session handling."
@@ -140,7 +155,9 @@ def test_process_one_job_skips_review_when_repo_config_disabled(monkeypatch) ->
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient) monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True settings = get_settings()
processed = process_one_job(settings)
assert processed is True
assert any("Review is disabled" in body for body in posted_comments) assert any("Review is disabled" in body for body in posted_comments)
with session_factory() as session: with session_factory() as session:

View File

@@ -2,9 +2,7 @@ from __future__ import annotations
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.main import app from gitea_codex_bot.main import app
from gitea_codex_bot.models import JobStatus, ReviewJob
def test_root_returns_tailwind_landing_page() -> None: def test_root_returns_tailwind_landing_page() -> None:
@@ -17,10 +15,8 @@ def test_root_returns_tailwind_landing_page() -> None:
assert "Gitea Codex Review Bot" in response.text assert "Gitea Codex Review Bot" in response.text
assert "cdn.tailwindcss.com" in response.text assert "cdn.tailwindcss.com" in response.text
assert 'id="health-button"' in response.text assert 'id="health-button"' in response.text
assert 'id="failure-button"' in response.text
assert 'id="health-modal"' in response.text assert 'id="health-modal"' in response.text
assert 'fetch("/healthz"' in response.text assert 'fetch("/healthz"' in response.text
assert 'fetch("/healthz/latest-failure"' in response.text
def test_404_returns_tailwind_page_for_browser_requests() -> None: def test_404_returns_tailwind_page_for_browser_requests() -> None:
@@ -42,104 +38,3 @@ def test_404_returns_json_for_non_browser_requests() -> None:
assert response.status_code == 404 assert response.status_code == 404
assert response.headers["content-type"].startswith("application/json") assert response.headers["content-type"].startswith("application/json")
assert response.json() == {"detail": "Not Found"} assert response.json() == {"detail": "Not Found"}
def test_healthz_latest_failure_returns_empty_when_no_failed_jobs() -> None:
client = TestClient(app)
response = client.get("/healthz/latest-failure")
assert response.status_code == 200
assert response.json() == {"status": "ok", "has_failed_job": False}
def test_healthz_latest_failure_returns_latest_failed_job() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = ReviewJob(
repo="acme/repo",
pr_number=1,
head_sha="1111111",
trigger_comment_id=3001,
trigger_comment_body="@codex review",
command="review",
requested_by="alice",
status=JobStatus.failed,
last_error="first error",
)
second = ReviewJob(
repo="acme/repo",
pr_number=2,
head_sha="2222222",
trigger_comment_id=3002,
trigger_comment_body="@codex rerun",
command="rerun",
requested_by="bob",
status=JobStatus.failed,
last_error="second error",
)
session.add(first)
session.add(second)
session.commit()
client = TestClient(app)
response = client.get("/healthz/latest-failure")
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["has_failed_job"] is True
assert payload["repo"] == "acme/repo"
assert payload["pr_number"] == 2
assert payload["command"] == "rerun"
assert payload["head_sha"] == "2222222"
assert payload["error"] == "second error"
def test_healthz_latest_job_returns_empty_when_no_jobs() -> None:
client = TestClient(app)
response = client.get("/healthz/latest-job")
assert response.status_code == 200
assert response.json() == {"status": "ok", "has_job": False}
def test_healthz_latest_job_returns_latest_job_details() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = ReviewJob(
repo="acme/repo",
pr_number=3,
head_sha="3333333",
trigger_comment_id=3003,
trigger_comment_body="@codex review",
command="review",
requested_by="alice",
status=JobStatus.succeeded,
result_json={"summary": "first summary"},
)
second = ReviewJob(
repo="acme/repo",
pr_number=4,
head_sha="4444444",
trigger_comment_id=3004,
trigger_comment_body="@codex rerun",
command="rerun",
requested_by="bob",
status=JobStatus.failed,
last_error="failed later",
result_json={"summary": "second summary"},
)
session.add(first)
session.add(second)
session.commit()
client = TestClient(app)
response = client.get("/healthz/latest-job")
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["has_job"] is True
assert payload["repo"] == "acme/repo"
assert payload["pr_number"] == 4
assert payload["command"] == "rerun"
assert payload["head_sha"] == "4444444"
assert payload["job_status"] == "failed"
assert payload["error"] == "failed later"
assert payload["result_summary"] == "second summary"

View File

@@ -93,47 +93,6 @@ def test_webhook_accepts_review_and_queues(monkeypatch) -> None:
assert queued.trigger_comment_body == "@codex review security" assert queued.trigger_comment_body == "@codex review security"
def test_webhook_uses_latest_pr_head_sha_when_config_lookup_fails(monkeypatch) -> None:
posted_comments: list[str] = []
def _post_issue_comment(self, repo: str, pr_number: int, body: str) -> int:
posted_comments.append(body)
return 100
monkeypatch.setattr("gitea_codex_bot.services.gitea.GiteaClient.post_issue_comment", _post_issue_comment)
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_pull_request",
lambda *_args, **_kwargs: type("PR", (), {"head_sha": "newsha123"})(),
)
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_file_content",
lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("config unavailable")),
)
client = TestClient(app)
payload_obj = _payload("@codex review", username="alice", comment_id=112)
payload_obj["pull_request"]["head"]["sha"] = "oldsha999"
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-2b",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["status"] == "queued"
assert any("`newsha1`" in body for body in posted_comments)
session_factory = get_session_factory()
with session_factory() as session:
queued = session.execute(select(ReviewJob).where(ReviewJob.trigger_comment_id == 112)).scalar_one()
assert queued.head_sha == "newsha123"
def test_webhook_logs_when_no_codex_review_command(monkeypatch) -> None: def test_webhook_logs_when_no_codex_review_command(monkeypatch) -> None:
messages: list[str] = [] messages: list[str] = []
@@ -188,34 +147,6 @@ def test_webhook_logs_when_codex_command_is_not_review(monkeypatch) -> None:
assert any("Webhook without @codex review command" in item for item in messages) assert any("Webhook without @codex review command" in item for item in messages)
def test_webhook_logs_when_repo_not_allowed(monkeypatch) -> None:
messages: list[str] = []
def _log_info(message: str, *args, **_kwargs) -> None:
messages.append(message % args if args else message)
monkeypatch.setattr("gitea_codex_bot.main.logger.info", _log_info)
client = TestClient(app)
payload_obj = _payload("@codex review", username="alice", comment_id=225)
payload_obj["repository"]["full_name"] = "acme/not-allowed"
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-6",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["reason"] == "repo not allowed"
assert any("Webhook ignored: repo not in ALLOWED_REPOS" in item for item in messages)
def test_webhook_rejects_review_when_repo_config_disabled(monkeypatch) -> None: def test_webhook_rejects_review_when_repo_config_disabled(monkeypatch) -> None:
posted_comments: list[str] = [] posted_comments: list[str] = []