from __future__ import annotations
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from fastapi import Depends, FastAPI, Header, HTTPException, Request, status
from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import HTMLResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from gitea_codex_bot.config import Settings, get_settings
from gitea_codex_bot.db import get_session
from gitea_codex_bot.models import JobStatus, ReviewJob
from gitea_codex_bot.services.commands import detect_prefixed_command, parse_command
from gitea_codex_bot.services.gitea import GiteaClient
from gitea_codex_bot.services.jobs import cooldown_remaining_seconds, enqueue_job, persist_webhook_event
from gitea_codex_bot.services.repo_config import RepoReviewConfig, parse_repo_review_config_text
from gitea_codex_bot.services.review_format import (
format_cooldown_ack,
format_disabled_ack,
format_queue_ack,
format_unsupported_ack,
)
from gitea_codex_bot.services.security import verify_gitea_signature
from gitea_codex_bot.workers.dispatcher import worker_loop
# Configure logging at import time: INFO level, with timestamp, level and logger name on each record.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# Module-level logger used by the helpers and route handlers in this file.
logger = logging.getLogger(__name__)
def _validate_required_env(settings: Settings) -> None:
webhook_secret = settings.gitea_webhook_secret.get_secret_value()
if not webhook_secret.strip():
raise RuntimeError("GITEA_WEBHOOK_SECRET is required")
gitea_token = settings.gitea_token.get_secret_value()
if not gitea_token.strip():
raise RuntimeError("GITEA_TOKEN is required")
if not settings.allowed_repos.strip():
raise RuntimeError("ALLOWED_REPOS is required")
if settings.codex_auth_mode != "api_key":
return
api_key = settings.openai_api_key.get_secret_value() if settings.openai_api_key else ""
if not api_key.strip():
raise RuntimeError("OPENAI_API_KEY is required")
def _configured_auth_json_path(settings: Settings) -> Path:
raw_path = settings.codex_auth_json_path.strip() if settings.codex_auth_json_path else "~/.codex/auth.json"
return Path(raw_path).expanduser()
def _log_startup_identity(settings: Settings) -> None:
    """Log the bot username, Gitea base URL and Codex auth mode at INFO level."""
    username = settings.gitea_bot_username
    base_url = settings.gitea_base_url
    auth_mode = settings.codex_auth_mode
    logger.info(
        "Bot startup identity: username=%s gitea_base_url=%s auth_mode=%s",
        username,
        base_url,
        auth_mode,
    )
def _log_startup_auth_json_status(settings: Settings) -> None:
    """Log whether the Codex ``auth.json`` file is present and parseable.

    This is diagnostics only: every failure mode is reported as a warning and
    the function returns normally. When ``codex_auth_mode`` is not
    ``"chatgpt"`` the file is not inspected at all.
    """
    if settings.codex_auth_mode != "chatgpt":
        logger.info("Codex auth configuration: mode=api_key (auth.json not used)")
        return

    auth_path = _configured_auth_json_path(settings)
    try:
        content = auth_path.read_text(encoding="utf-8")
        parsed = json.loads(content)
    except FileNotFoundError:
        logger.warning("Codex auth configuration: mode=chatgpt auth.json missing path=%s", auth_path)
        return
    except json.JSONDecodeError as exc:
        logger.warning("Codex auth configuration: mode=chatgpt invalid auth.json path=%s error=%s", auth_path, exc.msg)
        return
    except UnicodeDecodeError as exc:
        # read_text() raises this for non-UTF-8 bytes; it is a ValueError, not
        # an OSError, so without this clause it would abort application startup.
        logger.warning("Codex auth configuration: mode=chatgpt invalid auth.json path=%s error=%s", auth_path, exc.reason)
        return
    except OSError as exc:
        logger.warning("Codex auth configuration: mode=chatgpt auth.json unreadable path=%s error=%s", auth_path, exc)
        return

    root_type = type(parsed).__name__
    # Only a JSON object can carry an "auth_mode" key; any other root type is reported as unknown.
    configured_mode = parsed.get("auth_mode") if isinstance(parsed, dict) else None
    logger.info(
        "Codex auth configuration: mode=chatgpt auth.json valid path=%s root_type=%s auth_mode=%s",
        auth_path,
        root_type,
        configured_mode or "unknown",
    )
def _extract_pr_event(payload: dict[str, Any], event_name: str) -> tuple[str, int, str, int, str] | None:
repository = payload.get("repository", {})
repo = repository.get("full_name")
if not repo:
return None
sender = payload.get("sender", {})
sender_username = sender.get("username", "")
comment = payload.get("comment", {})
comment_id = int(comment.get("id", 0) or 0)
if comment_id <= 0:
return None
if event_name == "issue_comment":
issue = payload.get("issue", {})
if not issue.get("pull_request"):
return None
pr_number = int(issue.get("number", 0) or 0)
head_sha = payload.get("pull_request", {}).get("head", {}).get("sha", "")
elif event_name == "pull_request_comment":
pull_request = payload.get("pull_request", {})
if not pull_request:
return None
pr_number = int(pull_request.get("number", 0) or 0)
head_sha = pull_request.get("head", {}).get("sha", "")
else:
return None
if pr_number <= 0:
return None
if not head_sha:
head_sha = "unknown"
return repo, pr_number, head_sha, comment_id, sender_username
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: validate config, then run the worker loop alongside the app.

    On startup the settings are validated and startup diagnostics are logged,
    after which ``worker_loop`` is started as an asyncio task. The stop event
    and the task are stored on ``app.state``. On shutdown the stop event is
    set and the task is awaited.
    """
    cfg = get_settings()
    _validate_required_env(cfg)
    _log_startup_identity(cfg)
    _log_startup_auth_json_status(cfg)

    shutdown_signal = asyncio.Event()
    worker = asyncio.create_task(worker_loop(cfg, shutdown_signal))
    app.state.worker_stop_event, app.state.worker_task = shutdown_signal, worker

    try:
        yield
    finally:
        # Signal the worker to stop, then wait until it has actually finished.
        shutdown_signal.set()
        await worker
# ASGI application object; `lifespan` runs the startup and shutdown logic around it.
app = FastAPI(title="Gitea Codex Review Bot", lifespan=lifespan)
def _load_repo_review_config_for_pr(gitea: GiteaClient, repo: str, pr_number: int) -> tuple[RepoReviewConfig, str]:
    """Load ``.codex-review.yml`` from the pull request's head commit.

    Returns:
        ``(config, head_sha)``. When the file lookup yields ``None`` the
        config is a ``RepoReviewConfig(configured=False)``; otherwise it is
        the parsed file content with ``configured=True``.
    """
    sha = gitea.get_pull_request(repo, pr_number).head_sha
    raw_config = gitea.get_file_content(repo, ".codex-review.yml", ref=sha)
    if raw_config is not None:
        return parse_repo_review_config_text(raw_config, configured=True), sha
    return RepoReviewConfig(configured=False), sha
def _resolve_pr_head_sha(gitea: GiteaClient, repo: str, pr_number: int, fallback: str) -> str:
    """Return the pull request's head SHA as reported by Gitea, or *fallback*.

    The lookup is best-effort: any exception from the client is logged with
    its traceback and *fallback* is returned, so callers never see the error.
    """
    try:
        return gitea.get_pull_request(repo, pr_number).head_sha
    except Exception:
        # Deliberately broad: a failed lookup must not abort the caller, but
        # it should leave a trace instead of vanishing silently.
        logger.warning(
            "Failed to resolve PR head SHA; using fallback repo=%s pr=%s fallback=%s",
            repo,
            pr_number,
            fallback,
            exc_info=True,
        )
        return fallback
def _render_landing_page() -> str:
return """
Gitea Codex Review Bot
Webhook Service
Gitea Codex Review Bot
This endpoint powers automated pull request review workflows for Gitea. It validates signed webhook events, queues review jobs, and posts structured feedback back to pull requests.
Webhook: POST /webhook/gitea
Health Check
Loading...
"""
def _render_browser_404_page() -> str:
return """
Not Found
Error 404
Page not found
This service exposes only a small set of routes. Head back to the home page for a quick overview.
Go to home
"""
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request: Request, exc: StarletteHTTPException):
    """Serve the browser 404 page for HTML clients; defer everything else.

    A 404 whose ``Accept`` header mentions ``text/html`` (case-insensitive)
    gets the rendered not-found page. All other HTTP errors go through
    FastAPI's default ``http_exception_handler``.
    """
    is_not_found = exc.status_code == status.HTTP_404_NOT_FOUND
    if is_not_found and "text/html" in request.headers.get("accept", "").lower():
        return HTMLResponse(content=_render_browser_404_page(), status_code=status.HTTP_404_NOT_FOUND)
    return await http_exception_handler(request, exc)
@app.get("/", response_class=HTMLResponse)
def root() -> str:
    """Serve the landing page at the service root."""
    landing_page = _render_landing_page()
    return landing_page
@app.get("/healthz")
def healthz(settings: Settings = Depends(get_settings)) -> dict[str, str]:
    """Liveness probe: always answers ``{"status": "ok"}``."""
    # The attribute is read but its value is discarded; only the access itself happens here.
    _ = settings.gitea_base_url
    response: dict[str, str] = {"status": "ok"}
    return response
@app.get("/healthz/latest-failure")
def healthz_latest_failure(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Report the most recently created job whose status is ``failed``.

    Returns ``has_failed_job: False`` when no such job exists; otherwise the
    job's identifying fields, its last error and its finish timestamp.
    """
    query = (
        select(ReviewJob)
        .where(ReviewJob.status == JobStatus.failed)
        # Newest first by creation time, with the id as tie-breaker.
        .order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc())
        .limit(1)
    )
    job = session.execute(query).scalar_one_or_none()
    if not job:
        return {"status": "ok", "has_failed_job": False}

    finished = job.finished_at
    return {
        "status": "ok",
        "has_failed_job": True,
        "job_id": job.id,
        "repo": job.repo,
        "pr_number": job.pr_number,
        "command": job.command,
        "head_sha": job.head_sha,
        "error": job.last_error or "",
        "failed_at": finished.isoformat() if finished else None,
    }
@app.get("/healthz/latest-job")
def healthz_latest_job(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Report the most recently created job, whatever its status.

    Returns ``has_job: False`` when no job exists; otherwise the job's
    identifying fields, status, last error, result summary and timestamps.
    """
    query = select(ReviewJob).order_by(ReviewJob.created_at.desc(), ReviewJob.id.desc()).limit(1)
    job = session.execute(query).scalar_one_or_none()
    if not job:
        return {"status": "ok", "has_job": False}

    # The summary is only surfaced when result_json is a dict holding a string "summary".
    raw_result = job.result_json
    summary = raw_result.get("summary") if isinstance(raw_result, dict) else None
    result_summary = summary if isinstance(summary, str) else ""

    # Use the status' `.value` when it has one; otherwise fall back to its string form.
    status_obj = job.status
    job_status = status_obj.value if hasattr(status_obj, "value") else str(status_obj)

    def _iso(moment: Any) -> str | None:
        """Format a timestamp as ISO 8601, keeping missing values as None."""
        return moment.isoformat() if moment else None

    return {
        "status": "ok",
        "has_job": True,
        "job_id": job.id,
        "repo": job.repo,
        "pr_number": job.pr_number,
        "command": job.command,
        "head_sha": job.head_sha,
        "job_status": job_status,
        "error": job.last_error or "",
        "result_summary": result_summary,
        "created_at": _iso(job.created_at),
        "started_at": _iso(job.started_at),
        "finished_at": _iso(job.finished_at),
    }
@app.post("/webhook/gitea")
async def gitea_webhook(
    request: Request,
    x_gitea_event: str | None = Header(default=None),
    x_gitea_delivery: str | None = Header(default=None),
    x_gitea_signature: str | None = Header(default=None),
    session: Session = Depends(get_session),
    settings: Settings = Depends(get_settings),
) -> dict[str, Any]:
    """Handle a Gitea webhook delivery and queue work for ``@codex`` PR comments.

    Processing order:

    1. Verify the signature header against the raw request body.
    2. Ignore events other than ``issue_comment`` / ``pull_request_comment``.
    3. Extract the PR coordinates; ignore the bot's own comments and
       repositories outside the allowed set.
    4. Parse the comment for a command. Unrecognised prefixed commands get an
       explanatory PR comment; comments with no command are ignored.
    5. Persist the event (duplicates are acknowledged and dropped).
    6. For ``review`` / ``rerun``: resolve the head SHA, honour the repo
       config's ``enabled`` flag, apply the cooldown (``review`` only),
       enqueue the job and post a queue acknowledgement.
       For ``explain`` / ``ignore`` / ``help``: enqueue without an
       acknowledgement. Any other parsed command gets an "unsupported" reply.

    Returns:
        A dict with an ``accepted`` flag plus either a ``reason`` or, for
        queued jobs, ``job_id`` and ``status``.

    Raises:
        HTTPException: 401 when the signature does not verify; 400 when the
            body is not a JSON object.
    """
    # NOTE(review): the GiteaClient and session calls below are made without
    # `await` inside an async handler; if they perform blocking I/O they stall
    # the event loop while they run — confirm and consider a threadpool.
    payload_bytes = await request.body()
    if not verify_gitea_signature(payload_bytes, settings.gitea_webhook_secret.get_secret_value(), x_gitea_signature):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid signature")

    event_name = (x_gitea_event or "").strip()
    if event_name not in {"issue_comment", "pull_request_comment"}:
        return {"accepted": False, "reason": "event ignored"}

    # A correctly signed but malformed body is a client error, not a server fault.
    try:
        payload = await request.json()
    except ValueError as exc:  # covers json.JSONDecodeError and UnicodeDecodeError
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid JSON payload")

    extracted = _extract_pr_event(payload, event_name)
    if not extracted:
        return {"accepted": False, "reason": "not a pull request comment"}
    repo, pr_number, head_sha, comment_id, sender_username = extracted

    # Comments authored by the bot account itself are never treated as commands.
    if sender_username == settings.gitea_bot_username:
        return {"accepted": False, "reason": "bot comment ignored"}

    if repo not in settings.allowed_repo_set:
        logger.info(
            "Webhook ignored: repo not in ALLOWED_REPOS repo=%s pr=%s comment_id=%s sender=%s",
            repo,
            pr_number,
            comment_id,
            sender_username,
        )
        return {"accepted": False, "reason": "repo not allowed"}

    comment_body = str(payload.get("comment", {}).get("body", "")).strip()
    parsed_command = parse_command(comment_body, aliases=settings.bot_command_aliases)
    if not parsed_command:
        # Tell the author when the comment looked like a command that is not recognised.
        attempted_command = detect_prefixed_command(comment_body, aliases=settings.bot_command_aliases)
        if attempted_command:
            gitea = GiteaClient(settings)
            if attempted_command == "fix":
                gitea.post_issue_comment(repo, pr_number, "⚠️ `@codex fix` is no longer supported on this bot.")
                return {"accepted": False, "reason": "unsupported command", "command": attempted_command}
            gitea.post_issue_comment(
                repo,
                pr_number,
                f"⚠️ Command `@codex {attempted_command}` is not supported. Try `@codex -h`.",
            )
            return {"accepted": False, "reason": "unsupported command", "command": attempted_command}
        logger.info(
            "Webhook ignored: no @codex review command repo=%s pr=%s comment_id=%s sender=%s",
            repo,
            pr_number,
            comment_id,
            sender_username,
        )
        return {"accepted": False, "reason": "no codex command"}

    if parsed_command.name != "review":
        logger.info(
            "Webhook without @codex review command repo=%s pr=%s comment_id=%s sender=%s parsed_command=%s",
            repo,
            pr_number,
            comment_id,
            sender_username,
            parsed_command.name,
        )

    # A falsy result is reported to the sender as a duplicate delivery.
    inserted = persist_webhook_event(
        session,
        delivery_id=x_gitea_delivery,
        event_name=event_name,
        repo=repo,
        comment_id=comment_id,
        payload=payload_bytes,
    )
    if not inserted:
        return {"accepted": True, "reason": "duplicate event"}

    gitea = GiteaClient(settings)
    if parsed_command.name in {"review", "rerun"}:
        head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha)
        repo_cfg: RepoReviewConfig | None = None
        try:
            repo_cfg, resolved_head_sha = _load_repo_review_config_for_pr(gitea, repo, pr_number)
            head_sha = resolved_head_sha
        except Exception:
            # Best-effort: carry on without a repo config, but record the
            # failure, because it means an `enabled: false` setting cannot be honoured.
            logger.warning(
                "Failed to load .codex-review.yml; continuing without repo config repo=%s pr=%s",
                repo,
                pr_number,
                exc_info=True,
            )
            repo_cfg = None
            if head_sha == "unknown":
                head_sha = _resolve_pr_head_sha(gitea, repo, pr_number, head_sha)

        if repo_cfg and not repo_cfg.enabled:
            gitea.post_issue_comment(repo, pr_number, format_disabled_ack())
            return {"accepted": True, "reason": "review disabled by repo config"}

        # `rerun` bypasses the cooldown; every other command in this branch is subject to it.
        if parsed_command.name != "rerun":
            remaining = cooldown_remaining_seconds(session, repo, pr_number, settings.cooldown_seconds)
            if remaining > 0:
                gitea.post_issue_comment(repo, pr_number, format_cooldown_ack(remaining))
                return {"accepted": True, "reason": "cooldown active", "cooldown_seconds_remaining": remaining}

        job = enqueue_job(
            session,
            repo=repo,
            pr_number=pr_number,
            head_sha=head_sha,
            trigger_comment_id=comment_id,
            trigger_comment_body=comment_body,
            requested_by=sender_username,
            command=parsed_command,
        )
        gitea.post_issue_comment(repo, pr_number, format_queue_ack(head_sha))
        return {"accepted": True, "job_id": job.id, "status": "queued"}

    if parsed_command.name in {"explain", "ignore", "help"}:
        # These commands are queued without posting an acknowledgement comment.
        job = enqueue_job(
            session,
            repo=repo,
            pr_number=pr_number,
            head_sha=head_sha,
            trigger_comment_id=comment_id,
            trigger_comment_body=comment_body,
            requested_by=sender_username,
            command=parsed_command,
        )
        return {"accepted": True, "job_id": job.id, "status": "queued"}

    gitea.post_issue_comment(repo, pr_number, format_unsupported_ack(parsed_command))
    return {"accepted": False, "reason": "unsupported command"}