Compare commits

..

2 Commits

Author SHA1 Message Date
1fb67b630d Cap stuck-job requeue retries and notify on exhaustion
All checks were successful
ci / test (pull_request) Successful in 29s
ci / publish (pull_request) Has been skipped
2026-05-22 21:25:53 +00:00
dc48df1aab Add stuck running job recovery with lease timeout
Some checks failed
ci / test (pull_request) Failing after 30s
ci / publish (pull_request) Has been skipped
2026-05-22 21:10:11 +00:00
29 changed files with 878 additions and 1727 deletions

View File

@@ -4,8 +4,6 @@ GITEA_BASE_URL=https://gitea.reversed.dev
# Bot account token used to read PRs and write comments.
GITEA_TOKEN=replace
GITEA_BOT_USERNAME=codex-bot
# Optional extra command mentions (comma-separated), e.g. "@review-buddy,helper-bot".
GITEA_BOT_MENTIONS=
# Shared secret configured on the Gitea webhook.
GITEA_WEBHOOK_SECRET=replace
@@ -46,9 +44,15 @@ WORKDIR=/var/lib/gitea-codex/worktrees
MAX_DIFF_BYTES=200000
MAX_REVIEW_MINUTES=10
CONCURRENCY=1
JOB_LEASE_TIMEOUT_SECONDS=300
STUCK_JOB_RECOVERY_ACTION=requeue
MAX_STUCK_JOB_RETRIES=1
# Image used for ephemeral job containers (Node + npm + Codex CLI install).
REVIEW_RUNNER_IMAGE=node:22-bookworm-slim
# Keep false for review-only mode.
ENABLE_FIX_COMMANDS=false
# Security: fork PRs are skipped unless explicitly enabled.
ALLOW_UNTRUSTED_FORKS=false

View File

@@ -78,6 +78,7 @@ Primary implementation lives under `src/gitea_codex_bot`.
- `@codex review [security|performance|tests] [--full]`
- `@codex rerun`
- `@codex explain`
- `@codex fix [--branch ...]` (gated by `ENABLE_FIX_COMMANDS`)
- `@codex ignore`
## Local Development
@@ -119,10 +120,12 @@ Common optional:
- `OPENAI_API_KEY` (required when `CODEX_AUTH_MODE=api_key`)
- `OPENAI_PROJECT_ID`, `OPENAI_ORG_ID`
- `OPENAI_REVIEW_MODEL`
- `OPENAI_REASONING_EFFORT`
- `CODEX_AUTH_MODE` (`api_key` default, `chatgpt` supported)
- `CODEX_AUTH_JSON_PATH` (custom path to `auth.json` for `chatgpt` mode)
- `WORKDIR`, `MAX_DIFF_BYTES`, `MAX_REVIEW_MINUTES`, `CONCURRENCY`
- `REVIEW_RUNNER_IMAGE`
- `ENABLE_FIX_COMMANDS`
- `ALLOW_UNTRUSTED_FORKS`
## Database and Migrations

View File

@@ -11,10 +11,8 @@ COPY pyproject.toml README.md /app/
COPY src /app/src
COPY alembic.ini /app/
COPY alembic /app/alembic
COPY docker/entrypoint.sh /app/docker/entrypoint.sh
RUN pip install --no-cache-dir .
RUN chmod +x /app/docker/entrypoint.sh
EXPOSE 8000
CMD ["/app/docker/entrypoint.sh"]
CMD ["uvicorn", "gitea_codex_bot.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -6,7 +6,7 @@ Webhook-driven PR review bot for Gitea.
- Handles `issue_comment` and `pull_request_comment` events.
- Verifies `X-Gitea-Signature` HMAC (`sha256`).
- Triggers on `@codex ...`, `@<GITEA_BOT_USERNAME> ...`, plus optional custom aliases from `GITEA_BOT_MENTIONS`.
- Triggers on `@codex review`, `@codex rerun`, `@codex explain`, `@codex fix`, `@codex ignore`.
- Ignores bot-authored comments.
- Enforces strict repository allowlist (`ALLOWED_REPOS`).
- Deduplicates webhook deliveries/comments in DB.
@@ -54,7 +54,6 @@ Optional:
- `OPENAI_API_KEY` (required when `CODEX_AUTH_MODE=api_key`, optional when `CODEX_AUTH_MODE=chatgpt`)
- `OPENAI_PROJECT_ID`
- `OPENAI_ORG_ID`
- `GITEA_BOT_MENTIONS` (comma-separated extra mention aliases, e.g. `@review-buddy,helper-bot`)
- `CODEX_AUTH_MODE` (`api_key` default, or `chatgpt`)
- `CODEX_AUTH_JSON_PATH` (custom host path to `auth.json`; defaults to `~/.codex/auth.json` in `chatgpt` mode)
- `DATABASE_URL` (overrides composed DB URL)

18
TODO.md
View File

@@ -3,35 +3,35 @@
## Open Items By Priority
### P0 (Critical)
- [x] `BUG`: True isolated runner flow: clone/fetch/checkout PR branch inside the ephemeral container itself, not on host before prompt generation.
- [x] `BUG`: Remove host-side fallback path for review execution, or gate it behind explicit `ALLOW_HOST_FALLBACK=false` by default so isolation cannot be bypassed silently.
- [ ] `BUG`: True isolated runner flow: clone/fetch/checkout PR branch inside the ephemeral container itself, not on host before prompt generation.
- [ ] `BUG`: Remove host-side fallback path for review execution, or gate it behind explicit `ALLOW_HOST_FALLBACK=false` by default so isolation cannot be bypassed silently.
- [x] `BUG`: Enforce `.codex-review.yml` `enabled=false` at runtime (currently loaded but not enforced).
- [x] `BUG`: Remove fix command support from runtime and command parsing.
- [x] `BUG`: Add stuck-job recovery for `running` jobs (lease timeout + requeue/fail) so one crashed worker does not deadlock the queue.
- [x] `BUG`: Validate required secrets/settings are non-empty at startup (`GITEA_WEBHOOK_SECRET`, `GITEA_TOKEN`, `ALLOWED_REPOS`) and fail fast if blank.
- [x] `BUG`: Remove `.codex-review.yml` fix policy (`commands.allow_fix`) and rely on global `ENABLE_FIX_COMMANDS`.
- [ ] `BUG`: Add stuck-job recovery for `running` jobs (lease timeout + requeue/fail) so one crashed worker does not deadlock the queue.
- [ ] `BUG`: Validate required secrets/settings are non-empty at startup (`GITEA_WEBHOOK_SECRET`, `GITEA_TOKEN`, `ALLOWED_REPOS`) and fail fast if blank.
- [ ] `TEST`: Add integration test proving the runner executes the exact PR head SHA in isolated mode and does not rely on host checkout.
### P1 (Important)
- [x] `BUG`: Log webhook events rejected because repo is not listed in `ALLOWED_REPOS`.
- [ ] `FEATURE`: Full control UI to update the bots settings. Password in env variable protected login page. No more env variables.
- [ ] `FEATURE`: Automatic Trigger on new PRs and or commits on PRs with context that its a change that needs review not the whole PR again. GITEA_ALLOW_PR_AUTO_REVIEW=true would be needed
- [x] `BUG`: Container runner now uses configured `OPENAI_REVIEW_MODEL` and no longer configures reasoning-effort flags.
- [ ] `BUG`: Container runner hardcodes `codex exec --json -m gpt-5`; use `OPENAI_REVIEW_MODEL` and `OPENAI_REASONING_EFFORT` consistently across runner paths.
- [ ] `BUG`: Preserve command arguments losslessly (quoted args are currently flattened by `" ".join(...)` + `.split()` roundtrip).
- [ ] `BUG`: `parse_command` only matches when `@codex` is at the start of the comment; support inline command usage in normal review-discussion comments.
- [ ] `BUG`: Add max comment length handling/chunking before posting to Gitea to avoid failures on large review outputs.
- [ ] `FEATURE`: Add retries/backoff for `codex exec` bootstrap (`npm install -g @openai/codex`) to reduce transient network/setup failures.
- [ ] `FEATURE`: `WEBHOOK_MODE` is currently informational only; add runtime validation/check endpoint that confirms expected webhook scope (`repo` or `global`) is actually configured in Gitea by host admin.
- [ ] `TEST`: Add end-to-end test path against live Gitea + MariaDB + docker runner (webhook -> queue -> runner -> PR comment update).
- [x] `FEATURE`: Add username as possible command prefix, ex. "@bot-name review" in addition to "@codex review", for better UX discoverability.
- [ ] `FEATURE`: Add username as possible command prefix, ex. "@bot-name review" in addition to "@codex review", for better UX discoverability.
### P2 (Nice to Have)
- [x] `FEATURE`: Add a note line at the end of comments to show model tokens used and such.
- [x] `FEATURE`: Little static tailwind cdn styled page for any http endpoint that just shows what this is, incase this gets discovered by some random lad. Other routes than "/" should return a 404 with if a browser accessed it a again, tailwind cdn themed 404 page. Both should be nicely designed and minimalistic.
- [x] `FEATURE`: Apply `.codex-review.yml` `review.default_mode` when `@codex review` is issued without explicit mode.
- [ ] `FEATURE`: Add per-repo command policy in `.codex-review.yml` for enabling/disabling `review`, `explain`, and `rerun` independently.
- [ ] `FEATURE`: Add per-repo command policy in `.codex-review.yml` for enabling/disabling `review`, `fix`, `explain`, and `rerun` independently.
- [ ] `TEST`: Add structured log redaction tests to ensure PAT/keys never appear in logs/comments.
### P3 (Backlog)
- [ ] `FEATURE`: Add queue metrics and traces (queued/running age, success/failure counters, fallback usage) for operations visibility.
- [ ] `FEATURE`: Add superseded-job cancellation for same PR/head to avoid running obsolete queued jobs.
- [ ] `FEATURE`: Add `@codex status` command: "whoami", "permissions", "repo config status"
- [ ] `TEST`: Add property/fuzz tests for command parsing and webhook payload edge cases.

View File

@@ -1,44 +0,0 @@
#!/bin/sh
set -eu
echo "Checking migration baseline..."
python - <<'PY'
from sqlalchemy import create_engine, inspect, text
from gitea_codex_bot.config import get_settings
settings = get_settings()
engine = create_engine(settings.sqlalchemy_url)
with engine.connect() as conn:
inspector = inspect(conn)
tables = set(inspector.get_table_names())
has_alembic_version = "alembic_version" in tables
has_review_jobs = "review_jobs" in tables
has_webhook_events = "webhook_events" in tables
stamped_revision = None
if has_alembic_version:
row = conn.execute(text("SELECT version_num FROM alembic_version LIMIT 1")).fetchone()
if row and row[0]:
stamped_revision = row[0]
if (not has_alembic_version or not stamped_revision) and (has_review_jobs or has_webhook_events):
revision = "0001_initial"
if has_review_jobs:
columns = {c["name"] for c in inspector.get_columns("review_jobs")}
if "trigger_comment_body" in columns:
revision = "0002_trigger_comment_body"
conn.execute(text("CREATE TABLE IF NOT EXISTS alembic_version (version_num VARCHAR(32) NOT NULL, PRIMARY KEY (version_num))"))
conn.execute(text("DELETE FROM alembic_version"))
conn.execute(text("INSERT INTO alembic_version (version_num) VALUES (:revision)"), {"revision": revision})
conn.commit()
print(f"Stamped legacy database at revision {revision}")
PY
echo "Running database migrations..."
alembic upgrade head
echo "Starting API server..."
exec uvicorn gitea_codex_bot.main:app --host 0.0.0.0 --port 8000

View File

@@ -13,13 +13,13 @@ class Settings(BaseSettings):
gitea_base_url: str = Field(alias="GITEA_BASE_URL")
gitea_token: SecretStr = Field(alias="GITEA_TOKEN")
gitea_bot_username: str = Field(alias="GITEA_BOT_USERNAME")
gitea_bot_mentions: str = Field(default="", alias="GITEA_BOT_MENTIONS")
gitea_webhook_secret: SecretStr = Field(alias="GITEA_WEBHOOK_SECRET")
openai_api_key: SecretStr | None = Field(default=None, alias="OPENAI_API_KEY")
openai_project_id: str | None = Field(default=None, alias="OPENAI_PROJECT_ID")
openai_org_id: str | None = Field(default=None, alias="OPENAI_ORG_ID")
openai_review_model: str = Field(default="gpt-5.3-codex", alias="OPENAI_REVIEW_MODEL")
openai_reasoning_effort: Literal["none", "low", "medium", "high"] = Field(default="high", alias="OPENAI_REASONING_EFFORT")
codex_auth_mode: Literal["api_key", "chatgpt"] = Field(default="api_key", alias="CODEX_AUTH_MODE")
codex_auth_json_path: str | None = Field(default=None, alias="CODEX_AUTH_JSON_PATH")
@@ -38,8 +38,12 @@ class Settings(BaseSettings):
max_diff_bytes: int = Field(default=200000, alias="MAX_DIFF_BYTES")
max_review_minutes: int = Field(default=10, alias="MAX_REVIEW_MINUTES")
concurrency: int = Field(default=1, alias="CONCURRENCY")
job_lease_timeout_seconds: int = Field(default=300, alias="JOB_LEASE_TIMEOUT_SECONDS")
stuck_job_recovery_action: Literal["requeue", "fail"] = Field(default="requeue", alias="STUCK_JOB_RECOVERY_ACTION")
max_stuck_job_retries: int = Field(default=1, alias="MAX_STUCK_JOB_RETRIES")
review_runner_image: str = Field(default="node:22-bookworm-slim", alias="REVIEW_RUNNER_IMAGE")
enable_fix_commands: bool = Field(default=False, alias="ENABLE_FIX_COMMANDS")
allow_untrusted_forks: bool = Field(default=False, alias="ALLOW_UNTRUSTED_FORKS")
@field_validator("gitea_base_url")
@@ -59,13 +63,6 @@ class Settings(BaseSettings):
values = [item.strip() for item in self.allowed_repos.split(",")]
return {value for value in values if value}
@property
def bot_command_aliases(self) -> set[str]:
configured = [item.strip().lstrip("@").lower() for item in self.gitea_bot_mentions.split(",")]
aliases = {"codex", self.gitea_bot_username.strip().lstrip("@").lower()}
aliases.update(alias for alias in configured if alias)
return aliases
@lru_cache(maxsize=1)
def get_settings() -> Settings:

View File

@@ -11,13 +11,11 @@ from fastapi import Depends, FastAPI, Header, HTTPException, Request, status
from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import HTMLResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from gitea_codex_bot.config import Settings, get_settings
from gitea_codex_bot.db import get_session
from gitea_codex_bot.models import JobStatus, ReviewJob
from gitea_codex_bot.services.commands import detect_prefixed_command, parse_command
from gitea_codex_bot.db import Base, get_engine, get_session
from gitea_codex_bot.services.commands import parse_command
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event
from gitea_codex_bot.services.repo_config import RepoReviewConfig, parse_repo_review_config_text
@@ -35,17 +33,6 @@ logger = logging.getLogger(__name__)
def _validate_required_env(settings: Settings) -> None:
webhook_secret = settings.gitea_webhook_secret.get_secret_value()
if not webhook_secret.strip():
raise RuntimeError("GITEA_WEBHOOK_SECRET is required")
gitea_token = settings.gitea_token.get_secret_value()
if not gitea_token.strip():
raise RuntimeError("GITEA_TOKEN is required")
if not settings.allowed_repos.strip():
raise RuntimeError("ALLOWED_REPOS is required")
if settings.codex_auth_mode != "api_key":
return
api_key = settings.openai_api_key.get_secret_value() if settings.openai_api_key else ""
@@ -136,6 +123,7 @@ async def lifespan(app: FastAPI):
_validate_required_env(settings)
_log_startup_identity(settings)
_log_startup_auth_json_status(settings)
Base.metadata.create_all(bind=get_engine())
stop_event = asyncio.Event()
task = asyncio.create_task(worker_loop(settings, stop_event))
@@ -160,13 +148,6 @@ def _load_repo_review_config_for_pr(gitea: GiteaClient, repo: str, pr_number: in
return parse_repo_review_config_text(cfg_text, configured=True), head_sha
def _resolve_pr_head_sha(gitea: GiteaClient, repo: str, pr_number: int, fallback: str) -> str:
try:
return gitea.get_pull_request(repo, pr_number).head_sha
except Exception:
return fallback
def _render_landing_page() -> str:
return """<!doctype html>
<html lang="en">
@@ -184,8 +165,6 @@ def _render_landing_page() -> str:
<p class="mt-4 text-base leading-7 text-slate-300">This endpoint powers automated pull request review workflows for Gitea. It validates signed webhook events, queues review jobs, and posts structured feedback back to pull requests.</p>
<div class="mt-8 flex flex-wrap gap-3 text-sm">
<button id="health-button" type="button" class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200 transition hover:border-slate-500 hover:bg-slate-700">Health: <code>/healthz</code></button>
<button id="failure-button" type="button" class="rounded-lg border border-amber-500/40 bg-amber-500/10 px-3 py-2 text-amber-200 transition hover:border-amber-400 hover:bg-amber-500/20">Latest failure: <code>/healthz/latest-failure</code></button>
<button id="job-button" type="button" class="rounded-lg border border-cyan-500/40 bg-cyan-500/10 px-3 py-2 text-cyan-200 transition hover:border-cyan-400 hover:bg-cyan-500/20">Latest job: <code>/healthz/latest-job</code></button>
<span class="rounded-lg border border-slate-700 bg-slate-800/80 px-3 py-2 text-slate-200">Webhook: <code>POST /webhook/gitea</code></span>
</div>
</section>
@@ -201,8 +180,6 @@ def _render_landing_page() -> str:
</div>
<script>
const healthButton = document.getElementById("health-button");
const failureButton = document.getElementById("failure-button");
const jobButton = document.getElementById("job-button");
const healthModal = document.getElementById("health-modal");
const closeModal = document.getElementById("close-modal");
const healthResult = document.getElementById("health-result");
@@ -220,57 +197,6 @@ def _render_landing_page() -> str:
}
}
async function loadLatestFailure() {
healthResult.textContent = "Loading...";
try {
const response = await fetch("/healthz/latest-failure", { headers: { Accept: "application/json" } });
const payload = await response.json();
if (!payload.has_failed_job) {
healthResult.textContent = "No failed jobs found.";
return;
}
const failedAt = payload.failed_at ? payload.failed_at : "unknown";
const errorText = payload.error ? payload.error : "unknown";
healthResult.textContent =
"Latest failed job #" + payload.job_id +
" | " + payload.repo + "#" + payload.pr_number +
" | command=" + payload.command +
" | commit=" + payload.head_sha.slice(0, 7) +
" | failed_at=" + failedAt +
" | error=" + errorText;
} catch (_error) {
healthResult.textContent = "Could not load latest failure output.";
}
}
async function loadLatestJob() {
healthResult.textContent = "Loading...";
try {
const response = await fetch("/healthz/latest-job", { headers: { Accept: "application/json" } });
const payload = await response.json();
if (!payload.has_job) {
healthResult.textContent = "No jobs found yet.";
return;
}
const startedAt = payload.started_at ? payload.started_at : "not started";
const finishedAt = payload.finished_at ? payload.finished_at : "not finished";
const errorText = payload.error ? payload.error : "none";
const summary = payload.result_summary ? payload.result_summary : "none";
healthResult.textContent =
"Latest job #" + payload.job_id +
" | " + payload.repo + "#" + payload.pr_number +
" | command=" + payload.command +
" | status=" + payload.job_status +
" | commit=" + payload.head_sha.slice(0, 7) +
" | started_at=" + startedAt +
" | finished_at=" + finishedAt +
" | error=" + errorText +
" | summary=" + summary;
} catch (_error) {
healthResult.textContent = "Could not load latest job output.";
}
}
function showModal() {
healthModal.classList.remove("hidden");
healthModal.classList.add("flex");
@@ -285,14 +211,6 @@ def _render_landing_page() -> str:
showModal();
await loadHealth();
});
failureButton.addEventListener("click", async function () {
showModal();
await loadLatestFailure();
});
jobButton.addEventListener("click", async function () {
showModal();
await loadLatestJob();
});
closeModal.addEventListener("click", hideModal);
healthModal.addEventListener("click", function (event) {
@@ -347,54 +265,6 @@ def healthz(settings: Settings = Depends(get_settings)) -> dict[str, str]:
return {"status": "ok"}
@app.get("/healthz/latest-failure")
def healthz_latest_failure(session: Session = Depends(get_session)) -> dict[str, Any]:
failed_job = session.execute(
select(ReviewJob).where(ReviewJob.status == JobStatus.failed).order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc()).limit(1)
).scalar_one_or_none()
if not failed_job:
return {"status": "ok", "has_failed_job": False}
return {
"status": "ok",
"has_failed_job": True,
"job_id": failed_job.id,
"repo": failed_job.repo,
"pr_number": failed_job.pr_number,
"command": failed_job.command,
"head_sha": failed_job.head_sha,
"error": failed_job.last_error or "",
"failed_at": failed_job.finished_at.isoformat() if failed_job.finished_at else None,
}
@app.get("/healthz/latest-job")
def healthz_latest_job(session: Session = Depends(get_session)) -> dict[str, Any]:
latest_job = session.execute(select(ReviewJob).order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc()).limit(1)).scalar_one_or_none()
if not latest_job:
return {"status": "ok", "has_job": False}
result_summary = ""
if isinstance(latest_job.result_json, dict):
summary = latest_job.result_json.get("summary")
if isinstance(summary, str):
result_summary = summary
return {
"status": "ok",
"has_job": True,
"job_id": latest_job.id,
"repo": latest_job.repo,
"pr_number": latest_job.pr_number,
"command": latest_job.command,
"head_sha": latest_job.head_sha,
"job_status": latest_job.status.value if hasattr(latest_job.status, "value") else str(latest_job.status),
"error": latest_job.last_error or "",
"result_summary": result_summary,
"created_at": latest_job.created_at.isoformat() if latest_job.created_at else None,
"started_at": latest_job.started_at.isoformat() if latest_job.started_at else None,
"finished_at": latest_job.finished_at.isoformat() if latest_job.finished_at else None,
}
@app.post("/webhook/gitea")
async def gitea_webhook(
request: Request,
@@ -421,31 +291,9 @@ async def gitea_webhook(
if sender_username == settings.gitea_bot_username:
return {"accepted": False, "reason": "bot comment ignored"}
if repo not in settings.allowed_repo_set:
logger.info(
"Webhook ignored: repo not in ALLOWED_REPOS repo=%s pr=%s comment_id=%s sender=%s",
repo,
pr_number,
comment_id,
sender_username,
)
return {"accepted": False, "reason": "repo not allowed"}
comment_body = str(payload.get("comment", {}).get("body", "")).strip()
parsed_command = parse_command(comment_body, aliases=settings.bot_command_aliases)
parsed_command = parse_command(comment_body)
if not parsed_command:
attempted_command = detect_prefixed_command(comment_body, aliases=settings.bot_command_aliases)
if attempted_command:
gitea = GiteaClient(settings)
if attempted_command == "fix":
gitea.post_issue_comment(repo, pr_number, "⚠️ `@codex fix` is no longer supported on this bot.")
return {"accepted": False, "reason": "unsupported command", "command": attempted_command}
gitea.post_issue_comment(
repo,
pr_number,
f"⚠️ Command `@codex {attempted_command}` is not supported. Try `@codex -h`.",
)
return {"accepted": False, "reason": "unsupported command", "command": attempted_command}
logger.info(
"Webhook ignored: no @codex review command repo=%s pr=%s comment_id=%s sender=%s",
repo,
@@ -464,6 +312,9 @@ async def gitea_webhook(
parsed_command.name,
)
if repo not in settings.allowed_repo_set:
return {"accepted": False, "reason": "repo not allowed"}
inserted = persist_webhook_event(
session,
delivery_id=x_gitea_delivery,
@@ -477,7 +328,6 @@ async def gitea_webhook(
gitea = GiteaClient(settings)
if parsed_command.name in {"review", "rerun"}:
head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha)
repo_cfg: RepoReviewConfig | None = None
try:
repo_cfg, resolved_head_sha = _load_repo_review_config_for_pr(gitea, repo, pr_number)
@@ -485,7 +335,10 @@ async def gitea_webhook(
except Exception:
repo_cfg = None
if head_sha == "unknown":
head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha)
try:
head_sha = gitea.get_pull_request(repo, pr_number).head_sha
except Exception:
pass
if repo_cfg and not repo_cfg.enabled:
gitea.post_issue_comment(repo, pr_number, format_disabled_ack())
return {"accepted": True, "reason": "review disabled by repo config"}
@@ -507,7 +360,7 @@ async def gitea_webhook(
gitea.post_issue_comment(repo, pr_number, format_queue_ack(head_sha))
return {"accepted": True, "job_id": job.id, "status": "queued"}
if parsed_command.name in {"explain", "ignore", "help"}:
if parsed_command.name in {"fix", "explain", "ignore"}:
job = enqueue_job(
session,
repo=repo,

View File

@@ -1,51 +1,19 @@
from __future__ import annotations
import re
from collections.abc import Iterable
from gitea_codex_bot.types import ParsedCommand
PREFIX_RE = re.compile(r"^@([^\s]+)\s+(.+)$", re.IGNORECASE | re.DOTALL)
HELP_ALIASES = {"-h", "--help", "help"}
SUPPORTED_COMMANDS = {"review", "explain", "ignore", "rerun"}
COMMAND_RE = re.compile(r"^@codex\s+(review|explain|fix|ignore|rerun)\b(.*)$", re.IGNORECASE | re.DOTALL)
def detect_prefixed_command(body: str, aliases: Iterable[str] | None = None) -> str | None:
def parse_command(body: str) -> ParsedCommand | None:
stripped = body.strip()
match = PREFIX_RE.match(stripped)
match = COMMAND_RE.match(stripped)
if not match:
return None
command_alias = match.group(1).lstrip("@").lower()
allowed_aliases = {alias.lstrip("@").lower() for alias in (aliases or {"codex"})}
if command_alias not in allowed_aliases:
return None
remainder = match.group(2).strip()
if not remainder:
return None
return remainder.split(maxsplit=1)[0].lower()
def parse_command(body: str, aliases: Iterable[str] | None = None) -> ParsedCommand | None:
stripped = body.strip()
match = PREFIX_RE.match(stripped)
if not match:
return None
command_alias = match.group(1).lstrip("@").lower()
allowed_aliases = {alias.lstrip("@").lower() for alias in (aliases or {"codex"})}
if command_alias not in allowed_aliases:
return None
remainder = match.group(2).strip()
if not remainder:
return None
parts = remainder.split(maxsplit=1)
raw_name = parts[0].lower()
rest = parts[1].strip() if len(parts) > 1 else ""
if raw_name in HELP_ALIASES:
return ParsedCommand(name="help", raw=stripped, arguments=[token for token in rest.split() if token])
if raw_name not in SUPPORTED_COMMANDS:
return None
name = raw_name
name = match.group(1).lower()
rest = match.group(2).strip()
tokens = [token for token in rest.split() if token]
parsed = ParsedCommand(name=name, raw=stripped, arguments=tokens)
@@ -59,4 +27,6 @@ def parse_command(body: str, aliases: Iterable[str] | None = None) -> ParsedComm
parsed.mode = mode
parsed.mode_explicit = True
break
elif name == "fix":
parsed.branch_fix = "--branch" in tokens
return parsed

View File

@@ -21,8 +21,6 @@ class PullRequestContext:
clone_url: str
html_url: str
is_fork: bool
base_clone_url: str | None = None
head_clone_url: str | None = None
class GiteaClient:
@@ -58,8 +56,6 @@ class GiteaClient:
encoded_owner = quote(owner, safe="")
encoded_name = quote(name, safe="")
payload = self._request("GET", f"/api/v1/repos/{encoded_owner}/{encoded_name}/pulls/{pr_number}")
base_clone_url = payload["base"]["repo"]["clone_url"]
head_clone_url = payload["head"]["repo"]["clone_url"]
return PullRequestContext(
repo=repo,
pr_number=pr_number,
@@ -67,9 +63,7 @@ class GiteaClient:
base_sha=payload["base"]["sha"],
head_ref=payload["head"]["ref"],
head_sha=payload["head"]["sha"],
clone_url=head_clone_url,
base_clone_url=base_clone_url,
head_clone_url=head_clone_url,
clone_url=payload["head"]["repo"]["clone_url"],
html_url=payload["html_url"],
is_fork=bool(payload["head"]["repo"]["full_name"] != payload["base"]["repo"]["full_name"]),
)
@@ -96,16 +90,6 @@ class GiteaClient:
)
return int(payload["id"])
def get_issue_comment(self, repo: str, comment_id: int) -> dict[str, Any]:
owner, name = self.split_repo(repo)
encoded_owner = quote(owner, safe="")
encoded_name = quote(name, safe="")
payload = self._request(
"GET",
f"/api/v1/repos/{encoded_owner}/{encoded_name}/issues/comments/{comment_id}",
)
return dict(payload)
def list_issue_comments(self, repo: str, pr_number: int) -> list[dict[str, Any]]:
owner, name = self.split_repo(repo)
encoded_owner = quote(owner, safe="")

View File

@@ -1,9 +1,9 @@
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from sqlalchemy import func, select
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
@@ -11,7 +11,17 @@ from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus, W
from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand
logger = logging.getLogger(__name__)
LEASE_TIMEOUT_ERROR_PREFIX = "Job lease timed out"
@dataclass
class RecoveryOutcome:
repo: str
pr_number: int
job_id: int
retries_used: int
failed: bool
message: str
def persist_webhook_event(
@@ -82,21 +92,10 @@ def enqueue_job(
session.add(job)
session.commit()
session.refresh(job)
logger.info(
"Job enqueued id=%s repo=%s pr=%s command=%s head_sha=%s trigger_comment_id=%s requested_by=%s",
job.id,
job.repo,
job.pr_number,
job.command,
job.head_sha,
job.trigger_comment_id,
job.requested_by,
)
return job
def claim_next_job(session: Session) -> ReviewJob | None:
recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2)
job = session.execute(
select(ReviewJob).where(ReviewJob.status == JobStatus.queued).order_by(ReviewJob.created_at.asc()).limit(1).with_for_update(skip_locked=True)
).scalar_one_or_none()
@@ -109,39 +108,41 @@ def claim_next_job(session: Session) -> ReviewJob | None:
session.add(run)
session.commit()
session.refresh(job)
logger.info(
"Job claimed id=%s repo=%s pr=%s command=%s head_sha=%s status=%s",
job.id,
job.repo,
job.pr_number,
job.command,
job.head_sha,
job.status.value if hasattr(job.status, "value") else job.status,
)
return job
def recover_stuck_running_jobs(session: Session, *, lease_timeout_seconds: int, max_retries: int) -> int:
def recover_stuck_jobs(session: Session, *, lease_timeout_seconds: int, action: str, max_retries: int) -> list[RecoveryOutcome]:
if lease_timeout_seconds <= 0:
return []
now = datetime.now(timezone.utc)
lease_cutoff = now - timedelta(seconds=lease_timeout_seconds)
stale_running_jobs = session.execute(
cutoff = now - timedelta(seconds=lease_timeout_seconds)
stale_jobs = session.execute(
select(ReviewJob)
.where(
ReviewJob.status == JobStatus.running,
ReviewJob.started_at.is_not(None),
ReviewJob.started_at <= lease_cutoff,
ReviewJob.started_at < cutoff,
)
.order_by(ReviewJob.started_at.asc())
.with_for_update(skip_locked=True)
).scalars().all()
if not stale_running_jobs:
return 0
recovered = 0
for job in stale_running_jobs:
attempt_count = _count_job_attempts(session, job.id)
timeout_error = (
f"Job lease timed out after {lease_timeout_seconds}s on attempt {attempt_count}. "
"Recovered by queue watchdog."
).scalars()
outcomes: list[RecoveryOutcome] = []
for job in stale_jobs:
prior_retries = session.execute(
select(ReviewRun)
.where(
ReviewRun.job_id == job.id,
ReviewRun.status == RunStatus.failed,
ReviewRun.error_message.is_not(None),
)
.order_by(ReviewRun.id.asc())
).scalars()
lease_retries_used = sum(1 for run in prior_retries if (run.error_message or "").startswith(LEASE_TIMEOUT_ERROR_PREFIX))
retries_used_after_this_timeout = lease_retries_used + 1
should_fail = action == "fail" or lease_retries_used >= max_retries
message = (
f"{LEASE_TIMEOUT_ERROR_PREFIX} after {lease_timeout_seconds}s while in running state; "
f"retries_used={retries_used_after_this_timeout}, max_retries={max_retries}."
)
latest_run = (
session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one_or_none()
@@ -149,36 +150,27 @@ def recover_stuck_running_jobs(session: Session, *, lease_timeout_seconds: int,
if latest_run and latest_run.status == RunStatus.running:
latest_run.status = RunStatus.failed
latest_run.finished_at = now
latest_run.error_message = timeout_error
retries_used = max(attempt_count - 1, 0)
if retries_used < max_retries:
latest_run.error_message = message
job.last_error = message
if should_fail:
job.status = JobStatus.failed
job.finished_at = now
else:
job.status = JobStatus.queued
job.started_at = None
job.finished_at = None
job.last_error = timeout_error
logger.warning(
"Recovered timed-out running job id=%s by requeueing attempt=%s retries_used=%s/%s",
job.id,
attempt_count,
retries_used,
max_retries,
outcomes.append(
RecoveryOutcome(
repo=job.repo,
pr_number=job.pr_number,
job_id=job.id,
retries_used=retries_used_after_this_timeout,
failed=should_fail,
message=message,
)
else:
job.status = JobStatus.failed
job.finished_at = now
job.last_error = timeout_error
logger.error(
"Recovered timed-out running job id=%s by failing permanently attempt=%s retries_used=%s/%s",
job.id,
attempt_count,
retries_used,
max_retries,
)
recovered += 1
)
session.commit()
return recovered
return outcomes
def finish_job(
@@ -203,20 +195,11 @@ def finish_job(
job.status = JobStatus.succeeded
run_status = RunStatus.succeeded
else:
attempt_count = _count_job_attempts(session, job_id)
retries_used = max(attempt_count - 1, 0)
if retries_used < 2:
job.status = JobStatus.queued
else:
job.status = JobStatus.failed
job.status = JobStatus.failed
run_status = RunStatus.failed
now = datetime.now(timezone.utc)
if job.status == JobStatus.queued:
job.started_at = None
job.finished_at = None
else:
job.finished_at = now
job.finished_at = now
job.last_error = error_message
if result is not None:
job.result_json = result
@@ -228,18 +211,3 @@ def finish_job(
latest_run.error_message = error_message
session.commit()
logger.info(
"Job finished id=%s repo=%s pr=%s status=%s run_status=%s skipped=%s error_present=%s",
job.id,
job.repo,
job.pr_number,
job.status.value if hasattr(job.status, "value") else job.status,
run_status.value if hasattr(run_status, "value") else run_status,
skipped,
bool(error_message),
)
def _count_job_attempts(session: Session, job_id: int) -> int:
attempts = session.execute(select(func.count(ReviewRun.id)).where(ReviewRun.job_id == job_id)).scalar_one()
return int(attempts or 0)

View File

@@ -10,9 +10,9 @@ import yaml
class RepoReviewConfig:
configured: bool = True
enabled: bool = True
default_mode: str = "full"
default_mode: str = "summary"
max_diff_bytes: int = 200000
include_tests: bool = False
include_tests: bool = True
focus: list[str] = field(default_factory=lambda: ["correctness", "security", "maintainability"])
ignore: list[str] = field(default_factory=list)
@@ -27,13 +27,13 @@ def load_repo_review_config(repo_root: Path) -> RepoReviewConfig:
def parse_repo_review_config_text(text: str, *, configured: bool) -> RepoReviewConfig:
raw = yaml.safe_load(text) or {}
review = raw.get("review", {}) or {}
default_mode = str(review.get("default_mode", "full")).strip().lower() or "full"
default_mode = str(review.get("default_mode", "summary")).strip().lower() or "summary"
return RepoReviewConfig(
configured=configured,
enabled=bool(raw.get("enabled", True)),
default_mode=default_mode,
max_diff_bytes=int(review.get("max_diff_bytes", 200000)),
include_tests=bool(review.get("include_tests", False)),
include_tests=bool(review.get("include_tests", True)),
focus=list(review.get("focus", ["correctness", "security", "maintainability"])),
ignore=list(raw.get("ignore", [])),
)

View File

@@ -39,13 +39,8 @@ def format_result_comment(head_sha: str, result: dict, *, repo_configured: bool
markdown_comment = result.get("markdown_comment")
if isinstance(markdown_comment, str) and markdown_comment.strip():
body = markdown_comment.strip()
details = _format_structured_details(result)
if details:
body = f"{body}\n\n---\n\n{details}"
if usage_note:
body = f"{body}\n\n{usage_note}"
if missing_config_note:
body = f"{body}\n\n{missing_config_note}"
return _inject_head_sha_marker(head_sha, body)
verdict = result.get("verdict", "has_issues")
@@ -110,42 +105,4 @@ def _format_usage_note(result: dict) -> str:
def _format_missing_config_note(repo_configured: bool) -> str:
if repo_configured:
return ""
return "> .codex-review.yml is not configured"
def _format_structured_details(result: dict) -> str:
verdict = str(result.get("verdict", "has_issues"))
summary = str(result.get("summary", "No summary returned."))
confidence_raw = result.get("confidence", 0.0)
try:
confidence = float(confidence_raw)
except (TypeError, ValueError):
confidence = 0.0
findings = result.get("findings", []) or []
lines = ["### Structured Findings", "", f"Verdict: `{verdict}`", f"Confidence: `{confidence:.2f}`", "", summary, ""]
if not findings:
lines.append("No blocking issues found.")
return "\n".join(lines).strip()
lines.append("Findings:")
for idx, finding in enumerate(findings, start=1):
if not isinstance(finding, dict):
lines.extend([f"{idx}. `unknown` (unknown)", " Issue", f" {finding}", " Suggestion: n/a"])
continue
severity = finding.get("severity", "unknown")
file_path = finding.get("file", "unknown")
line_start = finding.get("line_start", "?")
line_end = finding.get("line_end", line_start)
title = finding.get("title", "Issue")
body = finding.get("body", "")
suggestion = finding.get("suggestion", "")
lines.extend(
[
f"{idx}. `{file_path}:{line_start}-{line_end}` ({severity})",
f" {title}",
f" {body}",
f" Suggestion: {suggestion}" if suggestion else " Suggestion: n/a",
]
)
return "\n".join(lines).strip()
return ".codex-review.yml is not configured"

View File

@@ -1,12 +1,303 @@
from __future__ import annotations
import json
import os
import shlex
import subprocess
from fnmatch import fnmatch
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
import httpx
from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient, PullRequestContext
from gitea_codex_bot.services.repo_config import RepoReviewConfig, load_repo_review_config
from gitea_codex_bot.types import ParsedCommand
class ReviewError(RuntimeError):
pass
def _run_git(args: list[str], cwd: Path | None = None) -> str:
completed = subprocess.run(["git", *args], cwd=cwd, check=True, capture_output=True, text=True)
return completed.stdout
def checkout_pr(tmpdir: Path, pr: PullRequestContext) -> Path:
repo_dir = tmpdir / "repo"
_run_git(["clone", "--no-tags", "--depth", "50", pr.clone_url, str(repo_dir)])
_run_git(["fetch", "origin", pr.base_ref, pr.head_ref], cwd=repo_dir)
_run_git(["checkout", pr.head_sha], cwd=repo_dir)
return repo_dir
def collect_diff_context(repo_dir: Path, pr: PullRequestContext, max_diff_bytes: int) -> dict[str, Any]:
diff = _run_git(["diff", f"{pr.base_sha}...{pr.head_sha}"], cwd=repo_dir)
changed_files_raw = _run_git(["diff", "--name-only", f"{pr.base_sha}...{pr.head_sha}"], cwd=repo_dir)
changed_files = [line.strip() for line in changed_files_raw.splitlines() if line.strip()]
truncated = False
if len(diff.encode("utf-8")) > max_diff_bytes:
diff = diff.encode("utf-8")[:max_diff_bytes].decode("utf-8", errors="ignore")
truncated = True
return {"diff": diff, "changed_files": changed_files, "truncated": truncated}
def _apply_ignore_patterns(changed_files: list[str], ignore_patterns: list[str]) -> list[str]:
if not ignore_patterns:
return changed_files
kept: list[str] = []
for path in changed_files:
if any(fnmatch(path, pattern) for pattern in ignore_patterns):
continue
kept.append(path)
return kept
def _collect_changed_file_contents(repo_dir: Path, changed_files: list[str], max_total_bytes: int) -> str:
chunks: list[str] = []
total = 0
for rel in changed_files:
path = repo_dir / rel
if not path.exists() or not path.is_file():
continue
try:
content = path.read_text(encoding="utf-8", errors="ignore")
except OSError:
continue
block = f"\n### {rel}\n{content}\n"
block_bytes = len(block.encode("utf-8"))
if total + block_bytes > max_total_bytes:
break
chunks.append(block)
total += block_bytes
return "".join(chunks).strip()
def _collect_test_output(repo_dir: Path, timeout_seconds: int) -> str:
try:
completed = subprocess.run(
["pytest", "-q"],
cwd=repo_dir,
capture_output=True,
text=True,
timeout=timeout_seconds,
check=False,
)
output = (completed.stdout + "\n" + completed.stderr).strip()
return output[:10000]
except Exception as exc:
return f"Test execution unavailable: {exc}"
def _redact_secrets_from_diff(diff: str) -> str:
secret_terms = ("api_key", "token", "secret", "password", "private_key", "-----begin")
redacted_lines: list[str] = []
for line in diff.splitlines():
lower = line.lower()
if any(term in lower for term in secret_terms):
redacted_lines.append("[REDACTED_POTENTIAL_SECRET]")
else:
redacted_lines.append(line)
return "\n".join(redacted_lines)
def _build_prompt(
pr: PullRequestContext,
command: ParsedCommand,
diff_context: dict[str, Any],
repo_cfg: RepoReviewConfig,
*,
changed_file_contents: str,
test_output: str | None,
) -> str:
mode = command.mode if command.name in {"review", "rerun"} else "summary"
return (
"You are reviewing a Gitea pull request.\n\n"
"Focus only on issues introduced by this PR.\n"
"Prioritize correctness, security, data loss, broken behavior, bad migrations, and missing tests.\n"
"Avoid style nitpicks.\n\n"
"Return JSON only with schema:\n"
"{\n"
' "verdict": "correct" | "has_issues",\n'
' "confidence": 0.0,\n'
' "summary": "...",\n'
' "findings": [{"severity":"low|medium|high|critical","file":"...","line_start":1,"line_end":1,"title":"...","body":"...","suggestion":"..."}],\n'
' "markdown_comment": "Full markdown comment body to post to Gitea. Include clear section breaks and blank lines."\n'
"}\n\n"
f"PR URL: {pr.html_url}\n"
f"Mode: {mode}\n"
f"Trigger message: {command.raw}\n"
f"Repo focus: {', '.join(repo_cfg.focus)}\n"
f"Diff truncated: {diff_context['truncated']}\n"
f"Changed files:\n{os.linesep.join(diff_context['changed_files'])}\n\n"
f"Unified diff:\n{diff_context['diff']}\n\n"
f"Changed file content (optional):\n{changed_file_contents or '(not included)'}\n\n"
f"Test output (optional):\n{test_output or '(not included)'}\n"
)
def _call_openai_review(settings: Settings, prompt: str) -> dict[str, Any]:
api_key = settings.openai_api_key.get_secret_value() if settings.openai_api_key else ""
if not api_key.strip():
raise ReviewError("OPENAI_API_KEY is required for API-key review mode.")
headers: dict[str, str] = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
if settings.openai_org_id:
headers["OpenAI-Organization"] = settings.openai_org_id
if settings.openai_project_id:
headers["OpenAI-Project"] = settings.openai_project_id
body = {
"model": settings.openai_review_model,
"input": prompt,
"text": {"format": {"type": "json_object"}},
"reasoning": {"effort": settings.openai_reasoning_effort},
}
with httpx.Client(timeout=120.0) as client:
response = client.post("https://api.openai.com/v1/responses", headers=headers, json=body)
response.raise_for_status()
payload = response.json()
for item in payload.get("output", []):
for content in item.get("content", []):
text_value = content.get("text")
if text_value:
result = json.loads(text_value)
if isinstance(result, dict):
result["_meta"] = _build_openai_result_meta(payload, settings)
return result
raise ReviewError("OpenAI response did not contain JSON output text.")
def _build_openai_result_meta(payload: dict[str, Any], settings: Settings) -> dict[str, Any]:
usage_raw = payload.get("usage")
usage: dict[str, int] = {}
if isinstance(usage_raw, dict):
for output_key, source_key in (
("input_tokens", "input_tokens"),
("output_tokens", "output_tokens"),
("total_tokens", "total_tokens"),
):
value = usage_raw.get(source_key)
if isinstance(value, int):
usage[output_key] = value
model = payload.get("model")
if not isinstance(model, str) or not model.strip():
model = settings.openai_review_model
return {"source": "openai_api", "model": model, "usage": usage}
def _summarize_openai_failure(exc: Exception) -> str:
if isinstance(exc, httpx.HTTPStatusError):
status = exc.response.status_code
response_text = exc.response.text.strip()
if response_text:
compact = " ".join(response_text.split())
if len(compact) > 400:
compact = f"{compact[:400]}..."
return f"OpenAI API HTTP {status}: {compact}"
return f"OpenAI API HTTP {status}."
if isinstance(exc, httpx.TimeoutException):
return "OpenAI API request timed out."
message = str(exc).strip()
if message:
return message
return f"{exc.__class__.__name__} (no details)"
def _fallback_review(diff_context: dict[str, Any], *, failure_reason: str | None = None) -> dict[str, Any]:
findings: list[dict[str, Any]] = []
summary = "Fallback analysis was used because OpenAI review was unavailable."
if failure_reason:
summary = f"OpenAI review failed. Error: {failure_reason}"
findings.append(
{
"severity": "high",
"file": "unknown",
"line_start": 1,
"line_end": 1,
"title": "OpenAI review request failed",
"body": failure_reason,
"suggestion": "Fix API/auth/network issues and rerun @codex review.",
}
)
if "TODO" in diff_context["diff"]:
findings.append(
{
"severity": "low",
"file": "unknown",
"line_start": 1,
"line_end": 1,
"title": "TODO marker in diff",
"body": "The change introduces TODO markers that may indicate incomplete behavior.",
"suggestion": "Resolve or track TODOs before merging.",
}
)
return {
"verdict": "correct" if not findings else "has_issues",
"confidence": 0.4 if not findings else 0.6,
"summary": summary,
"findings": findings,
}
def run_review_for_pr(
settings: Settings,
gitea: GiteaClient,
repo: str,
pr_number: int,
command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
prompt, diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
try:
result = _call_openai_review(settings, prompt)
except Exception as exc:
result = _fallback_review(diff_context, failure_reason=_summarize_openai_failure(exc))
return normalize_review_result(result), repo_cfg
def prepare_review_prompt(
settings: Settings,
gitea: GiteaClient,
repo: str,
pr_number: int,
command: ParsedCommand,
) -> tuple[str, dict[str, Any], RepoReviewConfig]:
pr = gitea.get_pull_request(repo, pr_number)
with TemporaryDirectory(prefix="gitea-codex-") as tmp:
tmpdir = Path(tmp)
repo_dir = checkout_pr(tmpdir, pr)
repo_cfg = load_repo_review_config(repo_dir)
if command.name == "review" and not command.mode_explicit:
configured_mode = repo_cfg.default_mode
command.mode = configured_mode if configured_mode in {"summary", "security", "performance", "tests", "full"} else "summary"
diff_context = collect_diff_context(repo_dir, pr, min(settings.max_diff_bytes, repo_cfg.max_diff_bytes))
diff_context["changed_files"] = _apply_ignore_patterns(diff_context["changed_files"], repo_cfg.ignore)
diff_context["diff"] = _redact_secrets_from_diff(diff_context["diff"])
changed_file_contents = ""
if command.full:
changed_file_contents = _collect_changed_file_contents(repo_dir, diff_context["changed_files"], settings.max_diff_bytes)
test_output = None
if repo_cfg.include_tests and command.mode == "tests":
test_output = _collect_test_output(repo_dir, timeout_seconds=min(settings.max_review_minutes * 60, 300))
prompt = _build_prompt(
pr,
command,
diff_context,
repo_cfg,
changed_file_contents=changed_file_contents,
test_output=test_output,
)
return prompt, diff_context, repo_cfg
def normalize_review_result(result: Any) -> dict[str, Any]:
if not isinstance(result, dict):
raise ReviewError(f"Invalid review result type: {type(result)!r}")
@@ -19,3 +310,44 @@ def normalize_review_result(result: Any) -> dict[str, Any]:
if "confidence" not in result:
result["confidence"] = 0.5
return result
def summarize_command(command: ParsedCommand) -> str:
return " ".join(["@codex", command.name, *command.arguments]).strip()
def fix_branch_name(pr_number: int, arguments: list[str] | None = None) -> str:
suffix = "fix"
if arguments:
words = [token.lower().strip() for token in arguments if token.strip() and not token.startswith("--")]
if words:
clean = "-".join(words[:4])
cleaned = "".join(ch if ch.isalnum() or ch == "-" else "-" for ch in clean).strip("-")
if cleaned:
suffix = f"fix-{cleaned}"
return f"codex/pr-{pr_number}-{suffix}"
def create_fix_patch_note(command: ParsedCommand) -> str:
details = shlex.join(command.arguments) if command.arguments else "latest findings"
return f"Fix command requested for {details}."
def create_fix_branch(
pr: PullRequestContext,
*,
note: str,
arguments: list[str] | None = None,
) -> str:
branch = fix_branch_name(pr.pr_number, arguments=arguments)
with TemporaryDirectory(prefix="gitea-codex-fix-") as tmp:
tmpdir = Path(tmp)
repo_dir = checkout_pr(tmpdir, pr)
_run_git(["checkout", "-b", branch], cwd=repo_dir)
notes_dir = repo_dir / ".codex"
notes_dir.mkdir(parents=True, exist_ok=True)
(notes_dir / "fix-note.md").write_text(f"# Codex Fix Note\n\n{note}\n", encoding="utf-8")
_run_git(["add", ".codex/fix-note.md"], cwd=repo_dir)
_run_git(["-c", "user.name=codex-bot", "-c", "user.email=codex-bot@example.invalid", "commit", "-m", f"Codex fix note for PR {pr.pr_number}"], cwd=repo_dir)
_run_git(["push", "origin", f"{branch}:{branch}", "--force"], cwd=repo_dir)
return branch

View File

@@ -4,7 +4,7 @@ from dataclasses import dataclass, field
from typing import Literal
CommandName = Literal["review", "explain", "ignore", "rerun", "help"]
CommandName = Literal["review", "explain", "fix", "ignore", "rerun"]
@dataclass(slots=True)
@@ -14,4 +14,5 @@ class ParsedCommand:
mode: str = "summary"
mode_explicit: bool = False
full: bool = False
branch_fix: bool = False
arguments: list[str] = field(default_factory=list)

View File

@@ -12,48 +12,14 @@ from pathlib import Path
from typing import Any
from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient, PullRequestContext
from gitea_codex_bot.services.repo_config import RepoReviewConfig, parse_repo_review_config_text
from gitea_codex_bot.services.reviewer import normalize_review_result
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.repo_config import RepoReviewConfig
from gitea_codex_bot.services.reviewer import normalize_review_result, prepare_review_prompt, run_review_for_pr
from gitea_codex_bot.types import ParsedCommand
CONTAINER_CODEX_HOME = "/root/.codex"
REVIEW_OUTPUT_FILE = "/tmp/codex-review-result.json"
REVIEW_SCHEMA_FILE = "/tmp/codex-review-schema.json"
REVIEW_EMITTED_FILE = "/tmp/codex-review-emitted.flag"
RESULT_START_MARKER = "__CODEX_REVIEW_RESULT_BEGIN__"
RESULT_END_MARKER = "__CODEX_REVIEW_RESULT_END__"
logger = logging.getLogger(__name__)
REVIEW_RESULT_SCHEMA: dict[str, Any] = {
"type": "object",
"additionalProperties": False,
"required": ["verdict", "confidence", "summary", "findings", "markdown_comment"],
"properties": {
"verdict": {"type": "string", "enum": ["correct", "has_issues"]},
"confidence": {"type": "number"},
"summary": {"type": "string"},
"markdown_comment": {"type": "string"},
"findings": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"required": ["severity", "file", "line_start", "line_end", "title", "body", "suggestion"],
"properties": {
"severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
"file": {"type": "string"},
"line_start": {"type": "integer"},
"line_end": {"type": "integer"},
"title": {"type": "string"},
"body": {"type": "string"},
"suggestion": {"type": ["string", "null"]},
},
},
},
},
}
def run_review_ephemeral(
settings: Settings,
@@ -63,131 +29,38 @@ def run_review_ephemeral(
command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
gitea = GiteaClient(settings)
pr = gitea.get_pull_request(repo, pr_number)
repo_cfg = _load_repo_review_config_from_gitea(gitea, repo, pr.head_sha)
_apply_repo_default_review_mode(command, repo_cfg)
review_prompt = _build_exec_review_prompt(command, repo_cfg, pr)
prompt, _diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
container_name = f"codex-review-{uuid.uuid4().hex[:12]}"
marker_nonce = uuid.uuid4().hex
result_start_marker = f"{RESULT_START_MARKER}_{marker_nonce}"
result_end_marker = f"{RESULT_END_MARKER}_{marker_nonce}"
extra_env: dict[str, str] = {
"GITEA_TOKEN": settings.gitea_token.get_secret_value(),
"GITEA_GIT_USERNAME": settings.gitea_bot_username,
}
if settings.openai_api_key:
extra_env["OPENAI_API_KEY"] = settings.openai_api_key.get_secret_value()
if settings.openai_org_id:
extra_env["OPENAI_ORG_ID"] = settings.openai_org_id
if settings.openai_project_id:
extra_env["OPENAI_PROJECT_ID"] = settings.openai_project_id
install_and_run = _build_install_and_run_command(settings)
extra_env: dict[str, str] = {}
if settings.codex_auth_mode == "chatgpt":
extra_env["CODEX_AUTH_JSON_B64"] = _load_codex_auth_json_b64(settings)
cmd = _build_docker_command(settings, container_name=container_name, install_and_run=install_and_run)
try:
completed = _run_ephemeral_container(
settings,
pr=pr,
container_name=container_name,
review_prompt=review_prompt,
result_start_marker=result_start_marker,
result_end_marker=result_end_marker,
extra_env=extra_env,
completed = subprocess.run(
cmd,
input=prompt,
text=True,
check=False,
capture_output=True,
timeout=settings.max_review_minutes * 60,
env={**os.environ, **extra_env},
)
if completed.returncode != 0:
raise RuntimeError(_format_runner_failure(completed))
parsed = _parse_review_result_from_stdout_artifact(
completed.stdout,
result_start_marker=result_start_marker,
result_end_marker=result_end_marker,
)
parsed = _parse_codex_exec_stdout(completed.stdout)
parsed["_meta"] = _extract_result_meta_from_codex_stdout(completed.stdout, settings)
return normalize_review_result(parsed), repo_cfg
except Exception as exc:
logger.warning("Ephemeral runner failed without host fallback: %s", exc)
return _ephemeral_runner_failure_result(exc, settings.codex_auth_mode), repo_cfg
if settings.codex_auth_mode == "chatgpt":
logger.warning("Ephemeral chatgpt runner failed, skipping API-key fallback: %s", exc)
return _chatgpt_runner_failure_result(exc), repo_cfg
result, _repo_cfg = run_review_for_pr(settings, gitea, repo, pr_number, command)
return result, _repo_cfg
def _run_ephemeral_container(
settings: Settings,
*,
pr: PullRequestContext,
container_name: str,
review_prompt: str,
result_start_marker: str,
result_end_marker: str,
extra_env: dict[str, str],
) -> subprocess.CompletedProcess[str]:
install_and_run = _build_install_and_run_command(
settings,
pr=pr,
review_prompt=review_prompt,
result_start_marker=result_start_marker,
result_end_marker=result_end_marker,
)
cmd = _build_docker_command(settings, container_name=container_name, install_and_run=install_and_run)
return subprocess.run(
cmd,
text=True,
check=False,
capture_output=True,
timeout=settings.max_review_minutes * 60,
env={**os.environ, **extra_env},
)
def _build_install_and_run_command(
settings: Settings,
*,
pr: PullRequestContext,
review_prompt: str,
result_start_marker: str,
result_end_marker: str,
) -> str:
runner_fallback_json = json.dumps(
{
"verdict": "has_issues",
"confidence": 0.67,
"summary": "Ephemeral codex execution failed before producing a review result.",
"markdown_comment": "Ephemeral codex execution failed before producing a review result.",
"findings": [
{
"severity": "high",
"file": "runner",
"line_start": 1,
"line_end": 1,
"title": "Ephemeral review runner failed",
"body": "codex exec failed before emitting a valid structured artifact.",
"suggestion": "Check ephemeral runner logs for auth/model/network issues and rerun @codex review.",
}
],
},
separators=(",", ":"),
)
steps = [
"set -euo pipefail",
f"rm -f {shlex.quote(REVIEW_EMITTED_FILE)}",
"emit_review_artifact() { "
"rc=\"$1\"; "
f"if [ ! -s {shlex.quote(REVIEW_OUTPUT_FILE)} ]; then "
f"cat > {shlex.quote(REVIEW_OUTPUT_FILE)} <<'JSON'\n{runner_fallback_json}\nJSON\n"
"fi; "
f'if [ ! -f {shlex.quote(REVIEW_EMITTED_FILE)} ]; then echo "{result_start_marker}"; cat {shlex.quote(REVIEW_OUTPUT_FILE)}; echo "{result_end_marker}"; touch {shlex.quote(REVIEW_EMITTED_FILE)}; fi; '
"return \"$rc\"; "
"}",
"trap 'rc=$?; set +e; emit_review_artifact \"$rc\"; exit \"$rc\"' EXIT",
]
if settings.codex_auth_mode != "chatgpt":
steps.extend(
[
'if [ -z "${OPENAI_API_KEY:-}" ]; then echo "OPENAI_API_KEY missing in runner env" >&2; exit 8; fi',
]
)
steps.extend(
[
'if [ -z "${GITEA_TOKEN:-}" ]; then echo "GITEA_TOKEN missing in runner env" >&2; exit 8; fi',
'if [ -z "${GITEA_GIT_USERNAME:-}" ]; then echo "GITEA_GIT_USERNAME missing in runner env" >&2; exit 8; fi',
]
)
def _build_install_and_run_command(settings: Settings) -> str:
steps = ["set -euo pipefail"]
if settings.codex_auth_mode == "chatgpt":
steps.extend(
[
@@ -198,101 +71,16 @@ def _build_install_and_run_command(
)
steps.extend(
[
"apt-get update >/tmp/apt-update.log 2>&1 && apt-get install -y --no-install-recommends ca-certificates git >/tmp/apt-install.log 2>&1 || { rc=$?; echo 'ca-certificates/git install failed'; tail -n 80 /tmp/apt-update.log || true; tail -n 80 /tmp/apt-install.log || true; exit $rc; }",
"npm install -g @openai/codex@latest >/tmp/codex-install.log 2>&1 || { rc=$?; echo 'codex install failed'; tail -n 200 /tmp/codex-install.log || true; exit $rc; }",
"codex --version >/tmp/codex-version.log 2>&1 || { rc=$?; echo 'codex version check failed'; tail -n 40 /tmp/codex-version.log || true; exit $rc; }",
]
)
schema_json = json.dumps(REVIEW_RESULT_SCHEMA, separators=(",", ":"))
steps.extend(
[
f"cat > {REVIEW_SCHEMA_FILE} <<'JSON'\n{schema_json}\nJSON",
'auth_b64="$(printf "%s" "${GITEA_GIT_USERNAME}:${GITEA_TOKEN}" | base64 | tr -d \'\\n\')"',
f'git -c http.extraHeader="Authorization: Basic $auth_b64" clone --no-tags --depth 80 {shlex.quote(pr.clone_url)} /work/repo',
"cd /work/repo",
"fetch_required() { "
"remote=\"$1\"; ref=\"$2\"; sha=\"$3\"; label=\"$4\"; "
"if git -c http.extraHeader=\"Authorization: Basic $auth_b64\" fetch --no-tags \"$remote\" \"$ref\"; then return 0; fi; "
"if git -c http.extraHeader=\"Authorization: Basic $auth_b64\" fetch --no-tags \"$remote\" \"$sha\"; then return 0; fi; "
"echo \"Failed to fetch $label from remote '$remote' using ref '$ref' or sha '$sha'\" >&2; "
"return 7; "
"}",
f"base_remote={'upstream' if pr.base_clone_url and pr.base_clone_url != pr.clone_url else 'origin'}",
f"if [ \"$base_remote\" = \"upstream\" ]; then git remote add upstream {shlex.quote(pr.base_clone_url or '')}; fi",
f"fetch_required origin {shlex.quote(pr.head_ref)} {shlex.quote(pr.head_sha)} head",
f"fetch_required \"$base_remote\" {shlex.quote(pr.base_ref)} {shlex.quote(pr.base_sha)} base",
f"git checkout --detach {shlex.quote(pr.head_sha)}",
'resolved_head="$(git rev-parse HEAD)"',
f'if [ "$resolved_head" != {shlex.quote(pr.head_sha)} ]; then echo "Checked out SHA mismatch: expected {pr.head_sha}, got $resolved_head" >&2; exit 9; fi',
"unset GITEA_TOKEN auth_b64",
"git config --global --unset-all http.extraHeader >/dev/null 2>&1 || true",
"apt-get update >/tmp/apt-update.log 2>&1 && apt-get install -y --no-install-recommends ca-certificates >/tmp/apt-install.log 2>&1 || { rc=$?; echo 'ca-certificates install failed'; tail -n 80 /tmp/apt-update.log || true; tail -n 80 /tmp/apt-install.log || true; exit $rc; }",
"npm install -g @openai/codex >/tmp/codex-install.log 2>&1 || { rc=$?; echo 'codex install failed'; tail -n 200 /tmp/codex-install.log || true; exit $rc; }",
]
)
model = settings.openai_review_model.strip()
codex_exec_parts = [
"codex exec",
"--sandbox",
"danger-full-access",
"--json",
"--output-schema",
shlex.quote(REVIEW_SCHEMA_FILE),
"-o",
shlex.quote(REVIEW_OUTPUT_FILE),
]
if model:
codex_exec_parts.append(f"-m {shlex.quote(model)}")
codex_exec_parts.append(shlex.quote(review_prompt))
steps.extend(
[
"set +e",
"codex_rc=0",
" ".join(codex_exec_parts) + ' || codex_rc="$?"',
"set -e",
f'if [ "$codex_rc" -ne 0 ] || [ ! -s {shlex.quote(REVIEW_OUTPUT_FILE)} ]; then cat > {REVIEW_OUTPUT_FILE} <<\'JSON\'\n{runner_fallback_json}\nJSON\nfi',
"emit_review_artifact 0",
]
)
return "\n".join(steps)
def _apply_repo_default_review_mode(command: ParsedCommand, repo_cfg: RepoReviewConfig) -> None:
if command.name != "review" or command.mode_explicit:
return
configured_mode = repo_cfg.default_mode
command.mode = configured_mode if configured_mode in {"summary", "security", "performance", "tests", "full"} else "summary"
def _build_exec_review_prompt(command: ParsedCommand, repo_cfg: RepoReviewConfig, pr: PullRequestContext) -> str:
raw = (command.raw or "").strip()
remainder = raw
match = re.match(r"^@[^\s]+\s+\S+\s*(.*)$", raw, flags=re.IGNORECASE | re.DOTALL)
if match:
remainder = match.group(1).strip()
intent = remainder or "review this pull request and report introduced issues."
focus = ", ".join(repo_cfg.focus) if repo_cfg.focus else "correctness, security, maintainability"
ignore = ", ".join(repo_cfg.ignore) if repo_cfg.ignore else "(none)"
mode = command.mode if command.name in {"review", "rerun"} else "summary"
allow_test_execution = command.mode == "tests" or repo_cfg.include_tests
tests_policy = (
"Tests may be executed for this run because tests mode/include_tests is explicitly enabled."
if allow_test_execution
else "Do not run tests, benchmarks, or other executables. Review changes statically unless explicitly asked."
)
return "\n".join(
[
f"review: {intent}",
"Review only issues introduced by this PR.",
f"Compare exactly these commits: base `{pr.base_sha}` ... head `{pr.head_sha}`.",
"Use local git data from this checkout; do not review unrelated history.",
f"Requested mode: {mode}.",
f"Focus areas: {focus}.",
f"Ignore patterns: {ignore}.",
f"Include tests setting: {repo_cfg.include_tests}.",
tests_policy,
f"Full review requested: {command.full}.",
"Return strict JSON matching the provided output schema.",
]
)
steps.append(f"codex exec --skip-git-repo-check --json -m {shlex.quote(model)}")
else:
steps.append("codex exec --skip-git-repo-check --json")
return "; ".join(steps)
def _build_docker_command(settings: Settings, *, container_name: str, install_and_run: str) -> list[str]:
@@ -305,8 +93,6 @@ def _build_docker_command(settings: Settings, *, container_name: str, install_an
container_name,
"-e",
"CODEX_DISABLE_TELEMETRY=1",
"-e",
"CODEX_SANDBOX_MODE=danger-full-access",
]
if settings.codex_auth_mode == "chatgpt":
cmd.extend(
@@ -328,25 +114,16 @@ def _build_docker_command(settings: Settings, *, container_name: str, install_an
"OPENAI_PROJECT_ID",
]
)
cmd.extend(
[
"-e",
"GITEA_TOKEN",
"-e",
"GITEA_GIT_USERNAME",
]
)
cmd.extend([settings.review_runner_image, "bash", "-lc", install_and_run])
return cmd
def _ephemeral_runner_failure_result(exc: Exception, auth_mode: str) -> dict[str, Any]:
def _chatgpt_runner_failure_result(exc: Exception) -> dict[str, Any]:
message = str(exc).strip() or exc.__class__.__name__
mode_label = "ChatGPT auth" if auth_mode == "chatgpt" else "API-key auth"
summary = f"{mode_label} runner failed before review execution. Error: {message}"
summary = f"ChatGPT auth runner failed before review execution. Error: {message}"
return {
"verdict": "has_issues",
"confidence": 0.67,
"confidence": 0.6,
"summary": summary,
"findings": [
{
@@ -354,7 +131,7 @@ def _ephemeral_runner_failure_result(exc: Exception, auth_mode: str) -> dict[str
"file": "runner",
"line_start": 1,
"line_end": 1,
"title": "Ephemeral review runner failed",
"title": "Ephemeral chatgpt review runner failed",
"body": message,
"suggestion": "Check ephemeral runner logs for model/auth/network issues, then rerun @codex review.",
}
@@ -404,39 +181,27 @@ def ensure_workdir(path: str) -> Path:
return target
def _load_repo_review_config_from_gitea(gitea: GiteaClient, repo: str, head_sha: str) -> RepoReviewConfig:
content = gitea.get_file_content(repo, ".codex-review.yml", ref=head_sha)
if content is None:
return RepoReviewConfig(configured=False)
return parse_repo_review_config_text(content, configured=True)
def _parse_review_result_from_stdout_artifact(
stdout: str,
*,
result_start_marker: str,
result_end_marker: str,
) -> dict[str, Any]:
start_pos = stdout.find(result_start_marker)
if start_pos == -1:
raise RuntimeError("Runner output did not include final review artifact markers.")
artifact_start = start_pos + len(result_start_marker)
# Prefer the last end marker so marker-like text inside JSON does not
# truncate the payload when earlier incidental matches exist.
end_pos = stdout.rfind(result_end_marker)
if end_pos == -1 or end_pos <= artifact_start:
raise RuntimeError("Runner output did not include final review artifact markers.")
artifact = stdout[artifact_start:end_pos].strip()
if not artifact:
raise RuntimeError("Runner output contained empty final review artifact.")
try:
payload = json.loads(artifact)
except json.JSONDecodeError as exc:
raise RuntimeError(f"Final review artifact was not valid JSON: {exc}") from exc
if not isinstance(payload, dict):
raise RuntimeError(f"Final review artifact JSON must be an object, got {type(payload)!r}.")
return payload
def _parse_codex_exec_stdout(stdout: str) -> dict[str, Any]:
last_text: str | None = None
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(payload, dict) and {"verdict", "summary", "findings"}.issubset(payload.keys()):
return payload
extracted = _extract_text(payload)
if extracted:
last_text = extracted
parsed = _parse_review_json_from_text(extracted)
if parsed:
return parsed
if not last_text:
raise RuntimeError("codex exec output did not include parseable review payload text")
raise RuntimeError(f"codex exec output text did not contain review JSON; text_tail={_tail_text(last_text, 400)}")
def _extract_result_meta_from_codex_stdout(stdout: str, settings: Settings) -> dict[str, Any]:
@@ -498,3 +263,49 @@ def _find_first_dict_for_key(payload: Any, key: str) -> dict[str, Any] | None:
if found:
return found
return None
def _parse_review_json_from_text(text: str) -> dict[str, Any] | None:
candidates: list[str] = [text.strip()]
fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, flags=re.DOTALL | re.IGNORECASE)
if fenced:
candidates.append(fenced.group(1).strip())
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and end > start:
candidates.append(text[start : end + 1].strip())
seen: set[str] = set()
for candidate in candidates:
if not candidate or candidate in seen:
continue
seen.add(candidate)
try:
payload = json.loads(candidate)
except json.JSONDecodeError:
continue
if isinstance(payload, dict) and {"verdict", "summary", "findings"}.issubset(payload.keys()):
return payload
return None
def _extract_text(payload: Any) -> str | None:
if isinstance(payload, str):
return payload
if isinstance(payload, dict):
for key in ("text", "message", "content", "output"):
value = payload.get(key)
text = _extract_text(value)
if text:
return text
for value in payload.values():
if not isinstance(value, (dict, list)):
continue
text = _extract_text(value)
if text:
return text
if isinstance(payload, list):
for item in payload:
text = _extract_text(item)
if text:
return text
return None

View File

@@ -4,16 +4,18 @@ import asyncio
import logging
from typing import Any
from sqlalchemy import func, select
import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session
from gitea_codex_bot.config import Settings
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import JobStatus, ReviewJob
from gitea_codex_bot.services.comments import upsert_persistent_review_comment_id
from gitea_codex_bot.models import ReviewJob
from gitea_codex_bot.services.comments import get_persistent_review_comment_id, upsert_persistent_review_comment_id
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import claim_next_job, finish_job
from gitea_codex_bot.services.jobs import claim_next_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.services.review_format import format_disabled_ack, format_result_comment
from gitea_codex_bot.services.reviewer import create_fix_branch, create_fix_patch_note
from gitea_codex_bot.types import ParsedCommand
from gitea_codex_bot.workers.container_runner import run_review_ephemeral
@@ -23,7 +25,7 @@ logger = logging.getLogger(__name__)
def _command_from_job(job: ReviewJob) -> ParsedCommand:
args = job.command_args.split() if job.command_args else []
raw = (job.trigger_comment_body or "").strip() or f"@codex {job.command}"
return ParsedCommand(name=job.command, raw=raw, arguments=args, full="--full" in args)
return ParsedCommand(name=job.command, raw=raw, arguments=args, full="--full" in args, branch_fix="--branch" in args)
def _handle_non_review_command(
@@ -33,13 +35,6 @@ def _handle_non_review_command(
job: ReviewJob,
command: ParsedCommand,
) -> tuple[bool, bool, dict[str, Any] | None, str | None]:
if command.name == "help":
try:
message = _build_help_comment(settings, session, gitea, job)
gitea.post_issue_comment(job.repo, job.pr_number, message)
return True, True, {"summary": "Help/status summary posted."}, None
except Exception as exc:
return True, False, None, f"Failed to post help summary: {exc}"
if command.name == "ignore":
return True, True, {"summary": "Ignore command acknowledged. No review run executed."}, None
if command.name == "explain":
@@ -60,136 +55,66 @@ def _handle_non_review_command(
message = "## Codex Explain\n\nNo previous result found for this command."
gitea.post_issue_comment(job.repo, job.pr_number, message)
return True, True, {"summary": message}, None
if str(command.name).lower() == "fix":
message = "⚠️ `@codex fix` is no longer supported on this bot."
gitea.post_issue_comment(job.repo, job.pr_number, message)
return True, True, {"summary": message}, None
if command.name == "fix":
if not settings.enable_fix_commands:
message = "⚠️ `@codex fix` is disabled on this bot instance."
gitea.post_issue_comment(job.repo, job.pr_number, message)
return True, True, {"summary": message}, None
note = create_fix_patch_note(command)
if command.branch_fix:
try:
pr = gitea.get_pull_request(job.repo, job.pr_number)
branch = create_fix_branch(pr, note=note, arguments=command.arguments)
message = f"## Codex Fix\n\n{note}\n\nCreated branch `{branch}`."
gitea.post_issue_comment(job.repo, job.pr_number, message)
return True, True, {"summary": note, "mode": "branch", "branch": branch}, None
except Exception as exc:
return True, False, None, f"Failed to create fix branch: {exc}"
gitea.post_issue_comment(job.repo, job.pr_number, f"## Codex Fix\n\n{note}\n\nPatch suggestion mode.")
return True, True, {"summary": note, "mode": "patch"}, None
return False, False, None, None
def _build_help_comment(settings: Settings, session: Session, gitea: GiteaClient, job: ReviewJob) -> str:
comments = gitea.list_issue_comments(job.repo, job.pr_number)
comment_summaries = _summarize_comments(comments, settings.gitea_bot_username)
latest_review = session.execute(
select(ReviewJob)
.where(
ReviewJob.repo == job.repo,
ReviewJob.pr_number == job.pr_number,
ReviewJob.command.in_(["review", "rerun"]),
)
.order_by(ReviewJob.id.desc())
.limit(1)
).scalar_one_or_none()
pending_count = session.execute(
select(func.count(ReviewJob.id)).where(
ReviewJob.repo == job.repo,
ReviewJob.pr_number == job.pr_number,
ReviewJob.status.in_([JobStatus.queued, JobStatus.running]),
)
).scalar_one()
latest_status_line = "No previous review run."
if latest_review is not None:
latest_status = latest_review.status.value if hasattr(latest_review.status, "value") else str(latest_review.status)
latest_summary = ""
if isinstance(latest_review.result_json, dict):
summary_raw = latest_review.result_json.get("summary")
if isinstance(summary_raw, str):
latest_summary = " ".join(summary_raw.split())
latest_status_line = f"Latest review command: `{latest_review.command}` status `{latest_status}`."
if latest_summary:
latest_status_line = f"{latest_status_line} Summary: {latest_summary[:180]}"
lines = [
"## Codex Help",
"",
"Supported commands:",
"- `@codex review [security|performance|tests] [--full]`",
"- `@codex rerun`",
"- `@codex explain`",
"- `@codex ignore`",
"- `@codex -h` / `@codex --help` / `@codex help`",
"",
"Status note:",
f"- Pending jobs on this PR: `{pending_count}`",
f"- {latest_status_line}",
"",
f"Discussion summary ({comment_summaries['total']} comments, human `{comment_summaries['human']}`, bot `{comment_summaries['bot']}`):",
]
if comment_summaries["items"]:
lines.extend(comment_summaries["items"])
else:
lines.append("- No comments available to summarize.")
return "\n".join(lines).strip()
def _summarize_comments(comments: list[dict[str, Any]], bot_username: str) -> dict[str, Any]:
normalized_bot = (bot_username or "").strip().lower()
bot_count = 0
summarized: list[str] = []
recent = comments[-8:] if comments else []
for row in comments:
user = row.get("user")
username = ""
if isinstance(user, dict):
username = str(user.get("username") or user.get("login") or "").strip().lower()
if username and username == normalized_bot:
bot_count += 1
for row in recent:
body_raw = str(row.get("body") or "").strip()
if not body_raw:
continue
one_line = " ".join(body_raw.split())
preview = one_line if len(one_line) <= 180 else f"{one_line[:180]}..."
user = row.get("user")
username = "unknown"
if isinstance(user, dict):
username = str(user.get("username") or user.get("login") or "unknown").strip() or "unknown"
summarized.append(f"- @{username}: {preview}")
total = len(comments)
human_count = max(total - bot_count, 0)
return {"total": total, "human": human_count, "bot": bot_count, "items": summarized}
def _post_review_failure_comment(gitea: GiteaClient, job: ReviewJob, error_message: str) -> None:
message = (
"⚠️ Codex review run failed after queueing.\n\n"
f"- Commit: `{job.head_sha[:7]}`\n"
f"- Error: `{error_message[:500]}`\n\n"
"Please rerun `@codex rerun` after checking worker logs."
)
gitea.post_issue_comment(job.repo, job.pr_number, message)
def process_one_job(settings: Settings) -> bool:
session_factory = get_session_factory()
gitea = GiteaClient(settings)
with session_factory() as session:
recoveries = recover_stuck_jobs(
session,
lease_timeout_seconds=settings.job_lease_timeout_seconds,
action=settings.stuck_job_recovery_action,
max_retries=settings.max_stuck_job_retries,
)
for recovery in recoveries:
if not recovery.failed:
continue
try:
gitea.post_issue_comment(
recovery.repo,
recovery.pr_number,
(
"⚠️ Codex worker recovery failed this job after stuck lease retries were exhausted.\n\n"
f"- job_id: `{recovery.job_id}`\n"
f"- retries used: `{recovery.retries_used}`\n"
f"- details: {recovery.message}\n\n"
"Please re-run with `@codex review` after investigating runner/worker stability."
),
)
except Exception:
logger.exception("Failed to post stuck-job exhaustion comment for job id=%s", recovery.job_id)
with session_factory() as session:
job = claim_next_job(session)
if not job:
return False
command = _command_from_job(job)
gitea = GiteaClient(settings)
logger.info(
"Processing job id=%s repo=%s pr=%s command=%s args=%s head_sha=%s",
job.id,
job.repo,
job.pr_number,
command.name,
command.arguments,
job.head_sha,
)
with session_factory() as session:
db_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
handled, skipped, result, error = _handle_non_review_command(settings, session, gitea, db_job, command)
if handled:
logger.info(
"Non-review command handled job id=%s command=%s skipped=%s error_present=%s",
db_job.id,
command.name,
skipped,
bool(error),
)
finish_job(session, job_id=db_job.id, success=error is None, skipped=skipped, result=result, error_message=error)
return True
@@ -209,15 +134,6 @@ def process_one_job(settings: Settings) -> bool:
)
return True
result, repo_cfg = run_review_ephemeral(settings, repo=job.repo, pr_number=job.pr_number, command=command)
logger.info(
"Runner returned job id=%s repo=%s pr=%s repo_cfg_enabled=%s repo_cfg_configured=%s result_keys=%s",
job.id,
job.repo,
job.pr_number,
repo_cfg.enabled,
repo_cfg.configured,
sorted(result.keys()),
)
if not repo_cfg.enabled:
with session_factory() as session:
gitea.post_issue_comment(job.repo, job.pr_number, format_disabled_ack())
@@ -232,14 +148,22 @@ def process_one_job(settings: Settings) -> bool:
return True
comment_body = format_result_comment(job.head_sha, result, repo_configured=repo_cfg.configured)
with session_factory() as session:
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body)
logger.info(
"Posted review comment job id=%s repo=%s pr=%s comment_id=%s",
job.id,
job.repo,
job.pr_number,
comment_id,
)
comment_id = get_persistent_review_comment_id(session, job.repo, job.pr_number)
if comment_id:
try:
gitea.edit_issue_comment(job.repo, comment_id, comment_body)
except httpx.HTTPStatusError as exc:
if exc.response.status_code != 404:
raise
logger.warning(
"Persistent review comment not found; posting a new one repo=%s pr=%s old_comment_id=%s",
job.repo,
job.pr_number,
comment_id,
)
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body)
else:
comment_id = gitea.post_issue_comment(job.repo, job.pr_number, comment_body)
upsert_persistent_review_comment_id(
session,
repo=job.repo,
@@ -247,24 +171,11 @@ def process_one_job(settings: Settings) -> bool:
head_sha=job.head_sha,
comment_id=comment_id,
)
logger.info(
"Persistent comment mapping upserted job id=%s repo=%s pr=%s comment_id=%s head_sha=%s",
job.id,
job.repo,
job.pr_number,
comment_id,
job.head_sha,
)
finish_job(session, job_id=job.id, success=True, skipped=False, result=result, error_message=None)
except Exception as exc:
logger.exception("Review job failed id=%s", job.id)
error_text = str(exc).strip() or exc.__class__.__name__
try:
_post_review_failure_comment(gitea, job, error_text)
except Exception:
logger.exception("Failed to post review failure comment id=%s", job.id)
with session_factory() as session:
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message=error_text)
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message=str(exc))
return True

View File

@@ -4,8 +4,9 @@ import json
import sys
from gitea_codex_bot.config import get_settings
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.reviewer import run_review_for_pr
from gitea_codex_bot.types import ParsedCommand
from gitea_codex_bot.workers.container_runner import run_review_ephemeral
def main() -> int:
@@ -17,14 +18,11 @@ def main() -> int:
raw=f"@codex {command_payload['name']}",
mode=command_payload.get("mode", "summary"),
full=bool(command_payload.get("full", False)),
branch_fix=bool(command_payload.get("branch_fix", False)),
arguments=list(command_payload.get("arguments", [])),
)
result, _repo_cfg = run_review_ephemeral(
settings,
repo=payload["repo"],
pr_number=int(payload["pr_number"]),
command=command,
)
gitea = GiteaClient(settings)
result, _repo_cfg = run_review_for_pr(settings, gitea, payload["repo"], int(payload["pr_number"]), command)
print(json.dumps(result))
return 0

View File

@@ -1,4 +1,4 @@
from gitea_codex_bot.services.commands import detect_prefixed_command, parse_command
from gitea_codex_bot.services.commands import parse_command
def test_parse_review_command_modes() -> None:
@@ -17,43 +17,12 @@ def test_parse_review_command_defaults_to_non_explicit_summary_mode() -> None:
assert cmd.mode_explicit is False
def test_parse_fix_command_returns_none() -> None:
assert parse_command("@codex fix --branch finding 2") is None
def test_parse_fix_branch() -> None:
cmd = parse_command("@codex fix --branch finding 2")
assert cmd is not None
assert cmd.name == "fix"
assert cmd.branch_fix is True
def test_invalid_command_returns_none() -> None:
assert parse_command("hello") is None
def test_parse_review_command_for_bot_username_alias() -> None:
cmd = parse_command("@codex-bot review", aliases={"codex", "codex-bot"})
assert cmd is not None
assert cmd.name == "review"
def test_parse_review_command_for_custom_alias() -> None:
cmd = parse_command("@review-buddy review tests", aliases={"codex", "review-buddy"})
assert cmd is not None
assert cmd.name == "review"
assert cmd.mode == "tests"
def test_parse_help_short_flag() -> None:
cmd = parse_command("@codex -h")
assert cmd is not None
assert cmd.name == "help"
def test_parse_help_long_flag_and_arguments() -> None:
cmd = parse_command("@codex --help status quick", aliases={"codex"})
assert cmd is not None
assert cmd.name == "help"
assert cmd.arguments == ["status", "quick"]
def test_detect_prefixed_command_for_unsupported_name() -> None:
assert detect_prefixed_command("@codex shipit now", aliases={"codex"}) == "shipit"
def test_detect_prefixed_command_returns_none_for_non_alias() -> None:
assert detect_prefixed_command("@someone review", aliases={"codex"}) is None

View File

@@ -11,16 +11,3 @@ def test_codex_auth_defaults_to_api_key_mode() -> None:
settings = get_settings()
assert settings.codex_auth_mode == "api_key"
assert settings.codex_auth_json_path is None
def test_bot_command_aliases_include_codex_and_username() -> None:
settings = get_settings()
assert settings.bot_command_aliases == {"codex", "codex-bot"}
def test_bot_command_aliases_include_custom_mentions(monkeypatch) -> None:
monkeypatch.setenv("GITEA_BOT_MENTIONS", "@review-buddy,helper-bot")
get_settings.cache_clear()
settings = get_settings()
assert settings.bot_command_aliases == {"codex", "codex-bot", "review-buddy", "helper-bot"}

View File

@@ -5,56 +5,18 @@ from pathlib import Path
import pytest
from gitea_codex_bot.config import get_settings
from gitea_codex_bot.services.gitea import PullRequestContext
from gitea_codex_bot.services.repo_config import RepoReviewConfig
from gitea_codex_bot.types import ParsedCommand
from gitea_codex_bot.workers.container_runner import (
CONTAINER_CODEX_HOME,
RESULT_END_MARKER,
RESULT_START_MARKER,
_apply_repo_default_review_mode,
_build_docker_command,
_build_exec_review_prompt,
_build_install_and_run_command,
_extract_result_meta_from_codex_stdout,
_load_codex_auth_json_b64,
_load_repo_review_config_from_gitea,
_parse_review_result_from_stdout_artifact,
_parse_codex_exec_stdout,
_resolve_codex_auth_json_path,
run_review_ephemeral,
)
def _sample_pr() -> PullRequestContext:
return PullRequestContext(
repo="acme/repo",
pr_number=1,
base_ref="main",
base_sha="b" * 40,
head_ref="feature",
head_sha="a" * 40,
clone_url="https://gitea.test/acme/repo.git",
html_url="https://gitea.test/acme/repo/pulls/1",
is_fork=False,
)
def _sample_fork_pr() -> PullRequestContext:
return PullRequestContext(
repo="acme/repo",
pr_number=2,
base_ref="main",
base_sha="c" * 40,
head_ref="feature",
head_sha="d" * 40,
clone_url="https://gitea.test/fork/repo.git",
base_clone_url="https://gitea.test/acme/repo.git",
head_clone_url="https://gitea.test/fork/repo.git",
html_url="https://gitea.test/acme/repo/pulls/2",
is_fork=True,
)
def test_build_docker_command_api_key_mode_uses_openai_env() -> None:
settings = get_settings()
@@ -63,8 +25,6 @@ def test_build_docker_command_api_key_mode_uses_openai_env() -> None:
assert "OPENAI_API_KEY" in cmd
assert "OPENAI_ORG_ID" in cmd
assert "OPENAI_PROJECT_ID" in cmd
assert "GITEA_TOKEN" in cmd
assert "GITEA_GIT_USERNAME" in cmd
assert "--mount" not in cmd
@@ -85,82 +45,20 @@ def test_build_docker_command_chatgpt_mode_mounts_auth_json(
assert "OPENAI_API_KEY" not in cmd
assert f"CODEX_HOME={CONTAINER_CODEX_HOME}" in env_items
assert "CODEX_AUTH_JSON_B64" in env_items
assert "GITEA_TOKEN" in env_items
assert "GITEA_GIT_USERNAME" in env_items
def test_build_install_command_chatgpt_mode_sets_git_checkout_and_review(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
def test_build_install_command_chatgpt_mode_copies_auth_json(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
auth_file = tmp_path / "auth.json"
auth_file.write_text("{}", encoding="utf-8")
monkeypatch.setenv("CODEX_AUTH_MODE", "chatgpt")
monkeypatch.setenv("CODEX_AUTH_JSON_PATH", str(auth_file))
get_settings.cache_clear()
settings = get_settings()
pr = _sample_pr()
command = _build_install_and_run_command(
settings,
pr=pr,
review_prompt="review: security --full",
result_start_marker=f"{RESULT_START_MARKER}_x",
result_end_marker=f"{RESULT_END_MARKER}_x",
)
command = _build_install_and_run_command(settings)
assert 'printf "%s" "$CODEX_AUTH_JSON_B64" | base64 -d > /root/.codex/auth.json' in command
assert "git -c http.extraHeader=" in command
assert f"clone --no-tags --depth 80 {pr.clone_url} /work/repo" in command
assert "fetch_required() {" in command
assert f"fetch_required origin {pr.head_ref} {pr.head_sha} head" in command
assert f"fetch_required \"$base_remote\" {pr.base_ref} {pr.base_sha} base" in command
assert "base_remote=origin" in command
assert f"git checkout --detach {pr.head_sha}" in command
assert "resolved_head=\"$(git rev-parse HEAD)\"" in command
assert "unset GITEA_TOKEN auth_b64" in command
assert (
"codex exec --sandbox danger-full-access --json --output-schema /tmp/codex-review-schema.json "
"-o /tmp/codex-review-result.json"
) in command
assert "review: security --full" in command
assert "--output-schema /tmp/codex-review-schema.json" in command
assert "-o /tmp/codex-review-result.json" in command
assert "npm install -g @openai/codex@latest" in command
assert "codex --version >/tmp/codex-version.log" in command
assert " - " not in command
assert f'echo "{RESULT_START_MARKER}_x"' in command
assert f'echo "{RESULT_END_MARKER}_x"' in command
def test_build_install_command_does_not_include_reasoning_effort_flag() -> None:
settings = get_settings()
pr = _sample_pr()
command = _build_install_and_run_command(
settings,
pr=pr,
review_prompt="review: tests",
result_start_marker=f"{RESULT_START_MARKER}_x",
result_end_marker=f"{RESULT_END_MARKER}_x",
)
assert "--reasoning-effort" not in command
def test_build_install_command_uses_upstream_remote_for_fork_pr_base_fetch() -> None:
settings = get_settings()
pr = _sample_fork_pr()
command = _build_install_and_run_command(
settings,
pr=pr,
review_prompt="review: tests",
result_start_marker=f"{RESULT_START_MARKER}_x",
result_end_marker=f"{RESULT_END_MARKER}_x",
)
assert "base_remote=upstream" in command
assert f"git remote add upstream {pr.base_clone_url}" in command
assert f"fetch_required origin {pr.head_ref} {pr.head_sha} head" in command
assert f"fetch_required \"$base_remote\" {pr.base_ref} {pr.base_sha} base" in command
assert "codex exec --skip-git-repo-check --json -m gpt-5.3-codex" in command
def test_chatgpt_mode_requires_existing_auth_json(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
@@ -187,29 +85,6 @@ def test_load_codex_auth_json_b64_roundtrip(monkeypatch: pytest.MonkeyPatch, tmp
assert encoded
def test_load_repo_review_config_from_gitea_when_missing() -> None:
class _Gitea:
def get_file_content(self, *_args, **_kwargs):
return None
cfg = _load_repo_review_config_from_gitea(_Gitea(), "acme/repo", "a" * 40)
assert cfg.configured is False
assert cfg.enabled is True
def test_load_repo_review_config_from_gitea_when_present() -> None:
class _Gitea:
def get_file_content(self, *_args, **_kwargs):
return "enabled: false\nreview:\n default_mode: tests\n"
cfg = _load_repo_review_config_from_gitea(_Gitea(), "acme/repo", "a" * 40)
assert cfg.configured is True
assert cfg.enabled is False
assert cfg.default_mode == "tests"
def test_run_review_ephemeral_chatgpt_does_not_fallback_to_api_key_path(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
@@ -221,22 +96,23 @@ def test_run_review_ephemeral_chatgpt_does_not_fallback_to_api_key_path(
get_settings.cache_clear()
settings = get_settings()
class _FakeGiteaClient:
def __init__(self, _settings) -> None:
pass
def get_pull_request(self, *_args, **_kwargs):
return _sample_pr()
def get_file_content(self, *_args, **_kwargs):
return None
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.GiteaClient", _FakeGiteaClient)
monkeypatch.setattr(
"gitea_codex_bot.workers.container_runner.prepare_review_prompt",
lambda *_args, **_kwargs: ("prompt", {"diff": ""}, object()),
)
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.GiteaClient", lambda _settings: object())
monkeypatch.setattr(
"gitea_codex_bot.workers.container_runner.subprocess.run",
lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("docker unavailable")),
)
def _api_fallback_should_not_run(*_args, **_kwargs):
raise AssertionError("API-key fallback should not run in chatgpt mode")
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.run_review_for_pr", _api_fallback_should_not_run)
from gitea_codex_bot.types import ParsedCommand
result, _repo_cfg = run_review_ephemeral(
settings,
repo="acme/repo",
@@ -248,191 +124,33 @@ def test_run_review_ephemeral_chatgpt_does_not_fallback_to_api_key_path(
assert "ChatGPT auth runner failed" in result["summary"]
def test_run_review_ephemeral_api_key_mode_does_not_fallback_to_host(monkeypatch: pytest.MonkeyPatch) -> None:
get_settings.cache_clear()
settings = get_settings()
class _FakeGiteaClient:
def __init__(self, _settings) -> None:
pass
def get_pull_request(self, *_args, **_kwargs):
return _sample_pr()
def get_file_content(self, *_args, **_kwargs):
return None
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.GiteaClient", _FakeGiteaClient)
monkeypatch.setattr(
"gitea_codex_bot.workers.container_runner.subprocess.run",
lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("docker unavailable")),
)
result, _repo_cfg = run_review_ephemeral(
settings,
repo="acme/repo",
pr_number=1,
command=ParsedCommand(name="review", raw="@codex review"),
)
assert result["verdict"] == "has_issues"
assert "API-key auth runner failed" in result["summary"]
def test_run_review_ephemeral_single_attempt_success(monkeypatch: pytest.MonkeyPatch) -> None:
get_settings.cache_clear()
settings = get_settings()
class _FakeGiteaClient:
def __init__(self, _settings) -> None:
pass
def get_pull_request(self, *_args, **_kwargs):
return _sample_pr()
def get_file_content(self, *_args, **_kwargs):
return None
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.GiteaClient", _FakeGiteaClient)
monkeypatch.setattr(
"gitea_codex_bot.workers.container_runner.uuid.uuid4",
lambda: type("U", (), {"hex": "abc123def4567890abc123def4567890"})(),
)
calls: list[list[str]] = []
def _fake_run(cmd, *args, **kwargs):
calls.append(cmd)
return type(
"Completed",
(),
{
"returncode": 0,
"stdout": (
'{"type":"response.started","model":"gpt-5.3-codex"}\n'
f"{RESULT_START_MARKER}_abc123def4567890abc123def4567890\n"
'{"verdict":"correct","confidence":0.9,"summary":"ok","findings":[],"markdown_comment":"ok"}\n'
f"{RESULT_END_MARKER}_abc123def4567890abc123def4567890\n"
),
"stderr": "",
},
)()
monkeypatch.setattr("gitea_codex_bot.workers.container_runner.subprocess.run", _fake_run)
result, _repo_cfg = run_review_ephemeral(
settings,
repo="acme/repo",
pr_number=1,
command=ParsedCommand(name="review", raw="@codex review"),
)
assert result["verdict"] == "correct"
assert len(calls) == 1
first_shell = calls[0][-1]
assert "--reasoning-effort" not in first_shell
def test_parse_review_result_from_stdout_artifact() -> None:
stdout = (
"noise\n"
f"{RESULT_START_MARKER}_test\n"
'{"verdict":"correct","confidence":0.9,"summary":"ok","findings":[],"markdown_comment":"ok"}\n'
f"{RESULT_END_MARKER}_test\n"
)
parsed = _parse_review_result_from_stdout_artifact(
stdout,
result_start_marker=f"{RESULT_START_MARKER}_test",
result_end_marker=f"{RESULT_END_MARKER}_test",
def test_parse_codex_exec_stdout_from_stream_item_text_json() -> None:
stdout = '\n'.join(
[
'{"type":"thread.started","thread_id":"abc"}',
'{"type":"item.completed","item":{"type":"agent_message","text":"{\\"verdict\\":\\"correct\\",\\"confidence\\":0.9,\\"summary\\":\\"ok\\",\\"findings\\":[]}"}}',
]
)
parsed = _parse_codex_exec_stdout(stdout)
assert parsed["verdict"] == "correct"
assert parsed["summary"] == "ok"
def test_parse_review_result_from_stdout_artifact_fails_without_markers() -> None:
with pytest.raises(RuntimeError):
_parse_review_result_from_stdout_artifact(
"no markers here",
result_start_marker=f"{RESULT_START_MARKER}_x",
result_end_marker=f"{RESULT_END_MARKER}_x",
)
def test_build_exec_review_prompt_strips_mention_and_command() -> None:
prompt = _build_exec_review_prompt(
ParsedCommand(name="review", raw="@codex review security --full\nfocus session handling"),
RepoReviewConfig(),
_sample_pr(),
def test_parse_codex_exec_stdout_from_fenced_json_text() -> None:
stdout = '\n'.join(
[
'{"type":"thread.started","thread_id":"abc"}',
'{"type":"item.completed","item":{"type":"agent_message","text":"Here is the result:\\n```json\\n{\\"verdict\\":\\"has_issues\\",\\"confidence\\":0.8,\\"summary\\":\\"x\\",\\"findings\\":[]}\\n```"}}',
]
)
assert prompt.startswith("review: security --full\nfocus session handling")
assert "Compare exactly these commits:" in prompt
def test_build_exec_review_prompt_falls_back_when_no_extra_text() -> None:
prompt = _build_exec_review_prompt(ParsedCommand(name="rerun", raw="@codex rerun"), RepoReviewConfig(), _sample_pr())
assert prompt.startswith("review: review this pull request and report introduced issues.")
def test_build_exec_review_prompt_disables_test_execution_by_default() -> None:
prompt = _build_exec_review_prompt(ParsedCommand(name="review", raw="@codex review"), RepoReviewConfig(), _sample_pr())
assert "Do not run tests, benchmarks, or other executables." in prompt
def test_build_exec_review_prompt_allows_test_execution_for_tests_mode() -> None:
prompt = _build_exec_review_prompt(
ParsedCommand(name="review", raw="@codex review tests", mode="tests", mode_explicit=True),
RepoReviewConfig(),
_sample_pr(),
)
assert "Tests may be executed for this run" in prompt
def test_apply_repo_default_review_mode_uses_full_when_not_configured() -> None:
command = ParsedCommand(name="review", raw="@codex review")
cfg = RepoReviewConfig()
_apply_repo_default_review_mode(command, cfg)
assert command.mode == "full"
def test_apply_repo_default_review_mode_for_review_command() -> None:
command = ParsedCommand(name="review", raw="@codex review")
cfg = RepoReviewConfig(default_mode="tests")
_apply_repo_default_review_mode(command, cfg)
assert command.mode == "tests"
def test_parse_review_result_from_stdout_artifact_uses_end_marker_after_start() -> None:
stdout = (
f"{RESULT_START_MARKER}_a\n"
'{"verdict":"correct","confidence":0.9,"summary":"contains marker text __CODEX_REVIEW_RESULT_END___a","findings":[],"markdown_comment":"ok"}\n'
f"{RESULT_END_MARKER}_a\n"
)
parsed = _parse_review_result_from_stdout_artifact(
stdout,
result_start_marker=f"{RESULT_START_MARKER}_a",
result_end_marker=f"{RESULT_END_MARKER}_a",
)
assert parsed["verdict"] == "correct"
def test_parse_review_result_from_stdout_artifact_handles_inline_end_marker() -> None:
stdout = (
"noise\n"
f"{RESULT_START_MARKER}_a\n"
'{"verdict":"correct","confidence":0.9,"summary":"ok","findings":[],"markdown_comment":"ok"}'
f"{RESULT_END_MARKER}_a\n"
)
parsed = _parse_review_result_from_stdout_artifact(
stdout,
result_start_marker=f"{RESULT_START_MARKER}_a",
result_end_marker=f"{RESULT_END_MARKER}_a",
)
assert parsed["verdict"] == "correct"
assert parsed["summary"] == "ok"
parsed = _parse_codex_exec_stdout(stdout)
assert parsed["verdict"] == "has_issues"
assert parsed["summary"] == "x"
def test_extract_result_meta_from_codex_stdout_collects_model_and_usage() -> None:
settings = get_settings()
stdout = "\n".join(
stdout = '\n'.join(
[
'{"type":"response.started","model":"gpt-5.3-codex"}',
'{"type":"response.completed","response":{"usage":{"input_tokens":101,"output_tokens":22,"total_tokens":123}}}',

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from types import SimpleNamespace
import httpx
from sqlalchemy import select
from gitea_codex_bot.config import get_settings
@@ -14,8 +15,7 @@ from gitea_codex_bot.types import ParsedCommand
from gitea_codex_bot.workers.dispatcher import process_one_job
def test_process_one_job_always_posts_new_review_comment(monkeypatch) -> None:
posted_ids: list[int] = []
def test_process_one_job_recreates_persistent_comment_when_edit_returns_404(monkeypatch) -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
@@ -39,27 +39,37 @@ def test_process_one_job_always_posts_new_review_comment(monkeypatch) -> None:
monkeypatch.setattr(
"gitea_codex_bot.workers.dispatcher.run_review_ephemeral",
lambda *_args, **_kwargs: (
{"verdict": "has_issues", "confidence": 0.7, "summary": "runner error", "findings": []},
{
"verdict": "has_issues",
"confidence": 0.7,
"summary": "runner error",
"findings": [],
},
RepoReviewConfig(configured=True, enabled=True),
),
)
class _FakeGiteaClient:
def __init__(self, _settings) -> None:
pass
self.posted_comment_id = 0
def get_pull_request(self, _repo: str, _pr_number: int):
return SimpleNamespace(is_fork=False)
def edit_issue_comment(self, _repo: str, _comment_id: int, _body: str) -> int:
request = httpx.Request("PATCH", "https://gitea.test/api/v1/repos/acme/repo/issues/comments/289")
response = httpx.Response(404, request=request, text='{"message":"not found"}')
raise httpx.HTTPStatusError("not found", request=request, response=response)
def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int:
new_id = 990
posted_ids.append(new_id)
return new_id
self.posted_comment_id = 990
return self.posted_comment_id
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True
assert posted_ids == [990]
settings = get_settings()
processed = process_one_job(settings)
assert processed is True
with session_factory() as session:
persisted_comment_id = get_persistent_review_comment_id(session, "acme/repo", 9)
@@ -97,10 +107,15 @@ def test_process_one_job_passes_full_trigger_message_to_runner(monkeypatch) -> N
def post_issue_comment(self, _repo: str, _pr_number: int, _body: str) -> int:
return 901
def edit_issue_comment(self, _repo: str, _comment_id: int, _body: str) -> int:
return _comment_id
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.run_review_ephemeral", _fake_run_review_ephemeral)
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True
settings = get_settings()
processed = process_one_job(settings)
assert processed is True
assert captured["raw"] == "@codex review security --full\nFocus auth/session handling."
@@ -140,50 +155,11 @@ def test_process_one_job_skips_review_when_repo_config_disabled(monkeypatch) ->
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True
settings = get_settings()
processed = process_one_job(settings)
assert processed is True
assert any("Review is disabled" in body for body in posted_comments)
with session_factory() as session:
stored_job = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert stored_job.status.value == "skipped"
def test_process_one_job_help_command_posts_summary(monkeypatch) -> None:
posted_comments: list[str] = []
session_factory = get_session_factory()
with session_factory() as session:
enqueue_job(
session,
repo="acme/repo",
pr_number=12,
head_sha="abc12345",
trigger_comment_id=114,
trigger_comment_body="@codex -h",
requested_by="alice",
command=ParsedCommand(name="help", raw="@codex -h"),
)
class _FakeGiteaClient:
def __init__(self, _settings) -> None:
pass
def list_issue_comments(self, _repo: str, _pr_number: int):
return [
{"body": "Please check auth edge cases", "user": {"username": "alice"}},
{"body": "On it, running review now.", "user": {"username": "codex-bot"}},
]
def post_issue_comment(self, _repo: str, _pr_number: int, body: str) -> int:
posted_comments.append(body)
return 903
monkeypatch.setattr("gitea_codex_bot.workers.dispatcher.GiteaClient", _FakeGiteaClient)
assert process_one_job(get_settings()) is True
assert posted_comments
body = posted_comments[0]
assert "## Codex Help" in body
assert "@codex -h" in body
assert "@codex fix" not in body
assert "Discussion summary" in body
assert "@alice: Please check auth edge cases" in body

View File

@@ -6,25 +6,6 @@ from gitea_codex_bot.config import get_settings
from gitea_codex_bot.main import _validate_required_env
@pytest.mark.parametrize(
("env_name", "env_value", "error_text"),
[
("GITEA_WEBHOOK_SECRET", " ", "GITEA_WEBHOOK_SECRET is required"),
("GITEA_TOKEN", " ", "GITEA_TOKEN is required"),
("ALLOWED_REPOS", " ", "ALLOWED_REPOS is required"),
],
)
def test_validate_required_env_fails_on_blank_required_settings(
monkeypatch: pytest.MonkeyPatch, env_name: str, env_value: str, error_text: str
) -> None:
monkeypatch.setenv(env_name, env_value)
get_settings.cache_clear()
settings = get_settings()
with pytest.raises(RuntimeError, match=error_text):
_validate_required_env(settings)
def test_validate_required_env_requires_api_key_in_api_key_mode(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OPENAI_API_KEY", "")
monkeypatch.setenv("CODEX_AUTH_MODE", "api_key")

View File

@@ -2,9 +2,7 @@ from __future__ import annotations
from fastapi.testclient import TestClient
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.main import app
from gitea_codex_bot.models import JobStatus, ReviewJob
def test_root_returns_tailwind_landing_page() -> None:
@@ -17,10 +15,8 @@ def test_root_returns_tailwind_landing_page() -> None:
assert "Gitea Codex Review Bot" in response.text
assert "cdn.tailwindcss.com" in response.text
assert 'id="health-button"' in response.text
assert 'id="failure-button"' in response.text
assert 'id="health-modal"' in response.text
assert 'fetch("/healthz"' in response.text
assert 'fetch("/healthz/latest-failure"' in response.text
def test_404_returns_tailwind_page_for_browser_requests() -> None:
@@ -42,104 +38,3 @@ def test_404_returns_json_for_non_browser_requests() -> None:
assert response.status_code == 404
assert response.headers["content-type"].startswith("application/json")
assert response.json() == {"detail": "Not Found"}
def test_healthz_latest_failure_returns_empty_when_no_failed_jobs() -> None:
client = TestClient(app)
response = client.get("/healthz/latest-failure")
assert response.status_code == 200
assert response.json() == {"status": "ok", "has_failed_job": False}
def test_healthz_latest_failure_returns_latest_failed_job() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = ReviewJob(
repo="acme/repo",
pr_number=1,
head_sha="1111111",
trigger_comment_id=3001,
trigger_comment_body="@codex review",
command="review",
requested_by="alice",
status=JobStatus.failed,
last_error="first error",
)
second = ReviewJob(
repo="acme/repo",
pr_number=2,
head_sha="2222222",
trigger_comment_id=3002,
trigger_comment_body="@codex rerun",
command="rerun",
requested_by="bob",
status=JobStatus.failed,
last_error="second error",
)
session.add(first)
session.add(second)
session.commit()
client = TestClient(app)
response = client.get("/healthz/latest-failure")
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["has_failed_job"] is True
assert payload["repo"] == "acme/repo"
assert payload["pr_number"] == 2
assert payload["command"] == "rerun"
assert payload["head_sha"] == "2222222"
assert payload["error"] == "second error"
def test_healthz_latest_job_returns_empty_when_no_jobs() -> None:
client = TestClient(app)
response = client.get("/healthz/latest-job")
assert response.status_code == 200
assert response.json() == {"status": "ok", "has_job": False}
def test_healthz_latest_job_returns_latest_job_details() -> None:
session_factory = get_session_factory()
with session_factory() as session:
first = ReviewJob(
repo="acme/repo",
pr_number=3,
head_sha="3333333",
trigger_comment_id=3003,
trigger_comment_body="@codex review",
command="review",
requested_by="alice",
status=JobStatus.succeeded,
result_json={"summary": "first summary"},
)
second = ReviewJob(
repo="acme/repo",
pr_number=4,
head_sha="4444444",
trigger_comment_id=3004,
trigger_comment_body="@codex rerun",
command="rerun",
requested_by="bob",
status=JobStatus.failed,
last_error="failed later",
result_json={"summary": "second summary"},
)
session.add(first)
session.add(second)
session.commit()
client = TestClient(app)
response = client.get("/healthz/latest-job")
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["has_job"] is True
assert payload["repo"] == "acme/repo"
assert payload["pr_number"] == 4
assert payload["command"] == "rerun"
assert payload["head_sha"] == "4444444"
assert payload["job_status"] == "failed"
assert payload["error"] == "failed later"
assert payload["result_summary"] == "second summary"

View File

@@ -1,8 +0,0 @@
from gitea_codex_bot.services.repo_config import parse_repo_review_config_text
def test_parse_repo_review_config_defaults_to_full_and_no_tests() -> None:
cfg = parse_repo_review_config_text("enabled: true\n", configured=True)
assert cfg.default_mode == "full"
assert cfg.include_tests is False

View File

@@ -3,33 +3,19 @@ from __future__ import annotations
from gitea_codex_bot.services.review_format import format_result_comment
def test_format_result_comment_appends_structured_details_to_markdown_comment() -> None:
def test_format_result_comment_uses_markdown_comment_verbatim_with_marker() -> None:
body = format_result_comment(
"abc1234",
{
"verdict": "has_issues",
"verdict": "correct",
"confidence": 0.9,
"summary": "2 issues detected.",
"findings": [
{
"severity": "high",
"file": "src/app.py",
"line_start": 20,
"line_end": 22,
"title": "Unsafe command execution",
"body": "User input is passed directly into shell=True.",
"suggestion": "Use a fixed argument list and avoid shell=True.",
}
],
"markdown_comment": "## Codex Review\n\nShort agent message only.",
"summary": "ignored when markdown_comment exists",
"findings": [],
"markdown_comment": "## Codex Review\n\nAll good.\n\nNo issues found.",
},
)
assert body.startswith("<!-- codex-review:head_sha=abc1234 -->\n## Codex Review")
assert "Short agent message only." in body
assert "### Structured Findings" in body
assert "2 issues detected." in body
assert "`src/app.py:20-22` (high)" in body
assert "Unsafe command execution" in body
assert "All good.\n\nNo issues found." in body
def test_format_result_comment_replaces_existing_marker() -> None:
@@ -82,10 +68,10 @@ def test_format_result_comment_appends_missing_config_note_for_system_layout() -
},
repo_configured=False,
)
assert body.endswith("> .codex-review.yml is not configured")
assert body.endswith(".codex-review.yml is not configured")
def test_format_result_comment_appends_missing_config_note_to_agent_markdown() -> None:
def test_format_result_comment_does_not_append_missing_config_note_to_agent_markdown() -> None:
body = format_result_comment(
"ff0011",
{
@@ -93,4 +79,4 @@ def test_format_result_comment_appends_missing_config_note_to_agent_markdown() -
},
repo_configured=False,
)
assert body.endswith("> .codex-review.yml is not configured")
assert ".codex-review.yml is not configured" not in body

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
import httpx
from gitea_codex_bot.config import get_settings
from gitea_codex_bot.services.repo_config import RepoReviewConfig
from gitea_codex_bot.services.reviewer import _build_prompt, _fallback_review, prepare_review_prompt, run_review_for_pr
from gitea_codex_bot.types import ParsedCommand
def test_fallback_review_surfaces_failure_reason() -> None:
result = _fallback_review({"diff": ""}, failure_reason="OpenAI API HTTP 401: invalid_api_key")
assert result["verdict"] == "has_issues"
assert result["summary"] == "OpenAI review failed. Error: OpenAI API HTTP 401: invalid_api_key"
assert result["findings"][0]["title"] == "OpenAI review request failed"
assert result["findings"][0]["body"] == "OpenAI API HTTP 401: invalid_api_key"
def test_run_review_for_pr_uses_openai_http_error_in_fallback(monkeypatch) -> None:
def _fake_prepare(*_args, **_kwargs):
return "prompt", {"diff": "TODO: tighten validation"}, RepoReviewConfig()
def _raise_http_error(*_args, **_kwargs):
request = httpx.Request("POST", "https://api.openai.com/v1/responses")
response = httpx.Response(429, request=request, text='{"error":{"message":"rate_limited"}}')
raise httpx.HTTPStatusError("rate limited", request=request, response=response)
monkeypatch.setattr("gitea_codex_bot.services.reviewer.prepare_review_prompt", _fake_prepare)
monkeypatch.setattr("gitea_codex_bot.services.reviewer._call_openai_review", _raise_http_error)
settings = get_settings()
command = ParsedCommand(name="review", raw="@codex review")
result, _repo_cfg = run_review_for_pr(settings, object(), "acme/repo", 9, command)
assert result["summary"].startswith("OpenAI review failed. Error: OpenAI API HTTP 429:")
assert result["findings"][0]["title"] == "OpenAI review request failed"
assert "rate_limited" in result["findings"][0]["body"]
assert any(finding["title"] == "TODO marker in diff" for finding in result["findings"])
def test_build_prompt_includes_trigger_message() -> None:
pr = type("PR", (), {"html_url": "https://gitea.example/pr/1"})()
command = ParsedCommand(name="review", raw="@codex review security\nPlease focus auth.")
diff_context = {"truncated": False, "changed_files": ["app.py"], "diff": "diff --git a/app.py b/app.py"}
repo_cfg = RepoReviewConfig()
prompt = _build_prompt(
pr,
command,
diff_context,
repo_cfg,
changed_file_contents="",
test_output=None,
)
assert "Trigger message: @codex review security\nPlease focus auth." in prompt
def test_prepare_review_prompt_applies_repo_default_mode_when_command_mode_not_explicit(monkeypatch, tmp_path) -> None:
repo_dir = tmp_path / "repo"
repo_dir.mkdir(parents=True, exist_ok=True)
(repo_dir / ".codex-review.yml").write_text("review:\n default_mode: tests\n", encoding="utf-8")
pr = type(
"PR",
(),
{
"base_sha": "b" * 40,
"head_sha": "a" * 40,
"html_url": "https://gitea.example/pr/1",
},
)()
monkeypatch.setattr("gitea_codex_bot.services.reviewer.checkout_pr", lambda *_args, **_kwargs: repo_dir)
monkeypatch.setattr(
"gitea_codex_bot.services.reviewer.collect_diff_context",
lambda *_args, **_kwargs: {"diff": "", "changed_files": [], "truncated": False},
)
settings = get_settings()
gitea = type("GiteaStub", (), {"get_pull_request": lambda *_args, **_kwargs: pr})()
command = ParsedCommand(name="review", raw="@codex review")
prompt, _diff, _cfg = prepare_review_prompt(settings, gitea, "acme/repo", 9, command)
assert "Mode: tests" in prompt

View File

@@ -1,12 +1,12 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from gitea_codex_bot.db import get_session_factory
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_running_jobs
from datetime import datetime, timedelta, timezone
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus
from gitea_codex_bot.services.jobs import claim_next_job, enqueue_job, finish_job, recover_stuck_jobs
from gitea_codex_bot.types import ParsedCommand
@@ -39,120 +39,133 @@ def test_claim_and_transition() -> None:
assert loaded.result_json is not None
def test_failed_job_retries_then_fails_terminally() -> None:
def test_claim_recovers_stuck_running_job_by_requeue() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
first = enqueue_job(
session,
repo="acme/repo",
pr_number=271,
head_sha="f00dbabe",
trigger_comment_id=9902,
pr_number=1,
head_sha="aaaabbbb",
trigger_comment_id=1001,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
# First attempt fails => requeue.
with session_factory() as session:
claimed = claim_next_job(session)
assert claimed is not None
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-1")
with session_factory() as session:
loaded = session.get(ReviewJob, job.id)
assert loaded is not None
assert loaded.status == JobStatus.queued
assert loaded.started_at is None
assert loaded.finished_at is None
# Second attempt fails => requeue.
with session_factory() as session:
claimed = claim_next_job(session)
assert claimed is not None
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-2")
with session_factory() as session:
loaded = session.get(ReviewJob, job.id)
assert loaded is not None
assert loaded.status == JobStatus.queued
# Third attempt fails => terminal failed (max 2 retries exhausted).
with session_factory() as session:
claimed = claim_next_job(session)
assert claimed is not None
finish_job(session, job_id=job.id, success=False, skipped=False, result=None, error_message="boom-3")
with session_factory() as session:
loaded = session.get(ReviewJob, job.id)
assert loaded is not None
assert loaded.status == JobStatus.failed
assert loaded.finished_at is not None
def test_recover_stuck_running_job_requeues_before_retry_limit() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
second = enqueue_job(
session,
repo="acme/repo",
pr_number=272,
head_sha="feedface",
trigger_comment_id=9903,
pr_number=2,
head_sha="ccccdddd",
trigger_comment_id=1002,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
with session_factory() as session:
claimed = claim_next_job(session)
assert claimed is not None
stale_start = datetime.now(timezone.utc) - timedelta(minutes=6)
db_job = session.get(ReviewJob, job.id)
assert db_job is not None
db_job.started_at = stale_start
assert claimed.id == first.id
with session_factory() as session:
stuck = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
stuck.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
recovered = recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2)
assert recovered == 1
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is False
recovered = claim_next_job(session)
assert recovered is not None
assert recovered.id == first.id
assert recovered.status == JobStatus.running
with session_factory() as session:
db_job = session.get(ReviewJob, job.id)
assert db_job is not None
assert db_job.status == JobStatus.queued
assert db_job.started_at is None
latest_run = session.execute(select(ReviewRun).where(ReviewRun.job_id == job.id).order_by(ReviewRun.id.desc()).limit(1)).scalar_one()
assert latest_run.status.value == "failed"
assert latest_run.error_message is not None
assert "timed out" in latest_run.error_message
stale = session.execute(select(ReviewJob).where(ReviewJob.id == first.id)).scalar_one()
assert stale.last_error is not None
assert "lease timed out" in stale.last_error
failed_runs = session.execute(
select(ReviewRun).where(ReviewRun.job_id == first.id, ReviewRun.status == RunStatus.failed)
).scalars()
assert len(list(failed_runs)) >= 1
queued_later = session.execute(select(ReviewJob).where(ReviewJob.id == second.id)).scalar_one()
assert queued_later.status in (JobStatus.queued, JobStatus.running)
def test_recover_stuck_running_job_fails_after_retry_limit() -> None:
def test_claim_recovers_stuck_running_job_by_fail() -> None:
session_factory = get_session_factory()
with session_factory() as session:
stuck_job = enqueue_job(
session,
repo="acme/repo",
pr_number=3,
head_sha="eeeeffff",
trigger_comment_id=1003,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == stuck_job.id
with session_factory() as session:
stale = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
outcomes = recover_stuck_jobs(session, lease_timeout_seconds=300, action="fail", max_retries=1)
assert len(outcomes) == 1
assert outcomes[0].failed is True
no_job = claim_next_job(session)
assert no_job is None
with session_factory() as session:
failed = session.execute(select(ReviewJob).where(ReviewJob.id == stuck_job.id)).scalar_one()
assert failed.status == JobStatus.failed
assert failed.finished_at is not None
def test_requeue_allows_one_retry_then_fails_on_second_timeout() -> None:
session_factory = get_session_factory()
with session_factory() as session:
job = enqueue_job(
session,
repo="acme/repo",
pr_number=273,
head_sha="deadc0de",
trigger_comment_id=9904,
pr_number=4,
head_sha="11112222",
trigger_comment_id=1004,
trigger_comment_body="@codex review",
requested_by="alice",
command=ParsedCommand(name="review", raw="@codex review"),
)
# Build up to third running attempt to hit retry limit when it times out.
for _ in range(3):
with session_factory() as session:
claimed = claim_next_job(session)
assert claimed is not None
db_job = session.get(ReviewJob, job.id)
assert db_job is not None
db_job.started_at = datetime.now(timezone.utc) - timedelta(minutes=6)
session.commit()
with session_factory() as session:
recover_stuck_running_jobs(session, lease_timeout_seconds=300, max_retries=2)
claimed = claim_next_job(session)
assert claimed is not None
assert claimed.id == job.id
with session_factory() as session:
db_job = session.get(ReviewJob, job.id)
assert db_job is not None
assert db_job.status == JobStatus.failed
assert db_job.finished_at is not None
stale = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
first = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(first) == 1
assert first[0].failed is False
claimed_again = claim_next_job(session)
assert claimed_again is not None
assert claimed_again.id == job.id
with session_factory() as session:
stale_again = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
stale_again.started_at = datetime.now(timezone.utc) - timedelta(seconds=601)
session.commit()
with session_factory() as session:
second = recover_stuck_jobs(session, lease_timeout_seconds=300, action="requeue", max_retries=1)
assert len(second) == 1
assert second[0].failed is True
failed = session.execute(select(ReviewJob).where(ReviewJob.id == job.id)).scalar_one()
assert failed.status == JobStatus.failed

View File

@@ -93,84 +93,6 @@ def test_webhook_accepts_review_and_queues(monkeypatch) -> None:
assert queued.trigger_comment_body == "@codex review security"
def test_webhook_accepts_review_for_bot_username_alias(monkeypatch) -> None:
posted_comments: list[str] = []
def _post_issue_comment(self, repo: str, pr_number: int, body: str) -> int:
posted_comments.append(body)
return 100
monkeypatch.setattr("gitea_codex_bot.services.gitea.GiteaClient.post_issue_comment", _post_issue_comment)
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_pull_request",
lambda *_args, **_kwargs: type("PR", (), {"head_sha": "abcdef123"})(),
)
monkeypatch.setattr("gitea_codex_bot.services.gitea.GiteaClient.get_file_content", lambda *_args, **_kwargs: None)
client = TestClient(app)
payload_obj = _payload("@codex-bot review security", username="alice", comment_id=311)
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-2-alias",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["status"] == "queued"
assert posted_comments
session_factory = get_session_factory()
with session_factory() as session:
queued = session.execute(select(ReviewJob).where(ReviewJob.trigger_comment_id == 311)).scalar_one()
assert queued.trigger_comment_body == "@codex-bot review security"
def test_webhook_uses_latest_pr_head_sha_when_config_lookup_fails(monkeypatch) -> None:
posted_comments: list[str] = []
def _post_issue_comment(self, repo: str, pr_number: int, body: str) -> int:
posted_comments.append(body)
return 100
monkeypatch.setattr("gitea_codex_bot.services.gitea.GiteaClient.post_issue_comment", _post_issue_comment)
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_pull_request",
lambda *_args, **_kwargs: type("PR", (), {"head_sha": "newsha123"})(),
)
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_file_content",
lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("config unavailable")),
)
client = TestClient(app)
payload_obj = _payload("@codex review", username="alice", comment_id=112)
payload_obj["pull_request"]["head"]["sha"] = "oldsha999"
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-2b",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["status"] == "queued"
assert any("`newsha1`" in body for body in posted_comments)
session_factory = get_session_factory()
with session_factory() as session:
queued = session.execute(select(ReviewJob).where(ReviewJob.trigger_comment_id == 112)).scalar_one()
assert queued.head_sha == "newsha123"
def test_webhook_logs_when_no_codex_review_command(monkeypatch) -> None:
messages: list[str] = []
@@ -225,123 +147,6 @@ def test_webhook_logs_when_codex_command_is_not_review(monkeypatch) -> None:
assert any("Webhook without @codex review command" in item for item in messages)
def test_webhook_accepts_help_short_flag_and_queues(monkeypatch) -> None:
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.get_pull_request",
lambda *_args, **_kwargs: type("PR", (), {"head_sha": "abcdef123"})(),
)
client = TestClient(app)
payload_obj = _payload("@codex -h", username="alice", comment_id=333)
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-help-1",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["status"] == "queued"
session_factory = get_session_factory()
with session_factory() as session:
queued = session.execute(select(ReviewJob).where(ReviewJob.trigger_comment_id == 333)).scalar_one()
assert queued.command == "help"
def test_webhook_replies_fix_is_no_longer_supported(monkeypatch) -> None:
posted_comments: list[str] = []
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.post_issue_comment",
lambda _self, _repo, _pr, body: posted_comments.append(body) or 100,
)
client = TestClient(app)
payload_obj = _payload("@codex fix --branch", username="alice", comment_id=444)
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-fix-unsupported",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["reason"] == "unsupported command"
assert response.json()["command"] == "fix"
assert any("no longer supported" in body for body in posted_comments)
session_factory = get_session_factory()
with session_factory() as session:
queued = session.execute(select(ReviewJob).where(ReviewJob.trigger_comment_id == 444)).scalar_one_or_none()
assert queued is None
def test_webhook_replies_for_unknown_prefixed_command(monkeypatch) -> None:
posted_comments: list[str] = []
monkeypatch.setattr(
"gitea_codex_bot.services.gitea.GiteaClient.post_issue_comment",
lambda _self, _repo, _pr, body: posted_comments.append(body) or 100,
)
client = TestClient(app)
payload_obj = _payload("@codex deploy", username="alice", comment_id=445)
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-unknown-unsupported",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["reason"] == "unsupported command"
assert response.json()["command"] == "deploy"
assert any("not supported" in body for body in posted_comments)
def test_webhook_logs_when_repo_not_allowed(monkeypatch) -> None:
messages: list[str] = []
def _log_info(message: str, *args, **_kwargs) -> None:
messages.append(message % args if args else message)
monkeypatch.setattr("gitea_codex_bot.main.logger.info", _log_info)
client = TestClient(app)
payload_obj = _payload("@codex review", username="alice", comment_id=225)
payload_obj["repository"]["full_name"] = "acme/not-allowed"
raw = json.dumps(payload_obj).encode()
response = client.post(
"/webhook/gitea",
content=raw,
headers={
"X-Gitea-Event": "issue_comment",
"X-Gitea-Delivery": "d-6",
"X-Gitea-Signature": _sign(raw),
"Content-Type": "application/json",
},
)
assert response.status_code == 200
assert response.json()["reason"] == "repo not allowed"
assert any("Webhook ignored: repo not in ALLOWED_REPOS" in item for item in messages)
def test_webhook_rejects_review_when_repo_config_disabled(monkeypatch) -> None:
posted_comments: list[str] = []