72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
from datetime import UTC, date, datetime
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
class GiteaSourceError(RuntimeError):
|
|
pass
|
|
|
|
|
|
def _parse_gitea_date(raw: Any) -> str | None:
|
|
if isinstance(raw, int):
|
|
return datetime.fromtimestamp(raw, tz=UTC).date().isoformat()
|
|
if isinstance(raw, float):
|
|
return datetime.fromtimestamp(int(raw), tz=UTC).date().isoformat()
|
|
if isinstance(raw, str):
|
|
try:
|
|
if len(raw) == 10 and raw[4] == "-" and raw[7] == "-":
|
|
return raw
|
|
if raw.isdigit():
|
|
return datetime.fromtimestamp(int(raw), tz=UTC).date().isoformat()
|
|
return datetime.fromisoformat(raw.replace("Z", "+00:00")).date().isoformat()
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
async def fetch_gitea_activity(
|
|
base_url: str,
|
|
username: str,
|
|
token: str | None,
|
|
from_date: date,
|
|
to_date: date,
|
|
timeout_seconds: float = 20.0,
|
|
) -> dict[str, int]:
|
|
endpoint = f"{base_url.rstrip('/')}/api/v1/users/{username}/heatmap"
|
|
headers: dict[str, str] = {"Accept": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"token {token}"
|
|
|
|
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
|
|
response = await client.get(endpoint, headers=headers)
|
|
|
|
if response.status_code >= 400:
|
|
raise GiteaSourceError(f"Gitea heatmap request failed with status {response.status_code}")
|
|
|
|
payload = response.json()
|
|
if not isinstance(payload, list):
|
|
raise GiteaSourceError("Unexpected Gitea heatmap payload format")
|
|
|
|
normalized: dict[str, int] = {}
|
|
for item in payload:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
|
|
date_key = _parse_gitea_date(item.get("date") or item.get("timestamp") or item.get("day"))
|
|
if not date_key:
|
|
continue
|
|
|
|
count = item.get("contributions")
|
|
if count is None:
|
|
count = item.get("count", 0)
|
|
try:
|
|
count_int = int(count)
|
|
except (TypeError, ValueError):
|
|
count_int = 0
|
|
|
|
if from_date.isoformat() <= date_key <= to_date.isoformat():
|
|
normalized[date_key] = normalized.get(date_key, 0) + count_int
|
|
|
|
return normalized
|