import hashlib import json import logging import os from dataclasses import dataclass from datetime import UTC, date, datetime, timedelta from typing import Literal from fastapi import Depends, FastAPI, HTTPException, Query from fastapi.responses import JSONResponse, Response from app.cache import FileCache from app.merge import filter_activity_source, merge_activity from app.render.png import render_png_from_svg from app.render.svg import render_activity_svg from app.settings import Settings, get_settings from app.sources.gitea import GiteaSourceError, fetch_gitea_activity from app.sources.github import GitHubSourceError, fetch_github_activity logger = logging.getLogger(__name__) SourceType = Literal["all", "github", "gitea"] ThemeType = Literal["light", "dark"] @dataclass class QueryOptions: year: int | None days: int theme: ThemeType source: SourceType @dataclass class ActivityResult: merged: dict[str, dict[str, int]] stale: bool from_date: date to_date: date days_count: int app = FastAPI(title=os.getenv("SERVICE_TITLE", "git-activity-merge")) def get_cache(settings: Settings = Depends(get_settings)) -> FileCache | None: if settings.cache_dir is None or settings.cache_ttl_seconds is None: return None return FileCache(cache_dir=settings.cache_dir, default_ttl_seconds=settings.cache_ttl_seconds) def get_query_options( settings: Settings = Depends(get_settings), year: int | None = Query(default=None, ge=1970, le=2100), days: int | None = Query(default=None, ge=1, le=5000), theme: ThemeType | None = Query(default=None), source: SourceType = Query(default="all"), ) -> QueryOptions: if year is not None and days is not None: raise HTTPException(status_code=400, detail="Provide either year or days, not both") selected_theme = theme or ("dark" if settings.default_theme == "dark" else "light") selected_days = days if days is not None else 365 return QueryOptions(year=year, days=selected_days, theme=selected_theme, source=source) def compute_date_range(options: QueryOptions) -> tuple[date, date, int]: today = datetime.now(tz=UTC).date() if options.year is not None: from_date = date(options.year, 1, 1) to_date = date(options.year, 12, 31) if options.year == today.year: to_date = today days_count = (to_date - from_date).days + 1 return from_date, to_date, days_count to_date = today from_date = to_date - timedelta(days=options.days - 1) return from_date, to_date, options.days def _date_keys(from_date: date, to_date: date) -> list[str]: keys: list[str] = [] current = from_date while current <= to_date: keys.append(current.isoformat()) current += timedelta(days=1) return keys async def _fetch_with_cache( cache: FileCache | None, key: str, fetcher, ) -> tuple[dict[str, int], bool]: if cache is None: fresh = await fetcher() return fresh, False cached = cache.get_json(key, allow_stale=True) if cached and not cached.stale: return {k: int(v) for k, v in cached.value.items()}, False try: fresh = await fetcher() cache.set_json(key, fresh) return fresh, False except (GitHubSourceError, GiteaSourceError, RuntimeError, ValueError) as exc: if cached is not None: logger.warning("source fetch failed for %s, serving stale cache: %s", key, exc) return {k: int(v) for k, v in cached.value.items()}, True raise async def collect_merged_activity( settings: Settings, cache: FileCache | None, options: QueryOptions, ) -> ActivityResult: from_date, to_date, days_count = compute_date_range(options) gh_key = f"github_{settings.github_username}_{from_date}_{to_date}" gt_key = f"gitea_{settings.gitea_username}_{from_date}_{to_date}" github_data: dict[str, int] = {} gitea_data: dict[str, int] = {} github_stale = False gitea_stale = False if options.source in ("all", "github"): github_data, github_stale = await _fetch_with_cache( cache, gh_key, lambda: fetch_github_activity( username=settings.github_username, token=settings.github_token, from_date=from_date, to_date=to_date, ), ) if options.source in ("all", "gitea"): gitea_data, gitea_stale = await _fetch_with_cache( cache, gt_key, lambda: fetch_gitea_activity( base_url=settings.gitea_base_url, username=settings.gitea_username, token=settings.gitea_token, from_date=from_date, to_date=to_date, ), ) merged = merge_activity(github_data, gitea_data, dates=_date_keys(from_date, to_date)) merged = filter_activity_source(merged, options.source) return ActivityResult( merged=merged, stale=github_stale or gitea_stale, from_date=from_date, to_date=to_date, days_count=days_count, ) def _daily_totals(merged: dict[str, dict[str, int]]) -> dict[str, int]: return {day: int(payload.get("total", 0)) for day, payload in merged.items()} def _image_cache_key(prefix: str, options: QueryOptions, result: ActivityResult) -> str: digest = hashlib.sha1( json.dumps(result.merged, sort_keys=True, separators=(",", ":")).encode("utf-8") ).hexdigest() return f"{prefix}_{options.theme}_{options.source}_{result.from_date}_{result.to_date}_{digest}" @app.get("/health") async def health(settings: Settings = Depends(get_settings)) -> dict[str, str]: return {"status": "ok", "service": settings.service_title} @app.get("/activity.json") async def activity_json( settings: Settings = Depends(get_settings), cache: FileCache | None = Depends(get_cache), options: QueryOptions = Depends(get_query_options), ) -> JSONResponse: try: result = await collect_merged_activity(settings, cache, options) except Exception as exc: logger.exception("failed to fetch activity data") raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc total = sum(v.get("total", 0) for v in result.merged.values()) payload = { "from": result.from_date.isoformat(), "to": result.to_date.isoformat(), "days": result.days_count, "source": options.source, "total": total, "stale": result.stale, "activity": result.merged, } return JSONResponse(content=payload) @app.get("/activity.svg") async def activity_svg( settings: Settings = Depends(get_settings), cache: FileCache | None = Depends(get_cache), options: QueryOptions = Depends(get_query_options), ) -> Response: try: result = await collect_merged_activity(settings, cache, options) except Exception as exc: logger.exception("failed to fetch activity data") raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc cache_key = _image_cache_key("svg", options, result) if cache is not None: cached = cache.get_json(cache_key) if cached is not None: return Response(content=str(cached.value), media_type="image/svg+xml") daily_totals = _daily_totals(result.merged) total = sum(daily_totals.values()) svg = render_activity_svg( daily_totals=daily_totals, from_date=result.from_date, to_date=result.to_date, days_count=result.days_count, total_contributions=total, theme=options.theme, ) if cache is not None: cache.set_json(cache_key, svg) return Response(content=svg, media_type="image/svg+xml") @app.get("/activity.png") async def activity_png( settings: Settings = Depends(get_settings), cache: FileCache | None = Depends(get_cache), options: QueryOptions = Depends(get_query_options), ) -> Response: try: result = await collect_merged_activity(settings, cache, options) except Exception as exc: logger.exception("failed to fetch activity data") raise HTTPException(status_code=502, detail="Failed to fetch activity data") from exc cache_key = _image_cache_key("png", options, result) if cache is not None: cached = cache.get_json(cache_key) if cached is not None: png_data = bytes.fromhex(str(cached.value)) return Response(content=png_data, media_type="image/png") daily_totals = _daily_totals(result.merged) total = sum(daily_totals.values()) svg = render_activity_svg( daily_totals=daily_totals, from_date=result.from_date, to_date=result.to_date, days_count=result.days_count, total_contributions=total, theme=options.theme, ) try: png_data = render_png_from_svg(svg) except Exception as exc: logger.exception("failed to render png") raise HTTPException(status_code=500, detail="PNG rendering failed") from exc if cache is not None: cache.set_json(cache_key, png_data.hex()) return Response(content=png_data, media_type="image/png")