first commit

This commit is contained in:
Space-Banane
2026-05-29 19:15:00 +02:00
commit a54d1cfeaf
25 changed files with 1183 additions and 0 deletions

0
app/__init__.py Normal file
View File

63
app/cache.py Normal file
View File

@@ -0,0 +1,63 @@
import json
import os
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class CacheEntry:
value: Any
stale: bool
class FileCache:
def __init__(self, cache_dir: str, default_ttl_seconds: int) -> None:
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.default_ttl_seconds = default_ttl_seconds
def _path_for(self, key: str) -> Path:
safe_key = "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in key)
return self.cache_dir / f"{safe_key}.json"
def _read_raw(self, key: str) -> dict[str, Any] | None:
path = self._path_for(key)
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
def get_json(self, key: str, allow_stale: bool = False) -> CacheEntry | None:
payload = self._read_raw(key)
if payload is None:
return None
expires_at = float(payload.get("expires_at", 0))
value = payload.get("value")
stale = time.time() > expires_at
if stale and not allow_stale:
return None
return CacheEntry(value=value, stale=stale)
def set_json(self, key: str, value: Any, ttl_seconds: int | None = None) -> None:
ttl = ttl_seconds if ttl_seconds is not None else self.default_ttl_seconds
payload = {
"expires_at": time.time() + max(ttl, 0),
"value": value,
}
self._atomic_write_json(self._path_for(key), payload)
def _atomic_write_json(self, path: Path, payload: dict[str, Any]) -> None:
fd, tmp_name = tempfile.mkstemp(prefix="cache_", suffix=".tmp", dir=str(self.cache_dir))
try:
with os.fdopen(fd, "w", encoding="utf-8") as tmp_file:
json.dump(payload, tmp_file, separators=(",", ":"))
os.replace(tmp_name, path)
finally:
if os.path.exists(tmp_name):
os.remove(tmp_name)

268
app/main.py Normal file
View File

@@ -0,0 +1,268 @@
import hashlib
import json
import logging
import os
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from typing import Literal
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse, Response
from app.cache import FileCache
from app.merge import filter_activity_source, merge_activity
from app.render.png import render_png_from_svg
from app.render.svg import render_activity_svg
from app.settings import Settings, get_settings
from app.sources.gitea import GiteaSourceError, fetch_gitea_activity
from app.sources.github import GitHubSourceError, fetch_github_activity
logger = logging.getLogger(__name__)
SourceType = Literal["all", "github", "gitea"]
ThemeType = Literal["light", "dark"]
@dataclass
class QueryOptions:
year: int | None
days: int
theme: ThemeType
source: SourceType
@dataclass
class ActivityResult:
merged: dict[str, dict[str, int]]
stale: bool
from_date: date
to_date: date
days_count: int
app = FastAPI(title=os.getenv("SERVICE_TITLE", "git-activity-merge"))
def get_cache(settings: Settings = Depends(get_settings)) -> FileCache:
return FileCache(cache_dir=settings.cache_dir, default_ttl_seconds=settings.cache_ttl_seconds)
def get_query_options(
settings: Settings = Depends(get_settings),
year: int | None = Query(default=None, ge=1970, le=2100),
days: int | None = Query(default=None, ge=1, le=5000),
theme: ThemeType | None = Query(default=None),
source: SourceType = Query(default="all"),
) -> QueryOptions:
if year is not None and days is not None:
raise HTTPException(status_code=400, detail="Provide either year or days, not both")
selected_theme = theme or ("dark" if settings.default_theme == "dark" else "light")
selected_days = days if days is not None else 365
return QueryOptions(year=year, days=selected_days, theme=selected_theme, source=source)
def compute_date_range(options: QueryOptions) -> tuple[date, date, int]:
today = datetime.now(tz=UTC).date()
if options.year is not None:
from_date = date(options.year, 1, 1)
to_date = date(options.year, 12, 31)
if options.year == today.year:
to_date = today
days_count = (to_date - from_date).days + 1
return from_date, to_date, days_count
to_date = today
from_date = to_date - timedelta(days=options.days - 1)
return from_date, to_date, options.days
def _date_keys(from_date: date, to_date: date) -> list[str]:
keys: list[str] = []
current = from_date
while current <= to_date:
keys.append(current.isoformat())
current += timedelta(days=1)
return keys
async def _fetch_with_cache(
cache: FileCache,
key: str,
fetcher,
) -> tuple[dict[str, int], bool]:
cached = cache.get_json(key, allow_stale=True)
if cached and not cached.stale:
return {k: int(v) for k, v in cached.value.items()}, False
try:
fresh = await fetcher()
cache.set_json(key, fresh)
return fresh, False
except (GitHubSourceError, GiteaSourceError, RuntimeError, ValueError) as exc:
if cached is not None:
logger.warning("source fetch failed for %s, serving stale cache: %s", key, exc)
return {k: int(v) for k, v in cached.value.items()}, True
raise
async def collect_merged_activity(
settings: Settings,
cache: FileCache,
options: QueryOptions,
) -> ActivityResult:
from_date, to_date, days_count = compute_date_range(options)
gh_key = f"github_{settings.github_username}_{from_date}_{to_date}"
gt_key = f"gitea_{settings.gitea_username}_{from_date}_{to_date}"
github_data: dict[str, int] = {}
gitea_data: dict[str, int] = {}
github_stale = False
gitea_stale = False
if options.source in ("all", "github"):
github_data, github_stale = await _fetch_with_cache(
cache,
gh_key,
lambda: fetch_github_activity(
username=settings.github_username,
token=settings.github_token,
from_date=from_date,
to_date=to_date,
),
)
if options.source in ("all", "gitea"):
gitea_data, gitea_stale = await _fetch_with_cache(
cache,
gt_key,
lambda: fetch_gitea_activity(
base_url=settings.gitea_base_url,
username=settings.gitea_username,
token=settings.gitea_token,
from_date=from_date,
to_date=to_date,
),
)
merged = merge_activity(github_data, gitea_data, dates=_date_keys(from_date, to_date))
merged = filter_activity_source(merged, options.source)
return ActivityResult(
merged=merged,
stale=github_stale or gitea_stale,
from_date=from_date,
to_date=to_date,
days_count=days_count,
)
def _daily_totals(merged: dict[str, dict[str, int]]) -> dict[str, int]:
return {day: int(payload.get("total", 0)) for day, payload in merged.items()}
def _image_cache_key(prefix: str, options: QueryOptions, result: ActivityResult) -> str:
digest = hashlib.sha1(
json.dumps(result.merged, sort_keys=True, separators=(",", ":")).encode("utf-8")
).hexdigest()
return f"{prefix}_{options.theme}_{options.source}_{result.from_date}_{result.to_date}_{digest}"
@app.get("/health")
async def health(settings: Settings = Depends(get_settings)) -> dict[str, str]:
return {"status": "ok", "service": settings.service_title}
@app.get("/activity.json")
async def activity_json(
settings: Settings = Depends(get_settings),
cache: FileCache = Depends(get_cache),
options: QueryOptions = Depends(get_query_options),
) -> JSONResponse:
try:
result = await collect_merged_activity(settings, cache, options)
except Exception as exc:
logger.exception("failed to fetch activity data")
raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc
total = sum(v.get("total", 0) for v in result.merged.values())
payload = {
"from": result.from_date.isoformat(),
"to": result.to_date.isoformat(),
"days": result.days_count,
"source": options.source,
"total": total,
"stale": result.stale,
"activity": result.merged,
}
return JSONResponse(content=payload)
@app.get("/activity.svg")
async def activity_svg(
settings: Settings = Depends(get_settings),
cache: FileCache = Depends(get_cache),
options: QueryOptions = Depends(get_query_options),
) -> Response:
try:
result = await collect_merged_activity(settings, cache, options)
except Exception as exc:
logger.exception("failed to fetch activity data")
raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc
cache_key = _image_cache_key("svg", options, result)
cached = cache.get_json(cache_key)
if cached is not None:
return Response(content=str(cached.value), media_type="image/svg+xml")
daily_totals = _daily_totals(result.merged)
total = sum(daily_totals.values())
svg = render_activity_svg(
daily_totals=daily_totals,
from_date=result.from_date,
to_date=result.to_date,
days_count=result.days_count,
total_contributions=total,
theme=options.theme,
)
cache.set_json(cache_key, svg)
return Response(content=svg, media_type="image/svg+xml")
@app.get("/activity.png")
async def activity_png(
settings: Settings = Depends(get_settings),
cache: FileCache = Depends(get_cache),
options: QueryOptions = Depends(get_query_options),
) -> Response:
try:
result = await collect_merged_activity(settings, cache, options)
except Exception as exc:
logger.exception("failed to fetch activity data")
raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc
cache_key = _image_cache_key("png", options, result)
cached = cache.get_json(cache_key)
if cached is not None:
png_data = bytes.fromhex(str(cached.value))
return Response(content=png_data, media_type="image/png")
daily_totals = _daily_totals(result.merged)
total = sum(daily_totals.values())
svg = render_activity_svg(
daily_totals=daily_totals,
from_date=result.from_date,
to_date=result.to_date,
days_count=result.days_count,
total_contributions=total,
theme=options.theme,
)
try:
png_data = render_png_from_svg(svg)
except Exception as exc:
logger.exception("failed to render png")
raise HTTPException(status_code=500, detail="PNG rendering failed") from exc
cache.set_json(cache_key, png_data.hex())
return Response(content=png_data, media_type="image/png")

42
app/merge.py Normal file
View File

@@ -0,0 +1,42 @@
from collections.abc import Iterable
def merge_activity(
github: dict[str, int],
gitea: dict[str, int],
dates: Iterable[str] | None = None,
) -> dict[str, dict[str, int]]:
keys = set(github) | set(gitea)
if dates is not None:
keys |= set(dates)
merged: dict[str, dict[str, int]] = {}
for date_key in sorted(keys):
gh = int(github.get(date_key, 0))
gt = int(gitea.get(date_key, 0))
merged[date_key] = {
"github": gh,
"gitea": gt,
"total": gh + gt,
}
return merged
def filter_activity_source(
merged: dict[str, dict[str, int]],
source: str,
) -> dict[str, dict[str, int]]:
if source == "all":
return merged
filtered: dict[str, dict[str, int]] = {}
for day, counts in merged.items():
github = counts.get("github", 0)
gitea = counts.get("gitea", 0)
if source == "github":
filtered[day] = {"github": github, "gitea": 0, "total": github}
elif source == "gitea":
filtered[day] = {"github": 0, "gitea": gitea, "total": gitea}
else:
filtered[day] = counts
return filtered

0
app/render/__init__.py Normal file
View File

4
app/render/png.py Normal file
View File

@@ -0,0 +1,4 @@
def render_png_from_svg(svg: str) -> bytes:
import cairosvg
return cairosvg.svg2png(bytestring=svg.encode("utf-8"))

125
app/render/svg.py Normal file
View File

@@ -0,0 +1,125 @@
from datetime import date, timedelta
THEMES: dict[str, dict[str, str | list[str]]] = {
"light": {
"bg": "#ffffff",
"text": "#24292f",
"muted": "#57606a",
"empty": "#ebedf0",
"levels": ["#ebedf0", "#9be9a8", "#40c463", "#30a14e", "#216e39"],
},
"dark": {
"bg": "#0d1117",
"text": "#c9d1d9",
"muted": "#8b949e",
"empty": "#161b22",
"levels": ["#161b22", "#0e4429", "#006d32", "#26a641", "#39d353"],
},
}
def _intensity_level(value: int, max_value: int) -> int:
if value <= 0 or max_value <= 0:
return 0
ratio = value / max_value
if ratio < 0.25:
return 1
if ratio < 0.5:
return 2
if ratio < 0.75:
return 3
return 4
def _build_calendar_days(from_date: date, to_date: date) -> list[date]:
days: list[date] = []
current = from_date
while current <= to_date:
days.append(current)
current += timedelta(days=1)
return days
def render_activity_svg(
daily_totals: dict[str, int],
from_date: date,
to_date: date,
days_count: int,
total_contributions: int,
theme: str,
) -> str:
palette = THEMES[theme if theme in THEMES else "light"]
levels = palette["levels"]
day_size = 11
gap = 3
left_padding = 42
top_padding = 30
month_label_height = 14
grid_top = top_padding + month_label_height
start = from_date - timedelta(days=(from_date.weekday() + 1) % 7)
end = to_date + timedelta(days=(5 - to_date.weekday()) % 7 + 1)
all_days = _build_calendar_days(start, end)
week_count = max(1, len(all_days) // 7)
grid_width = week_count * (day_size + gap)
grid_height = 7 * (day_size + gap)
width = left_padding + grid_width + 20
height = grid_top + grid_height + 34
max_value = max(daily_totals.values(), default=0)
lines: list[str] = []
lines.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" role="img" aria-label="Contribution graph">')
lines.append(f'<rect width="100%" height="100%" fill="{palette["bg"]}" />')
lines.append(
f'<text x="{left_padding}" y="16" fill="{palette["text"]}" font-family="-apple-system,BlinkMacSystemFont,Segoe UI,Helvetica,Arial,sans-serif" font-size="12">{total_contributions} contributions in the last {days_count} days</text>'
)
month_x_positions: dict[str, int] = {}
for idx, day in enumerate(all_days):
if day.day == 1:
month_x_positions[day.strftime("%b")] = left_padding + (idx // 7) * (day_size + gap)
for label, x in month_x_positions.items():
lines.append(
f'<text x="{x}" y="{top_padding + 10}" fill="{palette["muted"]}" font-family="-apple-system,BlinkMacSystemFont,Segoe UI,Helvetica,Arial,sans-serif" font-size="10">{label}</text>'
)
weekday_labels = {1: "Mon", 3: "Wed", 5: "Fri"}
for row, label in weekday_labels.items():
y = grid_top + row * (day_size + gap) + 9
lines.append(
f'<text x="8" y="{y}" fill="{palette["muted"]}" font-family="-apple-system,BlinkMacSystemFont,Segoe UI,Helvetica,Arial,sans-serif" font-size="10">{label}</text>'
)
for idx, day in enumerate(all_days):
week = idx // 7
row = (day.weekday() + 1) % 7
x = left_padding + week * (day_size + gap)
y = grid_top + row * (day_size + gap)
value = daily_totals.get(day.isoformat(), 0)
level = _intensity_level(value, max_value)
fill = levels[level]
lines.append(
f'<rect x="{x}" y="{y}" width="{day_size}" height="{day_size}" rx="2" ry="2" fill="{fill}" />'
)
legend_y = grid_top + grid_height + 16
legend_x = left_padding + grid_width - 120
lines.append(
f'<text x="{legend_x}" y="{legend_y}" fill="{palette["muted"]}" font-family="-apple-system,BlinkMacSystemFont,Segoe UI,Helvetica,Arial,sans-serif" font-size="10">Less</text>'
)
for i, color in enumerate(levels):
lx = legend_x + 26 + i * (day_size + 2)
lines.append(
f'<rect x="{lx}" y="{legend_y - 9}" width="{day_size}" height="{day_size}" rx="2" ry="2" fill="{color}" />'
)
lines.append(
f'<text x="{legend_x + 26 + len(levels) * (day_size + 2) + 4}" y="{legend_y}" fill="{palette["muted"]}" font-family="-apple-system,BlinkMacSystemFont,Segoe UI,Helvetica,Arial,sans-serif" font-size="10">More</text>'
)
lines.append("</svg>")
return "".join(lines)

26
app/settings.py Normal file
View File

@@ -0,0 +1,26 @@
from functools import lru_cache
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
github_username: str = Field(alias="GITHUB_USERNAME")
github_token: str | None = Field(default=None, alias="GITHUB_TOKEN")
gitea_base_url: str = Field(alias="GITEA_BASE_URL")
gitea_username: str = Field(alias="GITEA_USERNAME")
gitea_token: str | None = Field(default=None, alias="GITEA_TOKEN")
cache_ttl_seconds: int = Field(default=3600, alias="CACHE_TTL_SECONDS")
cache_dir: str = Field(default="./cache", alias="CACHE_DIR")
default_theme: str = Field(default="light", alias="DEFAULT_THEME")
service_title: str = Field(default="git-activity-merge", alias="SERVICE_TITLE")
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()

0
app/sources/__init__.py Normal file
View File

71
app/sources/gitea.py Normal file
View File

@@ -0,0 +1,71 @@
from datetime import UTC, date, datetime
from typing import Any
import httpx
class GiteaSourceError(RuntimeError):
pass
def _parse_gitea_date(raw: Any) -> str | None:
if isinstance(raw, int):
return datetime.fromtimestamp(raw, tz=UTC).date().isoformat()
if isinstance(raw, float):
return datetime.fromtimestamp(int(raw), tz=UTC).date().isoformat()
if isinstance(raw, str):
try:
if len(raw) == 10 and raw[4] == "-" and raw[7] == "-":
return raw
if raw.isdigit():
return datetime.fromtimestamp(int(raw), tz=UTC).date().isoformat()
return datetime.fromisoformat(raw.replace("Z", "+00:00")).date().isoformat()
except ValueError:
return None
return None
async def fetch_gitea_activity(
base_url: str,
username: str,
token: str | None,
from_date: date,
to_date: date,
timeout_seconds: float = 20.0,
) -> dict[str, int]:
endpoint = f"{base_url.rstrip('/')}/api/v1/users/{username}/heatmap"
headers: dict[str, str] = {"Accept": "application/json"}
if token:
headers["Authorization"] = f"token {token}"
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
response = await client.get(endpoint, headers=headers)
if response.status_code >= 400:
raise GiteaSourceError(f"Gitea heatmap request failed with status {response.status_code}")
payload = response.json()
if not isinstance(payload, list):
raise GiteaSourceError("Unexpected Gitea heatmap payload format")
normalized: dict[str, int] = {}
for item in payload:
if not isinstance(item, dict):
continue
date_key = _parse_gitea_date(item.get("date") or item.get("timestamp") or item.get("day"))
if not date_key:
continue
count = item.get("contributions")
if count is None:
count = item.get("count", 0)
try:
count_int = int(count)
except (TypeError, ValueError):
count_int = 0
if from_date.isoformat() <= date_key <= to_date.isoformat():
normalized[date_key] = normalized.get(date_key, 0) + count_int
return normalized

142
app/sources/github.py Normal file
View File

@@ -0,0 +1,142 @@
import re
from datetime import date, datetime, timezone
from typing import Any
import httpx
GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
class GitHubSourceError(RuntimeError):
pass
def _extract_attr(tag: str, attr: str) -> str | None:
match = re.search(rf'{attr}="([^"]+)"', tag)
return match.group(1) if match else None
def _parse_public_contributions_html(html: str, from_date: date, to_date: date) -> dict[str, int]:
tooltip_by_id: dict[str, int] = {}
for tooltip_match in re.finditer(r'<tool-tip[^>]*for="([^"]+)"[^>]*>(.*?)</tool-tip>', html, flags=re.S):
cell_id = tooltip_match.group(1)
tooltip_text = re.sub(r"<[^>]+>", "", tooltip_match.group(2)).strip()
count_match = re.search(r"(\d[\d,]*)\s+contribution", tooltip_text, flags=re.I)
if not count_match:
if "No contributions" in tooltip_text:
tooltip_by_id[cell_id] = 0
continue
tooltip_by_id[cell_id] = int(count_match.group(1).replace(",", ""))
normalized: dict[str, int] = {}
for td_match in re.finditer(r"<td[^>]*ContributionCalendar-day[^>]*></td>", html, flags=re.S):
tag = td_match.group(0)
date_key = _extract_attr(tag, "data-date")
cell_id = _extract_attr(tag, "id")
if not date_key or not cell_id:
continue
if from_date.isoformat() <= date_key <= to_date.isoformat():
normalized[date_key] = tooltip_by_id.get(cell_id, 0)
return normalized
async def _fetch_github_activity_public(
username: str,
from_date: date,
to_date: date,
timeout_seconds: float,
) -> dict[str, int]:
endpoint = (
f"https://github.com/users/{username}/contributions"
f"?from={from_date.isoformat()}&to={to_date.isoformat()}"
)
headers = {
"Accept": "text/html",
"User-Agent": "git-activity-merge/0.1",
}
async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
response = await client.get(endpoint, headers=headers)
if response.status_code >= 400:
raise GitHubSourceError(
f"GitHub public contributions request failed with status {response.status_code}"
)
return _parse_public_contributions_html(response.text, from_date, to_date)
async def fetch_github_activity(
username: str,
token: str | None,
from_date: date,
to_date: date,
timeout_seconds: float = 20.0,
) -> dict[str, int]:
if not token:
return await _fetch_github_activity_public(
username=username,
from_date=from_date,
to_date=to_date,
timeout_seconds=timeout_seconds,
)
query = """
query($login: String!, $from: DateTime!, $to: DateTime!) {
user(login: $login) {
contributionsCollection(from: $from, to: $to) {
contributionCalendar {
weeks {
contributionDays {
date
contributionCount
}
}
}
}
}
}
"""
variables: dict[str, Any] = {
"login": username,
"from": datetime.combine(from_date, datetime.min.time(), tzinfo=timezone.utc).isoformat(),
"to": datetime.combine(to_date, datetime.max.time(), tzinfo=timezone.utc).isoformat(),
}
headers: dict[str, str] = {"Accept": "application/json"}
if token:
headers["Authorization"] = f"bearer {token}"
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
response = await client.post(
GITHUB_GRAPHQL_URL,
headers=headers,
json={"query": query, "variables": variables},
)
if response.status_code >= 400:
raise GitHubSourceError(f"GitHub GraphQL request failed with status {response.status_code}")
payload = response.json()
if payload.get("errors"):
raise GitHubSourceError("GitHub GraphQL response included errors")
user = payload.get("data", {}).get("user")
if not user:
return {}
weeks = (
user.get("contributionsCollection", {})
.get("contributionCalendar", {})
.get("weeks", [])
)
normalized: dict[str, int] = {}
for week in weeks:
for day in week.get("contributionDays", []):
date_key = str(day.get("date", ""))
if not date_key:
continue
normalized[date_key] = int(day.get("contributionCount", 0))
return normalized