First MVP

This commit is contained in:
Space-Banane
2026-05-22 19:25:57 +02:00
parent 673f70b32a
commit 860ccb731d
40 changed files with 2336 additions and 0 deletions

View File

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
import re
from gitea_codex_bot.types import ParsedCommand
# Matches a comment that begins with "@codex <command>". Group 1 is the command
# word (case-insensitive); group 2 is everything after it, which may span
# several lines because of DOTALL.
COMMAND_RE = re.compile(r"^@codex\s+(review|explain|fix|ignore|rerun)\b(.*)$", re.IGNORECASE | re.DOTALL)


def parse_command(body: str) -> ParsedCommand | None:
    """Parse a comment body into a ``ParsedCommand``.

    Args:
        body: Raw comment text; surrounding whitespace is ignored.

    Returns:
        ``None`` when the stripped text does not start with a recognised
        ``@codex`` command. Otherwise a ``ParsedCommand`` whose ``name`` is the
        lower-cased command, ``raw`` is the stripped text and ``arguments`` are
        the whitespace-separated tokens that follow the command word.
    """
    text = body.strip()
    found = COMMAND_RE.match(text)
    if found is None:
        return None

    command_name = found.group(1).lower()
    # str.split() without arguments already trims and drops empty tokens.
    args = found.group(2).split()
    command = ParsedCommand(name=command_name, raw=text, arguments=args)

    if command_name == "fix":
        command.branch_fix = "--branch" in args
        return command
    if command_name != "review":
        return command

    if "--full" in args:
        command.full = True
        command.mode = "full"
    # An explicit mode keyword takes precedence over the "full" mode set above;
    # the first keyword in this fixed order wins.
    chosen = next((mode for mode in ("security", "performance", "tests") if mode in args), None)
    if chosen is not None:
        command.mode = chosen
    return command

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.orm import Session
from gitea_codex_bot.models import BotComment
# Value written to / matched against BotComment.marker by the two functions in this module.
REVIEW_MARKER = "codex-review"
def get_persistent_review_comment_id(session: Session, repo: str, pr_number: int) -> int | None:
    """Look up the stored Gitea comment id for a PR's review comment.

    Args:
        session: Open SQLAlchemy session.
        repo: Repository identifier the row was stored under.
        pr_number: Pull request number.

    Returns:
        ``gitea_comment_id`` of the ``BotComment`` row carrying
        ``REVIEW_MARKER`` for this repo/PR, or ``None`` when no row exists.
    """
    query = (
        select(BotComment)
        .where(
            BotComment.repo == repo,
            BotComment.pr_number == pr_number,
            BotComment.marker == REVIEW_MARKER,
        )
        .limit(1)
    )
    existing = session.execute(query).scalar_one_or_none()
    if not existing:
        return None
    return existing.gitea_comment_id
def upsert_persistent_review_comment_id(
    session: Session,
    *,
    repo: str,
    pr_number: int,
    head_sha: str,
    comment_id: int,
) -> None:
    """Create or update the ``BotComment`` row tracking a PR's review comment.

    If a row with ``REVIEW_MARKER`` already exists for the repo/PR, its
    ``head_sha`` and ``gitea_comment_id`` are overwritten; otherwise a new row
    is added. The session is committed in both cases.
    """
    query = (
        select(BotComment)
        .where(
            BotComment.repo == repo,
            BotComment.pr_number == pr_number,
            BotComment.marker == REVIEW_MARKER,
        )
        .limit(1)
    )
    existing = session.execute(query).scalar_one_or_none()
    if existing:
        existing.head_sha = head_sha
        existing.gitea_comment_id = comment_id
    else:
        session.add(
            BotComment(
                repo=repo,
                pr_number=pr_number,
                head_sha=head_sha,
                gitea_comment_id=comment_id,
                marker=REVIEW_MARKER,
            )
        )
    session.commit()

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote
import httpx
from gitea_codex_bot.config import Settings
@dataclass(slots=True)
class PullRequestContext:
repo: str
pr_number: int
base_ref: str
base_sha: str
head_ref: str
head_sha: str
clone_url: str
html_url: str
is_fork: bool
class GiteaClient:
    """Small synchronous client for the Gitea ``/api/v1`` endpoints used by the bot."""

    def __init__(self, settings: Settings) -> None:
        """Store settings and precompute the base URL and auth headers."""
        self.settings = settings
        # Request paths start with "/", so a trailing slash on the configured
        # base URL would produce "//api/v1/..."; normalise it away once here.
        self.base_url = str(settings.gitea_base_url).rstrip("/")
        self.headers = {
            "Authorization": f"token {settings.gitea_token.get_secret_value()}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, *, json_body: dict[str, Any] | None = None) -> Any:
        """Send one request and return the decoded JSON body.

        Returns ``None`` for a 204 response. Any error raised by
        ``response.raise_for_status()`` propagates to the caller.
        """
        with httpx.Client(timeout=20.0) as client:
            response = client.request(
                method,
                f"{self.base_url}{path}",
                headers=self.headers,
                json=json_body,
            )
            response.raise_for_status()
            if response.status_code == 204:
                return None
            return response.json()

    @staticmethod
    def split_repo(repo: str) -> tuple[str, str]:
        """Split ``"owner/name"`` at the first slash.

        Raises:
            ValueError: if *repo* contains no ``/``.
        """
        owner, name = repo.split("/", 1)
        return owner, name

    @classmethod
    def _repo_path(cls, repo: str) -> str:
        """Return the URL-encoded ``/api/v1/repos/<owner>/<name>`` prefix for *repo*."""
        owner, name = cls.split_repo(repo)
        return f"/api/v1/repos/{quote(owner, safe='')}/{quote(name, safe='')}"

    def get_pull_request(self, repo: str, pr_number: int) -> PullRequestContext:
        """Fetch a pull request and map the response onto ``PullRequestContext``."""
        payload = self._request("GET", f"{self._repo_path(repo)}/pulls/{pr_number}")
        base = payload["base"]
        head = payload["head"]
        return PullRequestContext(
            repo=repo,
            pr_number=pr_number,
            base_ref=base["ref"],
            base_sha=base["sha"],
            head_ref=head["ref"],
            head_sha=head["sha"],
            clone_url=head["repo"]["clone_url"],
            html_url=payload["html_url"],
            is_fork=bool(head["repo"]["full_name"] != base["repo"]["full_name"]),
        )

    def post_issue_comment(self, repo: str, pr_number: int, body: str) -> int:
        """Create a comment on the issue/PR and return the new comment id."""
        payload = self._request(
            "POST",
            f"{self._repo_path(repo)}/issues/{pr_number}/comments",
            json_body={"body": body},
        )
        return int(payload["id"])

    def edit_issue_comment(self, repo: str, comment_id: int, body: str) -> int:
        """Replace the body of an existing comment and return its id."""
        payload = self._request(
            "PATCH",
            f"{self._repo_path(repo)}/issues/comments/{comment_id}",
            json_body={"body": body},
        )
        return int(payload["id"])

    def list_issue_comments(self, repo: str, pr_number: int) -> list[dict[str, Any]]:
        """Return the comments of the issue/PR as returned by a single GET."""
        payload = self._request("GET", f"{self._repo_path(repo)}/issues/{pr_number}/comments")
        return list(payload)

View File

@@ -0,0 +1,136 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from gitea_codex_bot.models import JobStatus, ReviewJob, ReviewRun, RunStatus, WebhookEvent
from gitea_codex_bot.services.security import payload_digest
from gitea_codex_bot.types import ParsedCommand
def persist_webhook_event(
    session: Session,
    *,
    delivery_id: str | None,
    event_name: str,
    repo: str,
    comment_id: int | None,
    payload: bytes,
) -> bool:
    """Record a webhook delivery, storing only a SHA-256 digest of its payload.

    Returns:
        ``True`` when the row was committed; ``False`` when the commit raised
        ``IntegrityError`` (the session is rolled back in that case).
        NOTE(review): presumably this signals a duplicate delivery - the
        constraint lives on ``WebhookEvent``, which is not shown here.
    """
    session.add(
        WebhookEvent(
            delivery_id=delivery_id,
            event_name=event_name,
            repo=repo,
            comment_id=comment_id,
            payload_sha256=payload_digest(payload),
        )
    )
    try:
        session.commit()
    except IntegrityError:
        session.rollback()
        return False
    return True
def cooldown_remaining_seconds(session: Session, repo: str, pr_number: int, cooldown_seconds: int) -> int:
    """Return how many whole seconds of cooldown remain for a PR.

    The newest ``ReviewJob`` for the repo/PR created within the last
    ``cooldown_seconds`` is looked up; with no such job the result is ``0``.
    A naive ``created_at`` is treated as UTC. The result is truncated to an
    int and never negative.
    """
    window_start = datetime.now(timezone.utc) - timedelta(seconds=cooldown_seconds)
    query = (
        select(ReviewJob)
        .where(
            ReviewJob.repo == repo,
            ReviewJob.pr_number == pr_number,
            ReviewJob.created_at >= window_start,
        )
        .order_by(ReviewJob.created_at.desc())
        .limit(1)
    )
    latest = session.execute(query).scalar_one_or_none()
    if not latest:
        return 0

    stamp = latest.created_at
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    elapsed = (datetime.now(timezone.utc) - stamp).total_seconds()
    return int(max(cooldown_seconds - elapsed, 0))
def enqueue_job(
    session: Session,
    *,
    repo: str,
    pr_number: int,
    head_sha: str,
    trigger_comment_id: int,
    requested_by: str,
    command: ParsedCommand,
) -> ReviewJob:
    """Insert a queued ``ReviewJob`` for the command and return the refreshed row.

    ``command_args`` stores the command arguments joined by single spaces, or
    ``None`` when the command has no arguments.
    """
    if command.arguments:
        joined_args = " ".join(command.arguments)
    else:
        joined_args = None

    queued = ReviewJob(
        repo=repo,
        pr_number=pr_number,
        head_sha=head_sha,
        trigger_comment_id=trigger_comment_id,
        command=command.name,
        command_args=joined_args,
        requested_by=requested_by,
        status=JobStatus.queued,
    )
    session.add(queued)
    session.commit()
    # Reload so database-populated columns are available on the returned object.
    session.refresh(queued)
    return queued
def claim_next_job(session: Session) -> ReviewJob | None:
    """Claim the oldest queued job and mark it as running.

    The row is selected with ``FOR UPDATE SKIP LOCKED``. When nothing is
    queued the session is rolled back and ``None`` is returned. Otherwise the
    job's status and ``started_at`` are set, a running ``ReviewRun`` is added
    for it, and the refreshed job is returned after commit.
    """
    oldest_queued = (
        select(ReviewJob)
        .where(ReviewJob.status == JobStatus.queued)
        .order_by(ReviewJob.created_at.asc())
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    claimed = session.execute(oldest_queued).scalar_one_or_none()
    if not claimed:
        session.rollback()
        return None

    claimed.status = JobStatus.running
    claimed.started_at = datetime.now(timezone.utc)
    session.add(ReviewRun(job_id=claimed.id, status=RunStatus.running))
    session.commit()
    session.refresh(claimed)
    return claimed
def finish_job(
    session: Session,
    *,
    job_id: int,
    success: bool,
    skipped: bool,
    result: dict | None,
    error_message: str | None,
) -> None:
    """Record the outcome of a job and of its most recent run.

    Does nothing when the job does not exist. ``skipped`` takes precedence
    over ``success``. The job's ``result_json`` is only overwritten when
    *result* is not ``None``; the latest run (highest id), if any, always
    receives *result* and *error_message*.
    """
    job = session.get(ReviewJob, job_id)
    if not job:
        return

    newest_run_query = (
        select(ReviewRun)
        .where(ReviewRun.job_id == job_id)
        .order_by(ReviewRun.id.desc())
        .limit(1)
    )
    newest_run = session.execute(newest_run_query).scalar_one_or_none()

    if skipped:
        job_outcome, run_outcome = JobStatus.skipped, RunStatus.skipped
    elif success:
        job_outcome, run_outcome = JobStatus.succeeded, RunStatus.succeeded
    else:
        job_outcome, run_outcome = JobStatus.failed, RunStatus.failed
    job.status = job_outcome

    finished = datetime.now(timezone.utc)
    job.finished_at = finished
    job.last_error = error_message
    if result is not None:
        job.result_json = result

    if newest_run:
        newest_run.status = run_outcome
        newest_run.finished_at = finished
        newest_run.result_json = result
        newest_run.error_message = error_message
    session.commit()

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
import yaml
@dataclass(slots=True)
class RepoReviewConfig:
enabled: bool = True
default_mode: str = "summary"
max_diff_bytes: int = 200000
include_tests: bool = True
focus: list[str] = field(default_factory=lambda: ["correctness", "security", "maintainability"])
ignore: list[str] = field(default_factory=list)
allow_fix: bool = False
def _as_mapping(value: object) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_str_list(value: object, default: list[str]) -> list[str]:
    """Coerce a YAML scalar or sequence into a list of strings.

    A bare string becomes a one-element list (``list("*.md")`` would split it
    into single characters); a missing, null or otherwise unusable value
    yields a copy of *default*.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    return list(default)


def _as_non_negative_int(value: object, default: int) -> int:
    """Return ``int(value)`` when that is a non-negative number, else *default*."""
    try:
        number = int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        return default
    return number if number >= 0 else default


def load_repo_review_config(repo_root: Path) -> RepoReviewConfig:
    """Load ``.codex-review.yml`` from *repo_root*.

    The file comes from the checked-out repository, so its shape is not
    trusted: a non-mapping document or section is treated as empty, scalar
    ``focus``/``ignore`` values become one-element lists, and an invalid or
    negative ``max_diff_bytes`` falls back to the default.

    Args:
        repo_root: Directory expected to contain ``.codex-review.yml``.

    Returns:
        The parsed configuration, or all defaults when the file is absent.

    Raises:
        Whatever ``Path.read_text`` or ``yaml.safe_load`` raise for an
        unreadable or syntactically invalid file.
    """
    path = repo_root / ".codex-review.yml"
    if not path.exists():
        return RepoReviewConfig()

    raw = _as_mapping(yaml.safe_load(path.read_text(encoding="utf-8")))
    review = _as_mapping(raw.get("review"))
    commands = _as_mapping(raw.get("commands"))
    return RepoReviewConfig(
        enabled=bool(raw.get("enabled", True)),
        # "or" also covers an explicit null, which str() would turn into "None".
        default_mode=str(review.get("default_mode") or "summary"),
        max_diff_bytes=_as_non_negative_int(review.get("max_diff_bytes"), 200000),
        include_tests=bool(review.get("include_tests", True)),
        focus=_as_str_list(review.get("focus"), ["correctness", "security", "maintainability"]),
        ignore=_as_str_list(raw.get("ignore"), []),
        allow_fix=bool(commands.get("allow_fix", False)),
    )

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from gitea_codex_bot.types import ParsedCommand
def format_queue_ack(head_sha: str) -> str:
    """Return the "review queued" acknowledgement, quoting the first 7 characters of *head_sha*."""
    return "👀 Codex review queued for commit `{}`.".format(head_sha[:7])
def format_cooldown_ack(seconds: int) -> str:
    """Return the cooldown notice telling the user to wait *seconds* seconds."""
    wait = "{}s".format(seconds)
    return "⏳ Cooldown active. Please wait " + wait + " before requesting another review on this PR."
def format_disabled_ack() -> str:
    """Return the notice posted when the repository config disables review."""
    notice = "🚫 Review is disabled by `.codex-review.yml` for this repository."
    return notice
def format_unsupported_ack(command: ParsedCommand) -> str:
    """Return the notice that ``@codex <command.name>`` is not enabled for the repository."""
    invocation = "@codex {}".format(command.name)
    return "⚠️ Command `" + invocation + "` is not enabled on this repository."
def format_result_comment(head_sha: str, result: dict) -> str:
    """Render a review result as the Markdown body of the review comment.

    The first line is an HTML comment carrying *head_sha*. The result dict is
    model output, so it is read defensively: a missing or non-numeric
    ``confidence`` renders as ``0.00``, a ``findings`` value that is not a
    list is treated as empty, and findings that are not dicts are skipped.

    Args:
        head_sha: Commit the review applies to.
        result: Review result with optional ``verdict``, ``confidence``,
            ``summary`` and ``findings`` keys.

    Returns:
        The comment text, stripped of leading/trailing whitespace.
    """
    verdict = result.get("verdict", "has_issues")
    try:
        confidence = float(result.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    summary = str(result.get("summary", "No summary returned."))
    raw_findings = result.get("findings")
    findings = [item for item in raw_findings if isinstance(item, dict)] if isinstance(raw_findings, list) else []

    lines = [
        f"<!-- codex-review:head_sha={head_sha} -->",
        "## Codex Review",
        "",
        f"Verdict: `{verdict}`",
        f"Confidence: `{confidence:.2f}`",
        "",
        summary,
        "",
    ]
    if not findings:
        lines.append("No blocking issues found.")
        return "\n".join(lines).strip()

    lines.append("Findings:")
    for idx, finding in enumerate(findings, start=1):
        severity = finding.get("severity", "unknown")
        file_path = finding.get("file", "unknown")
        line_start = finding.get("line_start", "?")
        # A single-line finding may omit line_end; fall back to line_start.
        line_end = finding.get("line_end", line_start)
        title = finding.get("title", "Issue")
        body = finding.get("body", "")
        suggestion = finding.get("suggestion", "")
        lines.extend(
            [
                f"{idx}. `{file_path}:{line_start}-{line_end}` ({severity})",
                f" {title}",
                f" {body}",
                f" Suggestion: {suggestion}" if suggestion else " Suggestion: n/a",
            ]
        )
    return "\n".join(lines).strip()

View File

@@ -0,0 +1,290 @@
from __future__ import annotations
import json
import os
import shlex
import subprocess
from fnmatch import fnmatch
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
import httpx
from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient, PullRequestContext
from gitea_codex_bot.services.repo_config import RepoReviewConfig, load_repo_review_config
from gitea_codex_bot.types import ParsedCommand
class ReviewError(RuntimeError):
    """Raised when a review result cannot be obtained or is malformed."""
def _run_git(args: list[str], cwd: Path | None = None) -> str:
    """Run ``git`` with *args* and return its captured stdout as text.

    Raises:
        subprocess.CalledProcessError: when git exits with a non-zero status.
    """
    command = ["git"]
    command.extend(args)
    finished = subprocess.run(command, cwd=cwd, check=True, capture_output=True, text=True)
    return finished.stdout
def checkout_pr(tmpdir: Path, pr: PullRequestContext) -> Path:
    """Clone the PR's head repository into ``tmpdir/repo`` and check out its head commit.

    The clone URL and ref names originate from the pull request payload, so an
    end-of-options ``--`` is placed before them; a value beginning with ``-``
    is then treated as a positional argument rather than a git option.

    NOTE(review): the clone is shallow (depth 50); callers that need the merge
    base of a long-diverged PR may not find it - confirm this is acceptable.

    Returns:
        Path of the checked-out working tree.

    Raises:
        subprocess.CalledProcessError: when any git command fails.
    """
    repo_dir = tmpdir / "repo"
    _run_git(["clone", "--no-tags", "--depth", "50", "--", pr.clone_url, str(repo_dir)])
    _run_git(["fetch", "--", "origin", pr.base_ref, pr.head_ref], cwd=repo_dir)
    _run_git(["checkout", pr.head_sha], cwd=repo_dir)
    return repo_dir
def collect_diff_context(repo_dir: Path, pr: PullRequestContext, max_diff_bytes: int) -> dict[str, Any]:
    """Collect the three-dot diff between the PR's base and head commits.

    Returns:
        A dict with ``diff`` (cut to at most *max_diff_bytes* UTF-8 bytes),
        ``changed_files`` (non-empty, stripped names from ``--name-only``) and
        ``truncated`` (whether the diff was cut).
    """
    revision_range = f"{pr.base_sha}...{pr.head_sha}"
    diff_text = _run_git(["diff", revision_range], cwd=repo_dir)
    names_output = _run_git(["diff", "--name-only", revision_range], cwd=repo_dir)
    changed = [name for name in map(str.strip, names_output.splitlines()) if name]

    encoded = diff_text.encode("utf-8")
    was_cut = len(encoded) > max_diff_bytes
    if was_cut:
        # errors="ignore" drops a multi-byte character split by the byte cut.
        diff_text = encoded[:max_diff_bytes].decode("utf-8", errors="ignore")
    return {"diff": diff_text, "changed_files": changed, "truncated": was_cut}
def _apply_ignore_patterns(changed_files: list[str], ignore_patterns: list[str]) -> list[str]:
if not ignore_patterns:
return changed_files
kept: list[str] = []
for path in changed_files:
if any(fnmatch(path, pattern) for pattern in ignore_patterns):
continue
kept.append(path)
return kept
def _collect_changed_file_contents(repo_dir: Path, changed_files: list[str], max_total_bytes: int) -> str:
chunks: list[str] = []
total = 0
for rel in changed_files:
path = repo_dir / rel
if not path.exists() or not path.is_file():
continue
try:
content = path.read_text(encoding="utf-8", errors="ignore")
except OSError:
continue
block = f"\n### {rel}\n{content}\n"
block_bytes = len(block.encode("utf-8"))
if total + block_bytes > max_total_bytes:
break
chunks.append(block)
total += block_bytes
return "".join(chunks).strip()
def _collect_test_output(repo_dir: Path, timeout_seconds: int) -> str:
try:
completed = subprocess.run(
["pytest", "-q"],
cwd=repo_dir,
capture_output=True,
text=True,
timeout=timeout_seconds,
check=False,
)
output = (completed.stdout + "\n" + completed.stderr).strip()
return output[:10000]
except Exception as exc:
return f"Test execution unavailable: {exc}"
def _redact_secrets_from_diff(diff: str) -> str:
secret_terms = ("api_key", "token", "secret", "password", "private_key", "-----begin")
redacted_lines: list[str] = []
for line in diff.splitlines():
lower = line.lower()
if any(term in lower for term in secret_terms):
redacted_lines.append("[REDACTED_POTENTIAL_SECRET]")
else:
redacted_lines.append(line)
return "\n".join(redacted_lines)
def _build_prompt(
pr: PullRequestContext,
command: ParsedCommand,
diff_context: dict[str, Any],
repo_cfg: RepoReviewConfig,
*,
changed_file_contents: str,
test_output: str | None,
) -> str:
mode = command.mode if command.name in {"review", "rerun"} else "summary"
return (
"You are reviewing a Gitea pull request.\n\n"
"Focus only on issues introduced by this PR.\n"
"Prioritize correctness, security, data loss, broken behavior, bad migrations, and missing tests.\n"
"Avoid style nitpicks.\n\n"
"Return JSON only with schema:\n"
"{\n"
' "verdict": "correct" | "has_issues",\n'
' "confidence": 0.0,\n'
' "summary": "...",\n'
' "findings": [{"severity":"low|medium|high|critical","file":"...","line_start":1,"line_end":1,"title":"...","body":"...","suggestion":"..."}]\n'
"}\n\n"
f"PR URL: {pr.html_url}\n"
f"Mode: {mode}\n"
f"Repo focus: {', '.join(repo_cfg.focus)}\n"
f"Diff truncated: {diff_context['truncated']}\n"
f"Changed files:\n{os.linesep.join(diff_context['changed_files'])}\n\n"
f"Unified diff:\n{diff_context['diff']}\n\n"
f"Changed file content (optional):\n{changed_file_contents or '(not included)'}\n\n"
f"Test output (optional):\n{test_output or '(not included)'}\n"
)
def _call_openai_review(settings: Settings, prompt: str) -> dict[str, Any]:
    """POST the prompt to the OpenAI Responses endpoint and parse the JSON reply.

    The first non-empty ``text`` value found in ``output[*].content[*]`` is
    decoded with ``json.loads`` and returned.

    Raises:
        ReviewError: when the response contains no output text.
        Errors from ``response.raise_for_status()`` and ``json.loads``
        propagate unchanged.
    """
    request_headers: dict[str, str] = {
        "Authorization": f"Bearer {settings.openai_api_key.get_secret_value()}",
        "Content-Type": "application/json",
    }
    # Organisation and project headers are only sent when configured.
    optional_headers = (
        ("OpenAI-Organization", settings.openai_org_id),
        ("OpenAI-Project", settings.openai_project_id),
    )
    for header_name, header_value in optional_headers:
        if header_value:
            request_headers[header_name] = header_value

    request_body = {
        "model": settings.openai_review_model,
        "input": prompt,
        "text": {"format": {"type": "json_object"}},
        "reasoning": {"effort": settings.openai_reasoning_effort},
    }
    with httpx.Client(timeout=120.0) as client:
        response = client.post("https://api.openai.com/v1/responses", headers=request_headers, json=request_body)
        response.raise_for_status()
        payload = response.json()

    candidate_texts = (
        part.get("text")
        for item in payload.get("output", [])
        for part in item.get("content", [])
    )
    first_text = next((text for text in candidate_texts if text), None)
    if first_text is None:
        raise ReviewError("OpenAI response did not contain JSON output text.")
    return json.loads(first_text)
def _fallback_review(diff_context: dict[str, Any]) -> dict[str, Any]:
findings = []
if "TODO" in diff_context["diff"]:
findings.append(
{
"severity": "low",
"file": "unknown",
"line_start": 1,
"line_end": 1,
"title": "TODO marker in diff",
"body": "The change introduces TODO markers that may indicate incomplete behavior.",
"suggestion": "Resolve or track TODOs before merging.",
}
)
return {
"verdict": "correct" if not findings else "has_issues",
"confidence": 0.4 if not findings else 0.6,
"summary": "Fallback analysis was used because OpenAI review was unavailable.",
"findings": findings,
}
def run_review_for_pr(
    settings: Settings,
    gitea: GiteaClient,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
    """Run a review for a pull request and return the normalised result.

    The prompt is prepared first; failures there propagate. If the OpenAI
    call then fails for any reason, the failure is logged and the local
    fallback analysis is used instead, so the caller still gets a result.

    Returns:
        ``(result, repo_cfg)`` where *result* has passed through
        ``normalize_review_result``.
    """
    import logging  # local import: keeps this block self-contained

    prompt, diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
    try:
        result = _call_openai_review(settings, prompt)
    except Exception:
        # Deliberate best-effort boundary, but never swallow the cause silently.
        logging.getLogger(__name__).exception(
            "OpenAI review failed for %s#%s; using fallback analysis", repo, pr_number
        )
        result = _fallback_review(diff_context)
    return normalize_review_result(result), repo_cfg
def prepare_review_prompt(
    settings: Settings,
    gitea: GiteaClient,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[str, dict[str, Any], RepoReviewConfig]:
    """Check out the PR in a temporary directory and build the review prompt.

    The diff budget is the smaller of the global and the per-repo
    ``max_diff_bytes``. Ignore patterns filter the changed-file list, and the
    diff text is passed through secret redaction. Full file contents are
    added only when ``command.full`` is set; test output only when the repo
    config allows tests and the command mode is ``tests``.

    Returns:
        ``(prompt, diff_context, repo_cfg)``.
    """
    pr = gitea.get_pull_request(repo, pr_number)
    with TemporaryDirectory(prefix="gitea-codex-") as tmp:
        checkout = checkout_pr(Path(tmp), pr)
        repo_cfg = load_repo_review_config(checkout)

        diff_budget = min(settings.max_diff_bytes, repo_cfg.max_diff_bytes)
        diff_context = collect_diff_context(checkout, pr, diff_budget)
        diff_context["changed_files"] = _apply_ignore_patterns(diff_context["changed_files"], repo_cfg.ignore)
        diff_context["diff"] = _redact_secrets_from_diff(diff_context["diff"])

        if command.full:
            file_contents = _collect_changed_file_contents(
                checkout, diff_context["changed_files"], settings.max_diff_bytes
            )
        else:
            file_contents = ""

        wants_tests = repo_cfg.include_tests and command.mode == "tests"
        if wants_tests:
            # Test runs are capped at five minutes regardless of the setting.
            captured_tests = _collect_test_output(
                checkout, timeout_seconds=min(settings.max_review_minutes * 60, 300)
            )
        else:
            captured_tests = None

        prompt = _build_prompt(
            pr,
            command,
            diff_context,
            repo_cfg,
            changed_file_contents=file_contents,
            test_output=captured_tests,
        )
        return prompt, diff_context, repo_cfg
def normalize_review_result(result: Any) -> dict[str, Any]:
    """Fill in missing top-level keys of a review result, in place.

    Existing keys are left untouched, even when their value is ``None``.

    Raises:
        ReviewError: when *result* is not a dict.
    """
    if not isinstance(result, dict):
        raise ReviewError(f"Invalid review result type: {type(result)!r}")

    # Built per call so the default findings list is never shared.
    fallbacks = (
        ("findings", []),
        ("summary", "No summary returned."),
        ("verdict", "has_issues"),
        ("confidence", 0.5),
    )
    for key, fallback in fallbacks:
        if key not in result:
            result[key] = fallback
    return result
def summarize_command(command: ParsedCommand) -> str:
    """Rebuild the ``@codex <name> <args...>`` line for a parsed command."""
    parts = ["@codex", command.name]
    parts.extend(command.arguments)
    return " ".join(parts).strip()
def fix_branch_name(pr_number: int, arguments: list[str] | None = None) -> str:
    """Derive the branch name ``codex/pr-<n>-<suffix>`` for a fix request.

    The suffix is ``fix`` unless *arguments* contains non-flag words; then the
    first four such words are lower-cased, joined with ``-``, and every
    character that is neither alphanumeric nor ``-`` is replaced by ``-``
    (leading/trailing dashes removed), giving ``fix-<slug>``.
    """
    usable_words = [
        token.lower().strip()
        for token in (arguments or [])
        if token.strip() and not token.startswith("--")
    ]
    joined = "-".join(usable_words[:4])
    slug_chars = [ch if (ch == "-" or ch.isalnum()) else "-" for ch in joined]
    slug = "".join(slug_chars).strip("-")
    suffix = f"fix-{slug}" if slug else "fix"
    return f"codex/pr-{pr_number}-{suffix}"
def create_fix_patch_note(command: ParsedCommand) -> str:
    """Describe what a fix command asked for, shell-quoting its arguments."""
    if command.arguments:
        subject = shlex.join(command.arguments)
    else:
        subject = "latest findings"
    return "Fix command requested for " + subject + "."
def create_fix_branch(
    pr: PullRequestContext,
    *,
    note: str,
    arguments: list[str] | None = None,
) -> str:
    """Create a branch holding a fix note and force-push it to ``origin``.

    The PR is checked out into a temporary directory, a new branch is created
    from its head, ``.codex/fix-note.md`` is written with *note* and committed
    as ``codex-bot``, and the branch is pushed with ``--force``.

    Returns:
        The name of the pushed branch.
    """
    branch = fix_branch_name(pr.pr_number, arguments=arguments)
    with TemporaryDirectory(prefix="gitea-codex-fix-") as tmp:
        checkout = checkout_pr(Path(tmp), pr)
        _run_git(["checkout", "-b", branch], cwd=checkout)

        note_file = checkout / ".codex" / "fix-note.md"
        note_file.parent.mkdir(parents=True, exist_ok=True)
        note_file.write_text(f"# Codex Fix Note\n\n{note}\n", encoding="utf-8")

        _run_git(["add", ".codex/fix-note.md"], cwd=checkout)
        # Identity is passed per command so no git config is required on the host.
        commit_command = [
            "-c",
            "user.name=codex-bot",
            "-c",
            "user.email=codex-bot@example.invalid",
            "commit",
            "-m",
            f"Codex fix note for PR {pr.pr_number}",
        ]
        _run_git(commit_command, cwd=checkout)
        _run_git(["push", "origin", f"{branch}:{branch}", "--force"], cwd=checkout)
    return branch

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import hashlib
import hmac
def verify_gitea_signature(payload: bytes, secret: str, received_signature: str | None) -> bool:
    """Check a webhook signature against the HMAC-SHA256 of *payload*.

    The received value is untrusted header text. It is trimmed, lower-cased
    and stripped of an optional ``sha256=`` prefix before a constant-time
    comparison on bytes, so a non-ASCII header yields ``False`` instead of the
    ``TypeError`` that ``hmac.compare_digest`` raises for non-ASCII ``str``.

    Args:
        payload: Raw request body.
        secret: Shared webhook secret.
        received_signature: Signature header value, or ``None`` when absent.

    Returns:
        ``True`` only when the signature matches.
    """
    if not received_signature:
        return False
    expected = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
    # Trim first so leading whitespace cannot hide the prefix.
    normalized = received_signature.strip().lower().removeprefix("sha256=").strip()
    return hmac.compare_digest(
        expected.encode("ascii"),
        normalized.encode("utf-8", errors="replace"),
    )
def payload_digest(payload: bytes) -> str:
    """Return the SHA-256 digest of *payload* as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()