"""Run Codex pull-request reviews inside a throwaway Docker container."""

from __future__ import annotations

import base64
import json
import logging
import os
import re
import shlex
import subprocess
import uuid
from collections.abc import Iterator
from pathlib import Path
from typing import Any

from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.repo_config import RepoReviewConfig
from gitea_codex_bot.services.reviewer import normalize_review_result, prepare_review_prompt
from gitea_codex_bot.types import ParsedCommand

CONTAINER_CODEX_HOME = "/root/.codex"

# Keys a JSON object must carry to be treated as a review payload.
_REQUIRED_REVIEW_KEYS = frozenset({"verdict", "summary", "findings"})

# Matches a fenced code block (optionally tagged "json") wrapping a JSON object.
_FENCED_JSON_RE = re.compile(r"```(?:json)?\s*(\{.*\})\s*```", flags=re.DOTALL | re.IGNORECASE)

# Upper bound for the best-effort ``docker rm -f`` issued after a timeout.
_CONTAINER_CLEANUP_TIMEOUT_SECONDS = 30

logger = logging.getLogger(__name__)


def run_review_ephemeral(
    settings: Settings,
    *,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
    """Review a pull request by running ``codex exec`` in a one-shot container.

    The prompt is built with ``prepare_review_prompt`` and piped to the
    container on stdin. If the first attempt fails because the CLI rejects
    ``--reasoning-effort``, the run is repeated once without that flag.

    Args:
        settings: Bot configuration (auth mode, model, runner image, timeout).
        repo: Repository identifier forwarded to ``prepare_review_prompt``.
        pr_number: Pull-request number forwarded to ``prepare_review_prompt``.
        command: Parsed bot command forwarded to ``prepare_review_prompt``.

    Returns:
        ``(review_result, repo_config)``. When running the container or
        parsing its output raises, ``review_result`` is the synthetic failure
        report built by ``_ephemeral_runner_failure_result``.

    Raises:
        Anything raised while preparing the prompt or loading ``auth.json``;
        those steps run outside the failure-to-report conversion.
    """
    gitea = GiteaClient(settings)
    prompt, _diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
    container_name = f"codex-review-{uuid.uuid4().hex[:12]}"

    extra_env: dict[str, str] = {}
    if settings.codex_auth_mode == "chatgpt":
        extra_env["CODEX_AUTH_JSON_B64"] = _load_codex_auth_json_b64(settings)

    try:
        completed = _run_ephemeral_container(
            settings,
            container_name=container_name,
            prompt=prompt,
            extra_env=extra_env,
            include_reasoning_effort=True,
        )
        if _needs_reasoning_effort_compat_retry(completed):
            logger.info("Ephemeral runner does not support --reasoning-effort; retrying without it.")
            completed = _run_ephemeral_container(
                settings,
                container_name=container_name,
                prompt=prompt,
                extra_env=extra_env,
                include_reasoning_effort=False,
            )
        if completed.returncode != 0:
            raise RuntimeError(_format_runner_failure(completed))

        parsed = _parse_codex_exec_stdout(completed.stdout)
        parsed["_meta"] = _extract_result_meta_from_codex_stdout(completed.stdout, settings)
        return normalize_review_result(parsed), repo_cfg
    except Exception as exc:
        # Top-level boundary: a failed run is reported as a review finding
        # instead of propagating to the caller.
        logger.warning("Ephemeral runner failed without host fallback: %s", exc)
        return _ephemeral_runner_failure_result(exc, settings.codex_auth_mode), repo_cfg


def _run_ephemeral_container(
    settings: Settings,
    *,
    container_name: str,
    prompt: str,
    extra_env: dict[str, str],
    include_reasoning_effort: bool,
) -> subprocess.CompletedProcess[str]:
    """Run one ``docker run`` attempt, feeding ``prompt`` on stdin.

    ``extra_env`` is merged over ``os.environ`` for the docker CLI process so
    that the ``-e NAME`` flags built by ``_build_docker_command`` can forward
    those variables by name.

    Raises:
        subprocess.TimeoutExpired: The run exceeded
            ``settings.max_review_minutes``. The container is force-removed
            (best effort) before the exception is re-raised.
    """
    install_and_run = _build_install_and_run_command(settings, include_reasoning_effort=include_reasoning_effort)
    cmd = _build_docker_command(settings, container_name=container_name, install_and_run=install_and_run)
    try:
        return subprocess.run(
            cmd,
            input=prompt,
            text=True,
            check=False,
            capture_output=True,
            timeout=settings.max_review_minutes * 60,
            env={**os.environ, **extra_env},
        )
    except subprocess.TimeoutExpired:
        # The timeout only kills the docker CLI client; the container itself
        # would otherwise keep running.
        _force_remove_container(container_name)
        raise


def _force_remove_container(container_name: str) -> None:
    """Best-effort ``docker rm -f``; failures are logged, never raised."""
    try:
        subprocess.run(
            ["docker", "rm", "-f", container_name],
            check=False,
            capture_output=True,
            text=True,
            timeout=_CONTAINER_CLEANUP_TIMEOUT_SECONDS,
        )
    except Exception as exc:  # cleanup must not mask the original timeout
        logger.warning("Failed to remove timed-out container %s: %s", container_name, exc)


def _build_install_and_run_command(settings: Settings, *, include_reasoning_effort: bool = True) -> str:
    """Build the shell script executed by ``bash -lc`` inside the container.

    The script optionally materialises ``auth.json`` from the
    ``CODEX_AUTH_JSON_B64`` environment variable (chatgpt auth mode), installs
    ``ca-certificates`` and the ``@openai/codex`` npm package, then runs
    ``codex exec``. Steps are joined with ``"; "``.

    Args:
        settings: Supplies the auth mode, model and reasoning effort.
        include_reasoning_effort: When false, ``--reasoning-effort`` is
            omitted even if a reasoning effort is configured.
    """
    steps = ["set -euo pipefail"]
    if settings.codex_auth_mode == "chatgpt":
        steps.extend(
            [
                f"mkdir -p {CONTAINER_CODEX_HOME}",
                f'printf "%s" "$CODEX_AUTH_JSON_B64" | base64 -d > {CONTAINER_CODEX_HOME}/auth.json',
                f"chmod 600 {CONTAINER_CODEX_HOME}/auth.json",
            ]
        )

    # Each install step hides its output in a log file and prints the log
    # tail only on failure, preserving the failing exit code.
    steps.extend(
        [
            "apt-get update >/tmp/apt-update.log 2>&1 "
            "&& apt-get install -y --no-install-recommends ca-certificates >/tmp/apt-install.log 2>&1 "
            "|| { rc=$?; echo 'ca-certificates install failed'; "
            "tail -n 80 /tmp/apt-update.log || true; "
            "tail -n 80 /tmp/apt-install.log || true; exit $rc; }",
            "npm install -g @openai/codex >/tmp/codex-install.log 2>&1 "
            "|| { rc=$?; echo 'codex install failed'; "
            "tail -n 200 /tmp/codex-install.log || true; exit $rc; }",
        ]
    )

    model = settings.openai_review_model.strip()
    reasoning_effort = settings.openai_reasoning_effort.strip()
    codex_exec_parts = ["codex exec --skip-git-repo-check --json"]
    if model:
        codex_exec_parts.append(f"-m {shlex.quote(model)}")
    if include_reasoning_effort and reasoning_effort:
        codex_exec_parts.append(f"--reasoning-effort {shlex.quote(reasoning_effort)}")
    steps.append(" ".join(codex_exec_parts))
    return "; ".join(steps)


def _needs_reasoning_effort_compat_retry(completed: subprocess.CompletedProcess[str]) -> bool:
    """Return True when a failed run's stderr shows ``--reasoning-effort`` was rejected."""
    if completed.returncode == 0:
        return False
    stderr_text = completed.stderr or ""
    return "unexpected argument '--reasoning-effort' found" in stderr_text


def _build_docker_command(settings: Settings, *, container_name: str, install_and_run: str) -> list[str]:
    """Build the ``docker run`` argv for one review attempt.

    Credentials are forwarded by variable name only (``-e NAME``), so their
    values come from the docker CLI's environment rather than the argv.
    """
    cmd = [
        "docker",
        "run",
        "--rm",
        "-i",
        "--name",
        container_name,
        "-e",
        "CODEX_DISABLE_TELEMETRY=1",
    ]
    if settings.codex_auth_mode == "chatgpt":
        cmd.extend(
            [
                "-e",
                f"CODEX_HOME={CONTAINER_CODEX_HOME}",
                "-e",
                "CODEX_AUTH_JSON_B64",
            ]
        )
    else:
        cmd.extend(
            [
                "-e",
                "OPENAI_API_KEY",
                "-e",
                "OPENAI_ORG_ID",
                "-e",
                "OPENAI_PROJECT_ID",
            ]
        )
    cmd.extend([settings.review_runner_image, "bash", "-lc", install_and_run])
    return cmd


def _ephemeral_runner_failure_result(exc: Exception, auth_mode: str) -> dict[str, Any]:
    """Build a review-shaped result that reports a runner failure as a finding."""
    message = str(exc).strip() or exc.__class__.__name__
    mode_label = "ChatGPT auth" if auth_mode == "chatgpt" else "API-key auth"
    summary = f"{mode_label} runner failed before review execution. Error: {message}"
    return {
        "verdict": "has_issues",
        "confidence": 0.6,
        "summary": summary,
        "findings": [
            {
                "severity": "high",
                "file": "runner",
                "line_start": 1,
                "line_end": 1,
                "title": "Ephemeral review runner failed",
                "body": message,
                "suggestion": "Check ephemeral runner logs for model/auth/network issues, then rerun @codex review.",
            }
        ],
    }


def _format_runner_failure(completed: subprocess.CompletedProcess[str]) -> str:
    """Describe a failed run: exit code plus the tails of stdout and stderr."""
    stdout_tail = _tail_text(completed.stdout)
    stderr_tail = _tail_text(completed.stderr)
    message = f"ephemeral runner exited with code {completed.returncode}"
    if stdout_tail:
        message = f"{message}; stdout_tail={stdout_tail}"
    if stderr_tail:
        message = f"{message}; stderr_tail={stderr_tail}"
    return message


def _tail_text(text: str | None, limit: int = 1200) -> str:
    """Collapse whitespace in ``text`` and keep at most its last ``limit`` characters.

    Truncated output is prefixed with ``"..."``. ``None`` is treated as empty
    text.
    """
    if not text:
        return ""
    compact = " ".join(text.split())
    if len(compact) <= limit:
        return compact
    # Slice from an explicit start index: ``compact[-limit:]`` would return
    # the whole string for ``limit == 0``.
    return f"...{compact[max(len(compact) - limit, 0):]}" if limit > 0 else "..."


def _resolve_codex_auth_json_path(settings: Settings) -> Path:
    """Return the resolved path of the host ``auth.json`` file.

    Uses ``settings.codex_auth_json_path`` when it is set to a non-blank
    value, otherwise ``~/.codex/auth.json``.

    Raises:
        FileNotFoundError: The path does not point at an existing file.
    """
    raw_path = (settings.codex_auth_json_path or "").strip() or "~/.codex/auth.json"
    path = Path(raw_path).expanduser()
    if not path.is_file():
        raise FileNotFoundError(
            f"CODEX_AUTH_MODE=chatgpt requires a readable auth.json file. Checked path: {path}"
        )
    return path.resolve()


def _load_codex_auth_json_b64(settings: Settings) -> str:
    """Read ``auth.json``, check that it parses as JSON, and return it base64-encoded.

    Raises:
        FileNotFoundError: The auth file is missing.
        json.JSONDecodeError: The auth file is not valid JSON.
    """
    auth_path = _resolve_codex_auth_json_path(settings)
    content = auth_path.read_text(encoding="utf-8")
    # Validate JSON before handing it to the ephemeral runner.
    json.loads(content)
    return base64.b64encode(content.encode("utf-8")).decode("ascii")


def ensure_workdir(path: str) -> Path:
    """Create ``path`` (including parents) if needed and return it as a ``Path``."""
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    return target


def _is_review_payload(payload: Any) -> bool:
    """Return True for a dict carrying ``verdict``, ``summary`` and ``findings``."""
    return isinstance(payload, dict) and _REQUIRED_REVIEW_KEYS.issubset(payload)


def _iter_json_lines(stdout: str | None) -> Iterator[Any]:
    """Yield the decoded value of every line of ``stdout`` that is valid JSON.

    Blank lines and lines that fail to decode are skipped.
    """
    for raw_line in (stdout or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            continue
        yield payload


def _parse_codex_exec_stdout(stdout: str) -> dict[str, Any]:
    """Extract the review payload from the JSON-lines output of ``codex exec``.

    A line qualifies either when it is itself a review payload or when text
    extracted from it (see ``_extract_text``) contains one. The first
    qualifying line wins.

    Raises:
        RuntimeError: No line produced a review payload.
    """
    last_text: str | None = None
    for payload in _iter_json_lines(stdout):
        if _is_review_payload(payload):
            return payload
        extracted = _extract_text(payload)
        if not extracted:
            continue
        last_text = extracted
        parsed = _parse_review_json_from_text(extracted)
        if parsed:
            return parsed

    if not last_text:
        raise RuntimeError("codex exec output did not include parseable review payload text")
    raise RuntimeError(f"codex exec output text did not contain review JSON; text_tail={_tail_text(last_text, 400)}")


def _extract_result_meta_from_codex_stdout(stdout: str, settings: Settings) -> dict[str, Any]:
    """Collect model name and token usage reported in the ``codex exec`` output.

    The model defaults to ``settings.openai_review_model``; for both the model
    and each usage counter, the value from the last line that reports one
    wins.

    Returns:
        ``{"source": "ephemeral_runner", "model": ..., "usage": {...}}`` where
        ``usage`` holds whichever of ``input_tokens``, ``output_tokens`` and
        ``total_tokens`` were reported as integers.
    """
    model = settings.openai_review_model
    usage: dict[str, int] = {}
    for payload in _iter_json_lines(stdout):
        discovered_model = _find_first_string_for_key(payload, "model")
        if discovered_model:
            model = discovered_model

        discovered_usage = _find_first_dict_for_key(payload, "usage")
        if not isinstance(discovered_usage, dict):
            continue
        for token_key in ("input_tokens", "output_tokens", "total_tokens"):
            value = discovered_usage.get(token_key)
            # bool is a subclass of int; a JSON true/false is not a count.
            if isinstance(value, int) and not isinstance(value, bool):
                usage[token_key] = value
    return {"source": "ephemeral_runner", "model": model, "usage": usage}


def _find_first_string_for_key(payload: Any, key: str) -> str | None:
    """Depth-first search for the first non-blank string stored under ``key``.

    A match on the current dict takes precedence over matches in its nested
    values. Returns ``None`` when nothing is found.
    """
    if isinstance(payload, dict):
        value = payload.get(key)
        if isinstance(value, str) and value.strip():
            return value
        for nested in payload.values():
            found = _find_first_string_for_key(nested, key)
            if found:
                return found
    if isinstance(payload, list):
        for item in payload:
            found = _find_first_string_for_key(item, key)
            if found:
                return found
    return None


def _find_first_dict_for_key(payload: Any, key: str) -> dict[str, Any] | None:
    """Depth-first search for the first dict stored under ``key``.

    A match on the current dict is returned even when empty; an empty dict
    found deeper in the structure is skipped and the search continues.
    Returns ``None`` when nothing is found.
    """
    if isinstance(payload, dict):
        value = payload.get(key)
        if isinstance(value, dict):
            return value
        for nested in payload.values():
            found = _find_first_dict_for_key(nested, key)
            if found:
                return found
    if isinstance(payload, list):
        for item in payload:
            found = _find_first_dict_for_key(item, key)
            if found:
                return found
    return None


def _parse_review_json_from_text(text: str) -> dict[str, Any] | None:
    """Find a review payload embedded in free-form text.

    Candidates are tried in order: the whole text, the contents of a fenced
    code block, and the span from the first ``{`` to the last ``}``. If none
    of those decodes to a review payload, every ``{`` is tried as the start
    of a JSON object so that trailing text containing braces does not hide
    the payload.

    Returns:
        The first decoded dict carrying the required review keys, else
        ``None``.
    """
    candidates: list[str] = [text.strip()]
    fenced = _FENCED_JSON_RE.search(text)
    if fenced:
        candidates.append(fenced.group(1).strip())
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        candidates.append(text[start : end + 1].strip())

    seen: set[str] = set()
    for candidate in candidates:
        if not candidate or candidate in seen:
            continue
        seen.add(candidate)
        try:
            payload = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if _is_review_payload(payload):
            return payload

    # Last resort: decode an object starting at each "{" and ignore whatever
    # follows it.
    decoder = json.JSONDecoder()
    while start != -1:
        try:
            payload, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            pass
        else:
            if _is_review_payload(payload):
                return payload
        start = text.find("{", start + 1)
    return None


def _extract_text(payload: Any) -> str | None:
    """Return the first non-empty string reachable from ``payload``.

    A string payload is returned as-is. For dicts, the keys ``text``,
    ``message``, ``content`` and ``output`` are searched first (in that
    order), then any remaining dict or list values. Lists are searched item
    by item. Returns ``None`` when no non-empty string is found.
    """
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        for key in ("text", "message", "content", "output"):
            text = _extract_text(payload.get(key))
            if text:
                return text
        for value in payload.values():
            if not isinstance(value, (dict, list)):
                continue
            text = _extract_text(value)
            if text:
                return text
    if isinstance(payload, list):
        for item in payload:
            text = _extract_text(item)
            if text:
                return text
    return None