Files
gitea-codex/src/gitea_codex_bot/workers/container_runner.py
2026-05-22 22:37:53 +02:00

312 lines
11 KiB
Python

from __future__ import annotations
import base64
import json
import logging
import os
import re
import shlex
import subprocess
import uuid
from pathlib import Path
from typing import Any
from gitea_codex_bot.config import Settings
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.repo_config import RepoReviewConfig
from gitea_codex_bot.services.reviewer import normalize_review_result, prepare_review_prompt, run_review_for_pr
from gitea_codex_bot.types import ParsedCommand
# Codex home directory inside the runner container. In chatgpt auth mode it is
# created by the install script, exported as CODEX_HOME, and used as the
# directory prefix for auth.json's chmod step.
CONTAINER_CODEX_HOME = "/root/.codex"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def run_review_ephemeral(
    settings: Settings,
    *,
    repo: str,
    pr_number: int,
    command: ParsedCommand,
) -> tuple[dict[str, Any], RepoReviewConfig]:
    """Run a PR review through ``codex exec`` inside a throwaway Docker container.

    The review prompt is prepared up front and piped to the container on
    stdin; the container's JSON-lines stdout is parsed into a review payload,
    annotated with ``_meta`` and normalized.

    Args:
        settings: Bot settings (auth mode, runner image, model, timeout).
        repo: Repository identifier passed through to the prompt/review helpers.
        pr_number: Pull request number to review.
        command: Parsed bot command that triggered the review.

    Returns:
        ``(review_result, repo_config)``. When the runner fails:
        * in ``chatgpt`` auth mode a synthetic "runner failed" result is
          returned (no fallback is attempted);
        * otherwise the result of ``run_review_for_pr`` is returned together
          with the repo config that call produced.

    Raises:
        Anything raised while preparing the prompt or loading ``auth.json``;
        those steps run before the guarded section and are not caught here.
    """
    gitea = GiteaClient(settings)
    prompt, _diff_context, repo_cfg = prepare_review_prompt(settings, gitea, repo, pr_number, command)
    container_name = f"codex-review-{uuid.uuid4().hex[:12]}"
    install_and_run = _build_install_and_run_command(settings)

    # The auth payload travels through the docker client's environment so it
    # never appears on the command line.
    extra_env: dict[str, str] = {}
    if settings.codex_auth_mode == "chatgpt":
        extra_env["CODEX_AUTH_JSON_B64"] = _load_codex_auth_json_b64(settings)

    cmd = _build_docker_command(settings, container_name=container_name, install_and_run=install_and_run)
    try:
        completed = subprocess.run(
            cmd,
            input=prompt,
            text=True,
            check=False,
            capture_output=True,
            timeout=settings.max_review_minutes * 60,
            env={**os.environ, **extra_env},
        )
        if completed.returncode != 0:
            raise RuntimeError(_format_runner_failure(completed))
        parsed = _parse_codex_exec_stdout(completed.stdout)
        parsed["_meta"] = _extract_result_meta_from_codex_stdout(completed.stdout, settings)
        return normalize_review_result(parsed), repo_cfg
    except Exception as exc:
        if isinstance(exc, subprocess.TimeoutExpired):
            # On timeout subprocess.run kills only the docker CLI process; the
            # container itself can keep running, so remove it by name.
            _force_remove_container(container_name)
        if settings.codex_auth_mode == "chatgpt":
            logger.warning("Ephemeral chatgpt runner failed, skipping API-key fallback: %s", exc)
            return _chatgpt_runner_failure_result(exc), repo_cfg
        # Record why the ephemeral path was abandoned instead of silently
        # swallowing the error before falling back.
        logger.warning("Ephemeral runner failed, falling back to run_review_for_pr: %s", exc)
        result, _repo_cfg = run_review_for_pr(settings, gitea, repo, pr_number, command)
        return result, _repo_cfg


def _force_remove_container(container_name: str) -> None:
    """Best-effort ``docker rm -f`` of a runner container; never raises."""
    try:
        subprocess.run(
            ["docker", "rm", "-f", container_name],
            check=False,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError) as cleanup_exc:
        logger.warning("Could not remove runner container %s: %s", container_name, cleanup_exc)
def _build_install_and_run_command(settings: Settings) -> str:
steps = ["set -euo pipefail"]
if settings.codex_auth_mode == "chatgpt":
steps.extend(
[
f"mkdir -p {CONTAINER_CODEX_HOME}",
'printf "%s" "$CODEX_AUTH_JSON_B64" | base64 -d > /root/.codex/auth.json',
f"chmod 600 {CONTAINER_CODEX_HOME}/auth.json",
]
)
steps.extend(
[
"apt-get update >/tmp/apt-update.log 2>&1 && apt-get install -y --no-install-recommends ca-certificates >/tmp/apt-install.log 2>&1 || { rc=$?; echo 'ca-certificates install failed'; tail -n 80 /tmp/apt-update.log || true; tail -n 80 /tmp/apt-install.log || true; exit $rc; }",
"npm install -g @openai/codex >/tmp/codex-install.log 2>&1 || { rc=$?; echo 'codex install failed'; tail -n 200 /tmp/codex-install.log || true; exit $rc; }",
]
)
model = settings.openai_review_model.strip()
if model:
steps.append(f"codex exec --skip-git-repo-check --json -m {shlex.quote(model)}")
else:
steps.append("codex exec --skip-git-repo-check --json")
return "; ".join(steps)
def _build_docker_command(settings: Settings, *, container_name: str, install_and_run: str) -> list[str]:
cmd = [
"docker",
"run",
"--rm",
"-i",
"--name",
container_name,
"-e",
"CODEX_DISABLE_TELEMETRY=1",
]
if settings.codex_auth_mode == "chatgpt":
cmd.extend(
[
"-e",
f"CODEX_HOME={CONTAINER_CODEX_HOME}",
"-e",
"CODEX_AUTH_JSON_B64",
]
)
else:
cmd.extend(
[
"-e",
"OPENAI_API_KEY",
"-e",
"OPENAI_ORG_ID",
"-e",
"OPENAI_PROJECT_ID",
]
)
cmd.extend([settings.review_runner_image, "bash", "-lc", install_and_run])
return cmd
def _chatgpt_runner_failure_result(exc: Exception) -> dict[str, Any]:
message = str(exc).strip() or exc.__class__.__name__
summary = f"ChatGPT auth runner failed before review execution. Error: {message}"
return {
"verdict": "has_issues",
"confidence": 0.6,
"summary": summary,
"findings": [
{
"severity": "high",
"file": "runner",
"line_start": 1,
"line_end": 1,
"title": "Ephemeral chatgpt review runner failed",
"body": message,
"suggestion": "Check ephemeral runner logs for model/auth/network issues, then rerun @codex review.",
}
],
}
def _format_runner_failure(completed: subprocess.CompletedProcess[str]) -> str:
    """Describe a failed runner process in a single line.

    Args:
        completed: The finished process; its return code, stdout and stderr
            are read.

    Returns:
        The exit code, followed by ``stdout_tail=...`` and ``stderr_tail=...``
        segments for whichever streams are non-empty after compaction, joined
        with ``"; "``.
    """
    segments = [f"ephemeral runner exited with code {completed.returncode}"]
    for label, stream in (("stdout_tail", completed.stdout), ("stderr_tail", completed.stderr)):
        tail = _tail_text(stream)
        if tail:
            segments.append(f"{label}={tail}")
    return "; ".join(segments)
def _tail_text(text: str, limit: int = 1200) -> str:
compact = " ".join(text.split())
if len(compact) <= limit:
return compact
return f"...{compact[-limit:]}"
def _resolve_codex_auth_json_path(settings: Settings) -> Path:
raw_path = settings.codex_auth_json_path.strip() if settings.codex_auth_json_path else "~/.codex/auth.json"
path = Path(raw_path).expanduser()
if not path.exists() or not path.is_file():
raise FileNotFoundError(
f"CODEX_AUTH_MODE=chatgpt requires a readable auth.json file. Checked path: {path}"
)
return path.resolve()
def _load_codex_auth_json_b64(settings: Settings) -> str:
    """Read the host ``auth.json`` and return it base64-encoded.

    Args:
        settings: Bot settings used to locate the file.

    Returns:
        ASCII base64 text of the file's UTF-8 content.

    Raises:
        FileNotFoundError: Propagated from the path lookup.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    raw_json = _resolve_codex_auth_json_path(settings).read_text(encoding="utf-8")
    # Parsed only as a sanity check so a malformed file fails here rather than
    # inside the ephemeral runner; the parsed value itself is discarded.
    json.loads(raw_json)
    encoded = base64.b64encode(raw_json.encode("utf-8"))
    return encoded.decode("ascii")
def ensure_workdir(path: str) -> Path:
    """Create *path* (including missing parents) if needed and return it as a ``Path``.

    An already existing directory is not an error.
    """
    workdir = Path(path)
    workdir.mkdir(exist_ok=True, parents=True)
    return workdir
def _parse_codex_exec_stdout(stdout: str) -> dict[str, Any]:
    """Find the review payload in the JSON-lines output of ``codex exec``.

    Each non-blank line that parses as JSON is inspected in order. A line that
    is itself a dict with ``verdict``, ``summary`` and ``findings`` wins
    immediately; otherwise text is extracted from the event and searched for
    an embedded review JSON.

    Args:
        stdout: Captured standard output of the runner.

    Returns:
        The first review payload found.

    Raises:
        RuntimeError: If no line yields a review payload. The message includes
            a tail of the last extracted text when any text was seen.
    """
    review_keys = {"verdict", "summary", "findings"}
    latest_text: str | None = None

    for raw_line in stdout.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        try:
            event = json.loads(stripped)
        except json.JSONDecodeError:
            # Non-JSON lines are skipped.
            continue

        if isinstance(event, dict) and review_keys.issubset(event.keys()):
            return event

        text = _extract_text(event)
        if not text:
            continue
        latest_text = text
        review = _parse_review_json_from_text(text)
        if review:
            return review

    if latest_text:
        raise RuntimeError(
            f"codex exec output text did not contain review JSON; text_tail={_tail_text(latest_text, 400)}"
        )
    raise RuntimeError("codex exec output did not include parseable review payload text")
def _extract_result_meta_from_codex_stdout(stdout: str, settings: Settings) -> dict[str, Any]:
    """Collect model and token-usage metadata from ``codex exec`` JSON lines.

    Args:
        stdout: Captured standard output of the runner.
        settings: Bot settings; ``openai_review_model`` is the model reported
            when no event carries a ``model`` string.

    Returns:
        ``{"source": "ephemeral_runner", "model": ..., "usage": {...}}``.
        Later events override earlier ones, both for the model and for each
        individual integer token counter.
    """
    counters = ("input_tokens", "output_tokens", "total_tokens")
    reported_model = settings.openai_review_model
    token_usage: dict[str, int] = {}

    for raw_line in stdout.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        try:
            event = json.loads(stripped)
        except json.JSONDecodeError:
            continue

        reported_model = _find_first_string_for_key(event, "model") or reported_model

        usage_block = _find_first_dict_for_key(event, "usage")
        if not isinstance(usage_block, dict):
            continue
        for counter in counters:
            amount = usage_block.get(counter)
            if isinstance(amount, int):
                token_usage[counter] = amount

    return {"source": "ephemeral_runner", "model": reported_model, "usage": token_usage}
def _find_first_string_for_key(payload: Any, key: str) -> str | None:
if isinstance(payload, dict):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value
for nested in payload.values():
found = _find_first_string_for_key(nested, key)
if found:
return found
if isinstance(payload, list):
for item in payload:
found = _find_first_string_for_key(item, key)
if found:
return found
return None
def _find_first_dict_for_key(payload: Any, key: str) -> dict[str, Any] | None:
if isinstance(payload, dict):
value = payload.get(key)
if isinstance(value, dict):
return value
for nested in payload.values():
found = _find_first_dict_for_key(nested, key)
if found:
return found
if isinstance(payload, list):
for item in payload:
found = _find_first_dict_for_key(item, key)
if found:
return found
return None
def _parse_review_json_from_text(text: str) -> dict[str, Any] | None:
candidates: list[str] = [text.strip()]
fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, flags=re.DOTALL | re.IGNORECASE)
if fenced:
candidates.append(fenced.group(1).strip())
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and end > start:
candidates.append(text[start : end + 1].strip())
seen: set[str] = set()
for candidate in candidates:
if not candidate or candidate in seen:
continue
seen.add(candidate)
try:
payload = json.loads(candidate)
except json.JSONDecodeError:
continue
if isinstance(payload, dict) and {"verdict", "summary", "findings"}.issubset(payload.keys()):
return payload
return None
def _extract_text(payload: Any) -> str | None:
if isinstance(payload, str):
return payload
if isinstance(payload, dict):
for key in ("text", "message", "content", "output"):
value = payload.get(key)
text = _extract_text(value)
if text:
return text
for value in payload.values():
if not isinstance(value, (dict, list)):
continue
text = _extract_text(value)
if text:
return text
if isinstance(payload, list):
for item in payload:
text = _extract_text(item)
if text:
return text
return None