136 lines
4.9 KiB
Python
136 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
import httpx
|
|
|
|
from gitea_codex_bot.config import Settings
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class PullRequestContext:
|
|
repo: str
|
|
pr_number: int
|
|
base_ref: str
|
|
base_sha: str
|
|
head_ref: str
|
|
head_sha: str
|
|
clone_url: str
|
|
html_url: str
|
|
is_fork: bool
|
|
base_clone_url: str | None = None
|
|
head_clone_url: str | None = None
|
|
|
|
|
|
class GiteaClient:
    """Small synchronous client for the Gitea REST API (``/api/v1``).

    Covers the endpoints used for pull requests, issue comments and file
    contents. Every call opens a short-lived ``httpx.Client`` and sends the
    token-based ``Authorization`` header built in ``__init__``.
    """

    # Per-request timeout, in seconds, handed to ``httpx.Client``.
    _TIMEOUT_SECONDS = 20.0

    def __init__(self, settings: Settings) -> None:
        """Store ``settings`` and precompute the base URL and headers."""
        self.settings = settings
        # Strip trailing slashes so that joining with "/api/v1/..." never
        # produces a double slash in the request URL.
        self.base_url = str(settings.gitea_base_url).rstrip("/")
        self.headers = {
            "Authorization": f"token {settings.gitea_token.get_secret_value()}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, *, json_body: dict[str, Any] | None = None) -> Any:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: Path (and optional query string) appended to ``base_url``.
            json_body: Optional JSON payload.

        Returns:
            The parsed JSON response, or ``None`` when the response has no
            body (status 204 or empty content).

        Raises:
            httpx.HTTPStatusError: For any 4xx/5xx response.
        """
        with httpx.Client(timeout=self._TIMEOUT_SECONDS) as client:
            response = client.request(
                method,
                f"{self.base_url}{path}",
                headers=self.headers,
                json=json_body,
            )
            response.raise_for_status()
            # An empty body cannot be JSON-decoded; treat it like 204.
            if response.status_code == 204 or not response.content:
                return None
            return response.json()

    @staticmethod
    def split_repo(repo: str) -> tuple[str, str]:
        """Split ``"owner/name"`` at the first slash into ``(owner, name)``.

        Raises:
            ValueError: If ``repo`` has no slash, or the owner or name part
                is empty.
        """
        owner, separator, name = repo.partition("/")
        if not separator or not owner or not name:
            raise ValueError(f"repo must be in 'owner/name' form, got {repo!r}")
        return owner, name

    def _repo_path(self, repo: str) -> str:
        """Return the ``/api/v1/repos/{owner}/{name}`` prefix for ``repo``."""
        owner, name = self.split_repo(repo)
        return f"/api/v1/repos/{quote(owner, safe='')}/{quote(name, safe='')}"

    def get_pull_request(self, repo: str, pr_number: int) -> PullRequestContext:
        """Fetch pull request ``pr_number`` of ``repo`` as a ``PullRequestContext``.

        ``clone_url`` is set to the head repository's clone URL, and
        ``is_fork`` is true when the head and base repositories have
        different ``full_name`` values.
        """
        payload = self._request("GET", f"{self._repo_path(repo)}/pulls/{pr_number}")
        base = payload["base"]
        head = payload["head"]
        # NOTE(review): assumes both "repo" objects are present in the
        # response; confirm how the API reports a deleted head repository.
        base_repo = base["repo"]
        head_repo = head["repo"]
        head_clone_url = head_repo["clone_url"]
        return PullRequestContext(
            repo=repo,
            pr_number=pr_number,
            base_ref=base["ref"],
            base_sha=base["sha"],
            head_ref=head["ref"],
            head_sha=head["sha"],
            clone_url=head_clone_url,
            base_clone_url=base_repo["clone_url"],
            head_clone_url=head_clone_url,
            html_url=payload["html_url"],
            is_fork=bool(head_repo["full_name"] != base_repo["full_name"]),
        )

    def post_issue_comment(self, repo: str, pr_number: int, body: str) -> int:
        """Create a comment on issue/PR ``pr_number`` and return its id."""
        payload = self._request(
            "POST",
            f"{self._repo_path(repo)}/issues/{pr_number}/comments",
            json_body={"body": body},
        )
        return int(payload["id"])

    def edit_issue_comment(self, repo: str, comment_id: int, body: str) -> int:
        """Replace the body of comment ``comment_id`` and return its id."""
        payload = self._request(
            "PATCH",
            f"{self._repo_path(repo)}/issues/comments/{comment_id}",
            json_body={"body": body},
        )
        return int(payload["id"])

    def get_issue_comment(self, repo: str, comment_id: int) -> dict[str, Any]:
        """Fetch comment ``comment_id`` and return it as a new dict."""
        payload = self._request(
            "GET",
            f"{self._repo_path(repo)}/issues/comments/{comment_id}",
        )
        return dict(payload)

    def list_issue_comments(self, repo: str, pr_number: int) -> list[dict[str, Any]]:
        """Return the comments on issue/PR ``pr_number`` as a new list."""
        payload = self._request("GET", f"{self._repo_path(repo)}/issues/{pr_number}/comments")
        return list(payload)

    def get_file_content(self, repo: str, path: str, *, ref: str) -> str | None:
        """Return the text of ``path`` at ``ref``, or ``None`` if unavailable.

        Args:
            repo: Repository in ``"owner/name"`` form.
            path: File path inside the repository.
            ref: Branch, tag or commit to read from.

        Returns:
            The file decoded as UTF-8 (undecodable bytes are dropped), or
            ``None`` when the request returns 404, when the response is not
            a single-file object (e.g. a list), or when the content is not
            base64-encoded text.

        Raises:
            httpx.HTTPStatusError: For HTTP errors other than 404.
        """
        # Keep "/" literal: percent-encoded slashes (%2F) are rejected by
        # some reverse proxies, which would surface here as a silent None.
        encoded_path = quote(path.lstrip("/"), safe="/")
        url = f"{self._repo_path(repo)}/contents/{encoded_path}?ref={quote(ref, safe='')}"
        try:
            payload = self._request("GET", url)
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404:
                return None
            raise
        # Only a JSON object can carry file content; anything else (such as
        # a list) is treated as "no content" instead of crashing on .get().
        if not isinstance(payload, dict):
            return None
        content = payload.get("content")
        encoding = payload.get("encoding")
        if not isinstance(content, str) or encoding != "base64":
            return None
        decoded = base64.b64decode(content.encode("ascii"))
        return decoded.decode("utf-8", errors="ignore")
|