from __future__ import annotations import base64 import json import logging import os import re import shlex import subprocess import uuid from pathlib import Path from typing import Any from gitea_codex_bot.config import Settings from gitea_codex_bot.services.gitea import GiteaClient, PullRequestContext from gitea_codex_bot.services.repo_config import RepoReviewConfig, parse_repo_review_config_text from gitea_codex_bot.services.reviewer import normalize_review_result from gitea_codex_bot.types import ParsedCommand CONTAINER_CODEX_HOME = "/root/.codex" REVIEW_OUTPUT_FILE = "/tmp/codex-review-result.json" REVIEW_SCHEMA_FILE = "/tmp/codex-review-schema.json" REVIEW_EMITTED_FILE = "/tmp/codex-review-emitted.flag" RESULT_START_MARKER = "__CODEX_REVIEW_RESULT_BEGIN__" RESULT_END_MARKER = "__CODEX_REVIEW_RESULT_END__" logger = logging.getLogger(__name__) REVIEW_RESULT_SCHEMA: dict[str, Any] = { "type": "object", "additionalProperties": False, "required": ["verdict", "confidence", "summary", "findings", "markdown_comment"], "properties": { "verdict": {"type": "string", "enum": ["correct", "has_issues"]}, "confidence": {"type": "number"}, "summary": {"type": "string"}, "markdown_comment": {"type": "string"}, "findings": { "type": "array", "items": { "type": "object", "additionalProperties": False, "required": ["severity", "file", "line_start", "line_end", "title", "body", "suggestion"], "properties": { "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]}, "file": {"type": "string"}, "line_start": {"type": "integer"}, "line_end": {"type": "integer"}, "title": {"type": "string"}, "body": {"type": "string"}, "suggestion": {"type": ["string", "null"]}, }, }, }, }, } def run_review_ephemeral( settings: Settings, *, repo: str, pr_number: int, command: ParsedCommand, ) -> tuple[dict[str, Any], RepoReviewConfig]: gitea = GiteaClient(settings) pr = gitea.get_pull_request(repo, pr_number) repo_cfg = _load_repo_review_config_from_gitea(gitea, repo, pr.head_sha) _apply_repo_default_review_mode(command, repo_cfg) review_prompt = _build_exec_review_prompt(command, repo_cfg, pr) container_name = f"codex-review-{uuid.uuid4().hex[:12]}" marker_nonce = uuid.uuid4().hex result_start_marker = f"{RESULT_START_MARKER}_{marker_nonce}" result_end_marker = f"{RESULT_END_MARKER}_{marker_nonce}" extra_env: dict[str, str] = { "GITEA_TOKEN": settings.gitea_token.get_secret_value(), "GITEA_GIT_USERNAME": settings.gitea_bot_username, } if settings.openai_api_key: extra_env["OPENAI_API_KEY"] = settings.openai_api_key.get_secret_value() if settings.openai_org_id: extra_env["OPENAI_ORG_ID"] = settings.openai_org_id if settings.openai_project_id: extra_env["OPENAI_PROJECT_ID"] = settings.openai_project_id if settings.codex_auth_mode == "chatgpt": extra_env["CODEX_AUTH_JSON_B64"] = _load_codex_auth_json_b64(settings) try: completed = _run_ephemeral_container( settings, pr=pr, container_name=container_name, review_prompt=review_prompt, result_start_marker=result_start_marker, result_end_marker=result_end_marker, extra_env=extra_env, ) if completed.returncode != 0: raise RuntimeError(_format_runner_failure(completed)) parsed = _parse_review_result_from_stdout_artifact( completed.stdout, result_start_marker=result_start_marker, result_end_marker=result_end_marker, ) parsed["_meta"] = _extract_result_meta_from_codex_stdout(completed.stdout, settings) return normalize_review_result(parsed), repo_cfg except Exception as exc: logger.warning("Ephemeral runner failed without host fallback: %s", exc) return _ephemeral_runner_failure_result(exc, settings.codex_auth_mode), repo_cfg def _run_ephemeral_container( settings: Settings, *, pr: PullRequestContext, container_name: str, review_prompt: str, result_start_marker: str, result_end_marker: str, extra_env: dict[str, str], ) -> subprocess.CompletedProcess[str]: install_and_run = _build_install_and_run_command( settings, pr=pr, review_prompt=review_prompt, result_start_marker=result_start_marker, result_end_marker=result_end_marker, ) cmd = _build_docker_command(settings, container_name=container_name, install_and_run=install_and_run) return subprocess.run( cmd, text=True, check=False, capture_output=True, timeout=settings.max_review_minutes * 60, env={**os.environ, **extra_env}, ) def _build_install_and_run_command( settings: Settings, *, pr: PullRequestContext, review_prompt: str, result_start_marker: str, result_end_marker: str, ) -> str: runner_fallback_json = json.dumps( { "verdict": "has_issues", "confidence": 0.67, "summary": "Ephemeral codex execution failed before producing a review result.", "markdown_comment": "Ephemeral codex execution failed before producing a review result.", "findings": [ { "severity": "high", "file": "runner", "line_start": 1, "line_end": 1, "title": "Ephemeral review runner failed", "body": "codex exec failed before emitting a valid structured artifact.", "suggestion": "Check ephemeral runner logs for auth/model/network issues and rerun @codex review.", } ], }, separators=(",", ":"), ) steps = [ "set -euo pipefail", f"rm -f {shlex.quote(REVIEW_EMITTED_FILE)}", "emit_review_artifact() { " "rc=\"$1\"; " f"if [ ! -s {shlex.quote(REVIEW_OUTPUT_FILE)} ]; then " f"cat > {shlex.quote(REVIEW_OUTPUT_FILE)} <<'JSON'\n{runner_fallback_json}\nJSON\n" "fi; " f'if [ ! -f {shlex.quote(REVIEW_EMITTED_FILE)} ]; then echo "{result_start_marker}"; cat {shlex.quote(REVIEW_OUTPUT_FILE)}; echo "{result_end_marker}"; touch {shlex.quote(REVIEW_EMITTED_FILE)}; fi; ' "return \"$rc\"; " "}", "trap 'rc=$?; set +e; emit_review_artifact \"$rc\"; exit \"$rc\"' EXIT", ] if settings.codex_auth_mode != "chatgpt": steps.extend( [ 'if [ -z "${OPENAI_API_KEY:-}" ]; then echo "OPENAI_API_KEY missing in runner env" >&2; exit 8; fi', ] ) steps.extend( [ 'if [ -z "${GITEA_TOKEN:-}" ]; then echo "GITEA_TOKEN missing in runner env" >&2; exit 8; fi', 'if [ -z "${GITEA_GIT_USERNAME:-}" ]; then echo "GITEA_GIT_USERNAME missing in runner env" >&2; exit 8; fi', ] ) if settings.codex_auth_mode == "chatgpt": steps.extend( [ f"mkdir -p {CONTAINER_CODEX_HOME}", 'printf "%s" "$CODEX_AUTH_JSON_B64" | base64 -d > /root/.codex/auth.json', f"chmod 600 {CONTAINER_CODEX_HOME}/auth.json", ] ) steps.extend( [ "apt-get update >/tmp/apt-update.log 2>&1 && apt-get install -y --no-install-recommends ca-certificates git >/tmp/apt-install.log 2>&1 || { rc=$?; echo 'ca-certificates/git install failed'; tail -n 80 /tmp/apt-update.log || true; tail -n 80 /tmp/apt-install.log || true; exit $rc; }", "npm install -g @openai/codex@latest >/tmp/codex-install.log 2>&1 || { rc=$?; echo 'codex install failed'; tail -n 200 /tmp/codex-install.log || true; exit $rc; }", "codex --version >/tmp/codex-version.log 2>&1 || { rc=$?; echo 'codex version check failed'; tail -n 40 /tmp/codex-version.log || true; exit $rc; }", ] ) schema_json = json.dumps(REVIEW_RESULT_SCHEMA, separators=(",", ":")) steps.extend( [ f"cat > {REVIEW_SCHEMA_FILE} <<'JSON'\n{schema_json}\nJSON", 'auth_b64="$(printf "%s" "${GITEA_GIT_USERNAME}:${GITEA_TOKEN}" | base64 | tr -d \'\\n\')"', f'git -c http.extraHeader="Authorization: Basic $auth_b64" clone --no-tags --depth 80 {shlex.quote(pr.clone_url)} /work/repo', "cd /work/repo", "fetch_required() { " "remote=\"$1\"; ref=\"$2\"; sha=\"$3\"; label=\"$4\"; " "if git -c http.extraHeader=\"Authorization: Basic $auth_b64\" fetch --no-tags \"$remote\" \"$ref\"; then return 0; fi; " "if git -c http.extraHeader=\"Authorization: Basic $auth_b64\" fetch --no-tags \"$remote\" \"$sha\"; then return 0; fi; " "echo \"Failed to fetch $label from remote '$remote' using ref '$ref' or sha '$sha'\" >&2; " "return 7; " "}", f"base_remote={'upstream' if pr.base_clone_url and pr.base_clone_url != pr.clone_url else 'origin'}", f"if [ \"$base_remote\" = \"upstream\" ]; then git remote add upstream {shlex.quote(pr.base_clone_url or '')}; fi", f"fetch_required origin {shlex.quote(pr.head_ref)} {shlex.quote(pr.head_sha)} head", f"fetch_required \"$base_remote\" {shlex.quote(pr.base_ref)} {shlex.quote(pr.base_sha)} base", f"git checkout --detach {shlex.quote(pr.head_sha)}", 'resolved_head="$(git rev-parse HEAD)"', f'if [ "$resolved_head" != {shlex.quote(pr.head_sha)} ]; then echo "Checked out SHA mismatch: expected {pr.head_sha}, got $resolved_head" >&2; exit 9; fi', "unset GITEA_TOKEN auth_b64", "git config --global --unset-all http.extraHeader >/dev/null 2>&1 || true", ] ) model = settings.openai_review_model.strip() codex_exec_parts = [ "codex exec", "--sandbox", "danger-full-access", "--json", "--output-schema", shlex.quote(REVIEW_SCHEMA_FILE), "-o", shlex.quote(REVIEW_OUTPUT_FILE), ] if model: codex_exec_parts.append(f"-m {shlex.quote(model)}") codex_exec_parts.append(shlex.quote(review_prompt)) steps.extend( [ "set +e", "codex_rc=0", " ".join(codex_exec_parts) + ' || codex_rc="$?"', "set -e", f'if [ "$codex_rc" -ne 0 ] || [ ! -s {shlex.quote(REVIEW_OUTPUT_FILE)} ]; then cat > {REVIEW_OUTPUT_FILE} <<\'JSON\'\n{runner_fallback_json}\nJSON\nfi', "emit_review_artifact 0", ] ) return "\n".join(steps) def _apply_repo_default_review_mode(command: ParsedCommand, repo_cfg: RepoReviewConfig) -> None: if command.name != "review" or command.mode_explicit: return configured_mode = repo_cfg.default_mode command.mode = configured_mode if configured_mode in {"summary", "security", "performance", "tests", "full"} else "summary" def _build_exec_review_prompt(command: ParsedCommand, repo_cfg: RepoReviewConfig, pr: PullRequestContext) -> str: raw = (command.raw or "").strip() remainder = raw match = re.match(r"^@[^\s]+\s+\S+\s*(.*)$", raw, flags=re.IGNORECASE | re.DOTALL) if match: remainder = match.group(1).strip() intent = remainder or "review this pull request and report introduced issues." focus = ", ".join(repo_cfg.focus) if repo_cfg.focus else "correctness, security, maintainability" ignore = ", ".join(repo_cfg.ignore) if repo_cfg.ignore else "(none)" mode = command.mode if command.name in {"review", "rerun"} else "summary" allow_test_execution = command.mode == "tests" or repo_cfg.include_tests tests_policy = ( "Tests may be executed for this run because tests mode/include_tests is explicitly enabled." if allow_test_execution else "Do not run tests, benchmarks, or other executables. Review changes statically unless explicitly asked." ) return "\n".join( [ f"review: {intent}", "Review only issues introduced by this PR.", f"Compare exactly these commits: base `{pr.base_sha}` ... head `{pr.head_sha}`.", "Use local git data from this checkout; do not review unrelated history.", f"Requested mode: {mode}.", f"Focus areas: {focus}.", f"Ignore patterns: {ignore}.", f"Include tests setting: {repo_cfg.include_tests}.", tests_policy, f"Full review requested: {command.full}.", "Return strict JSON matching the provided output schema.", ] ) def _build_docker_command(settings: Settings, *, container_name: str, install_and_run: str) -> list[str]: cmd = [ "docker", "run", "--rm", "-i", "--name", container_name, "-e", "CODEX_DISABLE_TELEMETRY=1", "-e", "CODEX_SANDBOX_MODE=danger-full-access", ] if settings.codex_auth_mode == "chatgpt": cmd.extend( [ "-e", f"CODEX_HOME={CONTAINER_CODEX_HOME}", "-e", "CODEX_AUTH_JSON_B64", ] ) else: cmd.extend( [ "-e", "OPENAI_API_KEY", "-e", "OPENAI_ORG_ID", "-e", "OPENAI_PROJECT_ID", ] ) cmd.extend( [ "-e", "GITEA_TOKEN", "-e", "GITEA_GIT_USERNAME", ] ) cmd.extend([settings.review_runner_image, "bash", "-lc", install_and_run]) return cmd def _ephemeral_runner_failure_result(exc: Exception, auth_mode: str) -> dict[str, Any]: message = str(exc).strip() or exc.__class__.__name__ mode_label = "ChatGPT auth" if auth_mode == "chatgpt" else "API-key auth" summary = f"{mode_label} runner failed before review execution. Error: {message}" return { "verdict": "has_issues", "confidence": 0.67, "summary": summary, "findings": [ { "severity": "high", "file": "runner", "line_start": 1, "line_end": 1, "title": "Ephemeral review runner failed", "body": message, "suggestion": "Check ephemeral runner logs for model/auth/network issues, then rerun @codex review.", } ], } def _format_runner_failure(completed: subprocess.CompletedProcess[str]) -> str: stdout_tail = _tail_text(completed.stdout) stderr_tail = _tail_text(completed.stderr) message = f"ephemeral runner exited with code {completed.returncode}" if stdout_tail: message = f"{message}; stdout_tail={stdout_tail}" if stderr_tail: message = f"{message}; stderr_tail={stderr_tail}" return message def _tail_text(text: str, limit: int = 1200) -> str: compact = " ".join(text.split()) if len(compact) <= limit: return compact return f"...{compact[-limit:]}" def _resolve_codex_auth_json_path(settings: Settings) -> Path: raw_path = settings.codex_auth_json_path.strip() if settings.codex_auth_json_path else "~/.codex/auth.json" path = Path(raw_path).expanduser() if not path.exists() or not path.is_file(): raise FileNotFoundError( f"CODEX_AUTH_MODE=chatgpt requires a readable auth.json file. Checked path: {path}" ) return path.resolve() def _load_codex_auth_json_b64(settings: Settings) -> str: auth_path = _resolve_codex_auth_json_path(settings) content = auth_path.read_text(encoding="utf-8") # Validate JSON before handing it to the ephemeral runner. json.loads(content) return base64.b64encode(content.encode("utf-8")).decode("ascii") def ensure_workdir(path: str) -> Path: target = Path(path) target.mkdir(parents=True, exist_ok=True) return target def _load_repo_review_config_from_gitea(gitea: GiteaClient, repo: str, head_sha: str) -> RepoReviewConfig: content = gitea.get_file_content(repo, ".codex-review.yml", ref=head_sha) if content is None: return RepoReviewConfig(configured=False) return parse_repo_review_config_text(content, configured=True) def _parse_review_result_from_stdout_artifact( stdout: str, *, result_start_marker: str, result_end_marker: str, ) -> dict[str, Any]: start_pos = stdout.find(result_start_marker) if start_pos == -1: raise RuntimeError("Runner output did not include final review artifact markers.") artifact_start = start_pos + len(result_start_marker) # Prefer the last end marker so marker-like text inside JSON does not # truncate the payload when earlier incidental matches exist. end_pos = stdout.rfind(result_end_marker) if end_pos == -1 or end_pos <= artifact_start: raise RuntimeError("Runner output did not include final review artifact markers.") artifact = stdout[artifact_start:end_pos].strip() if not artifact: raise RuntimeError("Runner output contained empty final review artifact.") try: payload = json.loads(artifact) except json.JSONDecodeError as exc: raise RuntimeError(f"Final review artifact was not valid JSON: {exc}") from exc if not isinstance(payload, dict): raise RuntimeError(f"Final review artifact JSON must be an object, got {type(payload)!r}.") return payload def _extract_result_meta_from_codex_stdout(stdout: str, settings: Settings) -> dict[str, Any]: model = settings.openai_review_model usage: dict[str, int] = {} for line in stdout.splitlines(): line = line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError: continue discovered_model = _find_first_string_for_key(payload, "model") if discovered_model: model = discovered_model discovered_usage = _find_first_dict_for_key(payload, "usage") if isinstance(discovered_usage, dict): for output_key, source_key in ( ("input_tokens", "input_tokens"), ("output_tokens", "output_tokens"), ("total_tokens", "total_tokens"), ): value = discovered_usage.get(source_key) if isinstance(value, int): usage[output_key] = value return {"source": "ephemeral_runner", "model": model, "usage": usage} def _find_first_string_for_key(payload: Any, key: str) -> str | None: if isinstance(payload, dict): value = payload.get(key) if isinstance(value, str) and value.strip(): return value for nested in payload.values(): found = _find_first_string_for_key(nested, key) if found: return found if isinstance(payload, list): for item in payload: found = _find_first_string_for_key(item, key) if found: return found return None def _find_first_dict_for_key(payload: Any, key: str) -> dict[str, Any] | None: if isinstance(payload, dict): value = payload.get(key) if isinstance(value, dict): return value for nested in payload.values(): found = _find_first_dict_for_key(nested, key) if found: return found if isinstance(payload, list): for item in payload: found = _find_first_dict_for_key(item, key) if found: return found return None