Files
gitea-codex/src/gitea_codex_bot/services/reviewer.py
Space-Banane c3925f37e1
All checks were successful
ci / test (pull_request) Successful in 35s
ci / publish (pull_request) Has been skipped
Resolve Runner issue, runner refused to launch
2026-05-23 13:53:26 +02:00

349 lines
14 KiB
Python

from __future__ import annotations
import json
import os
import shlex
import subprocess
from fnmatch import fnmatch
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
import httpx
from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient, PullRequestContext
from gitea_codex_bot.services.repo_config import RepoReviewConfig, load_repo_review_config
from gitea_codex_bot.types import ParsedCommand
class ReviewError(RuntimeError):
    """Error raised by the review service.

    Used in this module for a missing API key, a model response without
    usable JSON text, and a review result that is not a dictionary.
    """
def _run_git(args: list[str], cwd: Path | None = None) -> str:
    """Run ``git`` with *args* and return its standard output as text.

    Args:
        args: Arguments placed after the ``git`` executable name.
        cwd: Working directory for the command; ``None`` keeps the current
            process directory.

    Returns:
        The captured standard output of the command.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    completed = subprocess.run(
        ["git", *args],
        cwd=cwd,
        check=True,
        capture_output=True,
        # Decode explicitly as UTF-8 and replace undecodable bytes. Diffs carry
        # raw file content in arbitrary encodings; strict decoding with the
        # locale codec would raise UnicodeDecodeError and abort the review.
        encoding="utf-8",
        errors="replace",
    )
    return completed.stdout
def checkout_pr(tmpdir: Path, pr: PullRequestContext) -> Path:
    """Clone the pull request's repository and check out its head commit.

    The repository is cloned into ``tmpdir / "repo"`` without tags and with a
    history depth of 50, the base and head refs are fetched from ``origin``,
    and ``pr.head_sha`` is checked out.

    Args:
        tmpdir: Existing directory that will contain the ``repo`` checkout.
        pr: Pull request data providing ``clone_url``, ``base_ref``,
            ``head_ref`` and ``head_sha``.

    Returns:
        Path of the checked-out repository.

    Raises:
        subprocess.CalledProcessError: If any git command fails.
    """
    repo_dir = tmpdir / "repo"
    # "--" ends option parsing, so a clone URL or ref name that starts with
    # "-" is always treated as a positional value and never as a git option.
    _run_git(["clone", "--no-tags", "--depth", "50", "--", pr.clone_url, str(repo_dir)])
    _run_git(["fetch", "--", "origin", pr.base_ref, pr.head_ref], cwd=repo_dir)
    # No "--" here: checkout would interpret whatever follows it as a path.
    # NOTE(review): head_sha is presumably a commit hash — confirm upstream.
    _run_git(["checkout", pr.head_sha], cwd=repo_dir)
    return repo_dir
def collect_diff_context(repo_dir: Path, pr: PullRequestContext, max_diff_bytes: int) -> dict[str, Any]:
    """Gather the PR's unified diff and the list of changed file paths.

    Both are produced with ``git diff`` over ``base_sha...head_sha``. A diff
    whose UTF-8 encoding is larger than *max_diff_bytes* is cut to that many
    bytes; a multi-byte character split by the cut is dropped.

    Args:
        repo_dir: Checked-out repository to run git in.
        pr: Pull request data providing ``base_sha`` and ``head_sha``.
        max_diff_bytes: Upper bound for the encoded diff size.

    Returns:
        A dict with ``"diff"`` (text), ``"changed_files"`` (non-empty,
        stripped paths) and ``"truncated"`` (whether the diff was cut).
    """
    revision_range = f"{pr.base_sha}...{pr.head_sha}"
    diff_text = _run_git(["diff", revision_range], cwd=repo_dir)
    names_output = _run_git(["diff", "--name-only", revision_range], cwd=repo_dir)

    stripped_names = (raw_name.strip() for raw_name in names_output.splitlines())
    changed_files = [name for name in stripped_names if name]

    encoded_diff = diff_text.encode("utf-8")
    truncated = len(encoded_diff) > max_diff_bytes
    if truncated:
        diff_text = encoded_diff[:max_diff_bytes].decode("utf-8", errors="ignore")

    return {"diff": diff_text, "changed_files": changed_files, "truncated": truncated}
def _apply_ignore_patterns(changed_files: list[str], ignore_patterns: list[str]) -> list[str]:
    """Drop every path that matches at least one ignore pattern.

    Args:
        changed_files: Candidate file paths.
        ignore_patterns: ``fnmatch``-style patterns; may be empty.

    Returns:
        The input list itself when there are no patterns, otherwise a new
        list of the paths that matched none of them, in the original order.
    """
    if not ignore_patterns:
        return changed_files

    def is_ignored(candidate: str) -> bool:
        return any(fnmatch(candidate, pattern) for pattern in ignore_patterns)

    return [candidate for candidate in changed_files if not is_ignored(candidate)]
def _collect_changed_file_contents(repo_dir: Path, changed_files: list[str], max_total_bytes: int) -> str:
    """Concatenate the text of changed files, limited by a byte budget.

    Each readable file contributes a block made of a ``### <path>`` heading
    followed by the file text. Collection stops at the first file whose block
    would push the running total past *max_total_bytes*.

    Entries are skipped when they are not regular files, cannot be read, or
    resolve (through ``..`` segments or symlinks) to a location outside
    *repo_dir*. The last check keeps a checked-out PR from making the bot
    read arbitrary files of the host.

    Args:
        repo_dir: Root of the checked-out repository.
        changed_files: Paths relative to *repo_dir*.
        max_total_bytes: Budget for the UTF-8 size of all blocks together.

    Returns:
        The joined blocks with surrounding whitespace stripped; an empty
        string when nothing was collected.
    """
    repo_root = repo_dir.resolve()
    chunks: list[str] = []
    total = 0
    for rel in changed_files:
        try:
            resolved = (repo_dir / rel).resolve()
            # Raises ValueError when the real location is outside the repo.
            resolved.relative_to(repo_root)
        except (OSError, RuntimeError, ValueError):
            continue
        if not resolved.is_file():
            continue
        try:
            content = resolved.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            continue
        block = f"\n### {rel}\n{content}\n"
        block_bytes = len(block.encode("utf-8"))
        if total + block_bytes > max_total_bytes:
            break
        chunks.append(block)
        total += block_bytes
    return "".join(chunks).strip()
def _collect_test_output(repo_dir: Path, timeout_seconds: int) -> str:
    """Run ``pytest -q`` in *repo_dir* and return its combined output.

    Args:
        repo_dir: Directory in which pytest is started.
        timeout_seconds: Maximum run time handed to ``subprocess.run``.

    Returns:
        Stdout and stderr joined by a newline, stripped and limited to the
        first 10000 characters. If running the command raises (for example a
        timeout or a missing executable), a message starting with
        ``"Test execution unavailable: "`` is returned instead.
    """
    # NOTE(review): this starts the repository's own test suite as a child
    # process; when the checkout is an untrusted PR, that runs its code on
    # this host — confirm the bot is sandboxed accordingly.
    pytest_command = ["pytest", "-q"]
    try:
        proc = subprocess.run(
            pytest_command,
            cwd=repo_dir,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            check=False,
        )
        combined = "\n".join((proc.stdout, proc.stderr)).strip()
    except Exception as exc:
        # Best effort: the review continues without test output.
        return f"Test execution unavailable: {exc}"
    return combined[:10000]
def _redact_secrets_from_diff(diff: str) -> str:
    """Replace lines that may contain secrets with a redaction marker.

    A line is redacted when it contains one of the secret terms
    (case-insensitive). In addition, once a ``-----BEGIN`` line is seen, all
    following lines up to and including the matching ``-----END`` line are
    redacted as well, so the body of a PEM block is not leaked. An
    unterminated PEM block ends at the next hunk (``@@``) or file
    (``diff --git``) header to bound the redaction.

    Args:
        diff: Text to scan, processed line by line.

    Returns:
        The lines joined with ``"\\n"``, each redacted line replaced by
        ``[REDACTED_POTENTIAL_SECRET]``.
    """
    marker = "[REDACTED_POTENTIAL_SECRET]"
    secret_terms = ("api_key", "token", "secret", "password", "private_key", "-----begin")
    redacted_lines: list[str] = []
    inside_pem_block = False
    for line in diff.splitlines():
        lower = line.lower()
        if inside_pem_block:
            if lower.startswith(("diff --git ", "@@")):
                # A new hunk/file started without an END line: stop the block
                # and let the header go through the normal term check below.
                inside_pem_block = False
            else:
                redacted_lines.append(marker)
                if "-----end" in lower:
                    inside_pem_block = False
                continue
        if any(term in lower for term in secret_terms):
            redacted_lines.append(marker)
            # BEGIN and END on the same line form a complete block already.
            if "-----begin" in lower and "-----end" not in lower:
                inside_pem_block = True
        else:
            redacted_lines.append(line)
    return "\n".join(redacted_lines)
def _build_prompt(
    pr: PullRequestContext,
    command: ParsedCommand,
    diff_context: dict[str, Any],
    repo_cfg: RepoReviewConfig,
    *,
    changed_file_contents: str,
    test_output: str | None,
) -> str:
    """Assemble the review prompt that is sent to the model.

    Args:
        pr: Pull request data; ``html_url`` is included in the prompt.
        command: Parsed trigger command. Its ``mode`` is used for ``review``
            and ``rerun`` commands; every other command uses ``"summary"``.
        diff_context: Mapping with optional ``"changed_files"``, ``"diff"``
            and ``"truncated"`` entries. Missing or empty values are rendered
            as ``(none)``, ``(empty)`` and ``False`` respectively.
        repo_cfg: Repository configuration; ``focus`` is listed in the prompt.
        changed_file_contents: Full file text to include, or an empty string.
        test_output: Test output to include, or ``None``.

    Returns:
        The complete prompt text, using ``"\\n"`` line endings throughout.
    """
    mode = command.mode if command.name in {"review", "rerun"} else "summary"
    changed_files = diff_context.get("changed_files") or []
    # The prompt is an in-memory string sent over HTTP, so use "\n" like the
    # rest of the text instead of the platform-specific os.linesep.
    changed_files_section = "\n".join(changed_files) if changed_files else "(none)"
    unified_diff = str(diff_context.get("diff", ""))
    unified_diff_section = unified_diff if unified_diff.strip() else "(empty)"
    # Read like the other keys: a context without the flag is not truncated.
    truncated = diff_context.get("truncated", False)
    return (
        "You are reviewing a Gitea pull request.\n\n"
        "Focus only on issues introduced by this PR.\n"
        "Prioritize correctness, security, data loss, broken behavior, bad migrations, and missing tests.\n"
        "Avoid style nitpicks.\n\n"
        "You do not have internet/network access. Do not try to fetch URLs.\n"
        "Use only the PR metadata, changed files, diff, and optional file/test content included below.\n"
        "Never claim that PR content is inaccessible or missing if these sections are present.\n"
        "If the changed-file list is `(none)` and unified diff is `(empty)`, treat this as a no-op PR and explain that no code changes were detected.\n\n"
        "Return JSON only with schema:\n"
        "{\n"
        ' "verdict": "correct" | "has_issues",\n'
        ' "confidence": 0.0,\n'
        ' "summary": "...",\n'
        ' "findings": [{"severity":"low|medium|high|critical","file":"...","line_start":1,"line_end":1,"title":"...","body":"...","suggestion":"..."}],\n'
        ' "markdown_comment": "Full markdown comment body to post to Gitea. Include clear section breaks and blank lines."\n'
        "}\n\n"
        f"PR URL: {pr.html_url}\n"
        f"Mode: {mode}\n"
        f"Trigger message: {command.raw}\n"
        f"Repo focus: {', '.join(repo_cfg.focus)}\n"
        f"Diff truncated: {truncated}\n"
        f"Changed files:\n{changed_files_section}\n\n"
        f"Unified diff:\n{unified_diff_section}\n\n"
        f"Changed file content (optional):\n{changed_file_contents or '(not included)'}\n\n"
        f"Test output (optional):\n{test_output or '(not included)'}\n"
    )
def _call_openai_review(settings: Settings, prompt: str) -> dict[str, Any]:
    """Send *prompt* to the OpenAI Responses endpoint and parse the reply.

    Args:
        settings: Provides the API key, the optional organization and project
            ids, and the review model name.
        prompt: Text sent as the request ``input``.

    Returns:
        The first JSON object found in the response's output text, extended
        with a ``"_meta"`` entry describing model and token usage.

    Raises:
        ReviewError: If no API key is configured or the response contains no
            output text that decodes to a JSON object.
        httpx.HTTPStatusError: If the API answers with an error status.
        json.JSONDecodeError: If an output text is not valid JSON.
    """
    secret = settings.openai_api_key
    api_key = secret.get_secret_value() if secret else ""
    if not api_key.strip():
        raise ReviewError("OPENAI_API_KEY is required for API-key review mode.")

    headers: dict[str, str] = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # Scoping headers are only sent when the corresponding id is configured.
    optional_headers = (
        ("OpenAI-Organization", settings.openai_org_id),
        ("OpenAI-Project", settings.openai_project_id),
    )
    for header_name, header_value in optional_headers:
        if header_value:
            headers[header_name] = header_value

    request_body = {
        "model": settings.openai_review_model,
        "input": prompt,
        "text": {"format": {"type": "json_object"}},
    }
    with httpx.Client(timeout=120.0) as client:
        response = client.post("https://api.openai.com/v1/responses", headers=headers, json=request_body)
        response.raise_for_status()
        payload = response.json()

    for output_item in payload.get("output", []):
        for part in output_item.get("content", []):
            text_value = part.get("text")
            if not text_value:
                continue
            parsed = json.loads(text_value)
            if not isinstance(parsed, dict):
                # Valid JSON but not an object: keep looking at later parts.
                continue
            parsed["_meta"] = _build_openai_result_meta(payload, settings)
            return parsed
    raise ReviewError("OpenAI response did not contain JSON output text.")
def _build_openai_result_meta(payload: dict[str, Any], settings: Settings) -> dict[str, Any]:
    """Extract model and token-usage metadata from an API response payload.

    Args:
        payload: Decoded response body; ``"usage"`` and ``"model"`` are read.
        settings: Supplies ``openai_review_model`` as the fallback model name.

    Returns:
        A dict with ``"source"`` (always ``"openai_api"``), ``"model"`` (the
        payload's model when it is a non-blank string, otherwise the
        configured one) and ``"usage"`` (the integer token counters present
        in the payload).
    """
    token_keys = ("input_tokens", "output_tokens", "total_tokens")
    reported_usage = payload.get("usage")
    usage: dict[str, int] = {}
    if isinstance(reported_usage, dict):
        usage = {
            key: reported_usage[key]
            for key in token_keys
            if isinstance(reported_usage.get(key), int)
        }

    reported_model = payload.get("model")
    has_model_name = isinstance(reported_model, str) and reported_model.strip()
    model = reported_model if has_model_name else settings.openai_review_model

    return {"source": "openai_api", "model": model, "usage": usage}
def _summarize_openai_failure(exc: Exception) -> str:
    """Turn an exception from the OpenAI call into a short one-line message.

    Args:
        exc: The exception that was raised.

    Returns:
        For HTTP status errors, the status code plus the whitespace-collapsed
        response body cut to 400 characters; a fixed message for timeouts;
        otherwise the exception text, or the exception class name when the
        text is empty.
    """
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        # Collapse all whitespace runs so the body fits on a single line.
        compact_body = " ".join(exc.response.text.split())
        if not compact_body:
            return f"OpenAI API HTTP {status}."
        if len(compact_body) > 400:
            compact_body = f"{compact_body[:400]}..."
        return f"OpenAI API HTTP {status}: {compact_body}"

    if isinstance(exc, httpx.TimeoutException):
        return "OpenAI API request timed out."

    details = str(exc).strip()
    return details or f"{exc.__class__.__name__} (no details)"
def _fallback_review(diff_context: dict[str, Any], *, failure_reason: str | None = None) -> dict[str, Any]:
    """Build a placeholder review result for when the OpenAI call failed.

    Args:
        diff_context: Accepted for interface compatibility; not used.
        failure_reason: Description of the failure, if known.

    Returns:
        With a non-empty *failure_reason*: a ``"has_issues"`` result with
        confidence 0.6 and a single high-severity finding carrying the
        reason. Otherwise: a ``"correct"`` result with confidence 0.4 and no
        findings.
    """
    if not failure_reason:
        return {
            "verdict": "correct",
            "confidence": 0.4,
            "summary": "Fallback analysis was used because OpenAI review was unavailable.",
            "findings": [],
        }

    failure_finding: dict[str, Any] = {
        "severity": "high",
        "file": "unknown",
        "line_start": 1,
        "line_end": 1,
        "title": "OpenAI review request failed",
        "body": failure_reason,
        "suggestion": "Fix API/auth/network issues and rerun @codex review.",
    }
    return {
        "verdict": "has_issues",
        "confidence": 0.6,
        "summary": f"OpenAI review failed. Error: {failure_reason}",
        "findings": [failure_finding],
    }
def run_review_for_pr(
    settings: Settings,
    gitea: GiteaClient,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
    """Review a pull request and return the normalized result.

    The prompt is prepared first; errors from that step propagate. If the
    OpenAI call then fails for any reason, a fallback result describing the
    failure is used instead.

    Args:
        settings: Bot settings.
        gitea: Client used to load the pull request.
        repo: Repository identifier passed to the client.
        pr_number: Number of the pull request.
        command: Parsed trigger command.

    Returns:
        A tuple of the normalized review result and the repository's review
        configuration.
    """
    prompt, diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
    try:
        raw_result = _call_openai_review(settings, prompt)
    except Exception as exc:
        # Deliberately broad: any failure is reported through the fallback
        # result rather than aborting the review.
        reason = _summarize_openai_failure(exc)
        raw_result = _fallback_review(diff_context, failure_reason=reason)
    normalized = normalize_review_result(raw_result)
    return normalized, repo_cfg
def prepare_review_prompt(
    settings: Settings,
    gitea: GiteaClient,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[str, dict[str, Any], RepoReviewConfig]:
    """Check out a pull request and build the review prompt for it.

    The PR is cloned into a temporary directory that is removed before the
    function returns. For a ``review`` command without an explicit mode,
    ``command.mode`` is overwritten with the repository's default mode (or
    ``"summary"`` when that default is not a known mode).

    Args:
        settings: Bot settings (diff size and review time limits).
        gitea: Client used to load the pull request.
        repo: Repository identifier passed to the client.
        pr_number: Number of the pull request.
        command: Parsed trigger command; its ``mode`` may be modified.

    Returns:
        A tuple of the prompt text, the diff context (with ignored files
        removed from ``"changed_files"`` and the diff redacted), and the
        repository's review configuration.
    """
    pr = gitea.get_pull_request(repo, pr_number)
    with TemporaryDirectory(prefix="gitea-codex-") as tmp:
        tmpdir = Path(tmp)
        repo_dir = checkout_pr(tmpdir, pr)
        repo_cfg = load_repo_review_config(repo_dir)
        if command.name == "review" and not command.mode_explicit:
            configured_mode = repo_cfg.default_mode
            command.mode = configured_mode if configured_mode in {"summary", "security", "performance", "tests", "full"} else "summary"
        # The stricter of the global and the per-repository limit applies.
        diff_context = collect_diff_context(repo_dir, pr, min(settings.max_diff_bytes, repo_cfg.max_diff_bytes))
        diff_context["changed_files"] = _apply_ignore_patterns(diff_context["changed_files"], repo_cfg.ignore)
        diff_context["diff"] = _redact_secrets_from_diff(diff_context["diff"])
        changed_file_contents = ""
        if command.full:
            raw_contents = _collect_changed_file_contents(repo_dir, diff_context["changed_files"], settings.max_diff_bytes)
            # The full files contain the same lines as the diff, so they get
            # the same redaction; otherwise secrets hidden in the diff would
            # still be sent along in this section.
            changed_file_contents = _redact_secrets_from_diff(raw_contents)
        test_output = None
        if repo_cfg.include_tests and command.mode == "tests":
            # Test runs are capped at five minutes regardless of settings.
            test_output = _collect_test_output(repo_dir, timeout_seconds=min(settings.max_review_minutes * 60, 300))
        prompt = _build_prompt(
            pr,
            command,
            diff_context,
            repo_cfg,
            changed_file_contents=changed_file_contents,
            test_output=test_output,
        )
    return prompt, diff_context, repo_cfg
def normalize_review_result(result: Any) -> dict[str, Any]:
    """Fill in the required keys of a review result that are missing.

    Args:
        result: Review result; must be a dict. It is modified in place.

    Returns:
        The same dict, guaranteed to contain ``"findings"``, ``"summary"``,
        ``"verdict"`` and ``"confidence"``. Existing values are kept as they
        are, whatever their type.

    Raises:
        ReviewError: If *result* is not a dict.
    """
    if not isinstance(result, dict):
        raise ReviewError(f"Invalid review result type: {type(result)!r}")

    # Built per call so the empty findings list is never shared.
    defaults = (
        ("findings", []),
        ("summary", "No summary returned."),
        ("verdict", "has_issues"),
        ("confidence", 0.5),
    )
    for key, default_value in defaults:
        result.setdefault(key, default_value)
    return result
def summarize_command(command: ParsedCommand) -> str:
    """Render *command* back into its ``@codex <name> <arguments...>`` form.

    Args:
        command: Parsed command providing ``name`` and ``arguments``.

    Returns:
        The space-separated command text without surrounding whitespace.
    """
    parts = ["@codex", command.name]
    parts.extend(command.arguments)
    return " ".join(parts).strip()
def fix_branch_name(pr_number: int, arguments: list[str] | None = None) -> str:
    """Derive the name of the fix branch for a pull request.

    Args:
        pr_number: Number of the pull request.
        arguments: Optional command arguments. Blank tokens and tokens that
            start with ``--`` are ignored; up to four of the remaining ones
            are lower-cased and form the branch suffix.

    Returns:
        ``codex/pr-<number>-fix`` when no usable argument exists, otherwise
        ``codex/pr-<number>-fix-<slug>``, where the slug keeps alphanumeric
        characters and dashes and turns everything else into dashes.
    """

    def slugify(tokens: list[str]) -> str:
        # Build the dash-separated slug; may be empty.
        words = []
        for token in tokens:
            if not token.strip() or token.startswith("--"):
                continue
            words.append(token.lower().strip())
        if not words:
            return ""
        joined = "-".join(words[:4])
        safe_chars = [ch if (ch.isalnum() or ch == "-") else "-" for ch in joined]
        return "".join(safe_chars).strip("-")

    suffix = "fix"
    slug = slugify(arguments) if arguments else ""
    if slug:
        suffix = f"fix-{slug}"
    return f"codex/pr-{pr_number}-{suffix}"
def create_fix_patch_note(command: ParsedCommand) -> str:
    """Describe what a fix command was requested for.

    Args:
        command: Parsed command providing ``arguments``.

    Returns:
        A sentence naming the shell-quoted arguments, or ``latest findings``
        when the command has none.
    """
    if command.arguments:
        details = shlex.join(command.arguments)
    else:
        details = "latest findings"
    return f"Fix command requested for {details}."
def create_fix_branch(
    pr: PullRequestContext,
    *,
    note: str,
    arguments: list[str] | None = None,
) -> str:
    """Create and push a branch that contains a fix note for the PR.

    The PR head is checked out in a temporary directory, a new branch is
    created from it, ``.codex/fix-note.md`` is written and committed as
    ``codex-bot``, and the branch is force-pushed to ``origin``.

    Args:
        pr: Pull request data; ``pr_number`` is used for naming.
        note: Text written below the note's heading.
        arguments: Optional command arguments used for the branch name.

    Returns:
        The name of the pushed branch.

    Raises:
        subprocess.CalledProcessError: If any git command fails.
    """
    branch = fix_branch_name(pr.pr_number, arguments=arguments)
    note_text = f"# Codex Fix Note\n\n{note}\n"
    commit_message = f"Codex fix note for PR {pr.pr_number}"
    # Identity is passed per command so no git configuration is required.
    identity = ["-c", "user.name=codex-bot", "-c", "user.email=codex-bot@example.invalid"]

    with TemporaryDirectory(prefix="gitea-codex-fix-") as workdir:
        checkout = checkout_pr(Path(workdir), pr)
        _run_git(["checkout", "-b", branch], cwd=checkout)

        codex_dir = checkout / ".codex"
        codex_dir.mkdir(parents=True, exist_ok=True)
        note_file = codex_dir / "fix-note.md"
        note_file.write_text(note_text, encoding="utf-8")

        _run_git(["add", ".codex/fix-note.md"], cwd=checkout)
        _run_git([*identity, "commit", "-m", commit_message], cwd=checkout)
        # Forced: an existing remote branch of the same name is overwritten.
        _run_git(["push", "origin", f"{branch}:{branch}", "--force"], cwd=checkout)
    return branch