143 lines
4.4 KiB
Python
143 lines
4.4 KiB
Python
import re
from datetime import date, datetime, timezone
from typing import Any
from urllib.parse import quote

import httpx
|
|
|
|
# GraphQL endpoint that fetch_github_activity POSTs its contribution query to.
GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
|
|
|
|
|
|
class GitHubSourceError(RuntimeError):
    """Raised when a GitHub request made by this module fails.

    The fetch functions in this module raise it for HTTP responses with a
    status code of 400 or above and for GraphQL responses that carry an
    ``errors`` entry.
    """
|
|
|
|
|
|
def _extract_attr(tag: str, attr: str) -> str | None:
|
|
match = re.search(rf'{attr}="([^"]+)"', tag)
|
|
return match.group(1) if match else None
|
|
|
|
|
|
def _parse_public_contributions_html(html: str, from_date: date, to_date: date) -> dict[str, int]:
    """Extract per-day contribution counts from a contributions HTML page.

    Two passes are made over ``html``:

    1. Every ``<tool-tip for="...">`` element is read; its text (inner tags
       stripped) supplies the count for the element id named in ``for``.
    2. Every empty ``<td>`` tag mentioning ``ContributionCalendar-day`` is
       read; its ``data-date`` and ``id`` attributes link a day to a count.

    Args:
        html: Markup of the contributions page.
        from_date: First day to keep (inclusive).
        to_date: Last day to keep (inclusive).

    Returns:
        Mapping of ISO date string to contribution count for the cells whose
        date lies in the requested range. Cells without a usable tooltip are
        reported as 0.
    """
    counts_by_cell: dict[str, int] = {}
    tooltip_pattern = r'<tool-tip[^>]*for="([^"]+)"[^>]*>(.*?)</tool-tip>'
    for hit in re.finditer(tooltip_pattern, html, flags=re.S):
        target_id, inner_markup = hit.group(1), hit.group(2)
        label = re.sub(r"<[^>]+>", "", inner_markup).strip()
        found = re.search(r"(\d[\d,]*)\s+contribution", label, flags=re.I)
        if found:
            # Counts may carry thousands separators ("1,234 contributions").
            counts_by_cell[target_id] = int(found.group(1).replace(",", ""))
        elif "No contributions" in label:
            counts_by_cell[target_id] = 0
        # Any other tooltip text is ignored; the cell falls back to 0 below.

    result: dict[str, int] = {}
    for cell in re.finditer(r"<td[^>]*ContributionCalendar-day[^>]*></td>", html, flags=re.S):
        markup = cell.group(0)
        day = _extract_attr(markup, "data-date")
        target_id = _extract_attr(markup, "id")
        # ISO dates compare correctly as plain strings, so no parsing is needed.
        if day and target_id and from_date.isoformat() <= day <= to_date.isoformat():
            result[day] = counts_by_cell.get(target_id, 0)

    return result
|
|
|
|
|
|
async def _fetch_github_activity_public(
    username: str,
    from_date: date,
    to_date: date,
    timeout_seconds: float,
) -> dict[str, int]:
    """Fetch contribution counts from the public contributions page.

    Args:
        username: GitHub login whose contributions page is requested.
        from_date: First day of the requested range (inclusive).
        to_date: Last day of the requested range (inclusive).
        timeout_seconds: Timeout handed to the HTTP client.

    Returns:
        Mapping of ISO date string to contribution count, as produced by
        ``_parse_public_contributions_html``.

    Raises:
        GitHubSourceError: If the response status code is 400 or above.
    """
    # Percent-encode the login so characters such as "/", "?" or "#" cannot
    # change the path or query of the request.
    login = quote(username, safe="")
    endpoint = (
        f"https://github.com/users/{login}/contributions"
        f"?from={from_date.isoformat()}&to={to_date.isoformat()}"
    )
    headers = {
        "Accept": "text/html",
        "User-Agent": "git-activity-merge/0.1",
    }
    async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
        response = await client.get(endpoint, headers=headers)

    if response.status_code >= 400:
        raise GitHubSourceError(
            f"GitHub public contributions request failed with status {response.status_code}"
        )
    return _parse_public_contributions_html(response.text, from_date, to_date)
|
|
|
|
|
|
def _collect_contribution_days(payload: dict[str, Any]) -> dict[str, int]:
    """Flatten a contribution-calendar GraphQL payload into ``{date: count}``."""
    # JSON may carry explicit nulls, for which ``.get(key, default)`` returns
    # None rather than the default; ``or`` supplies the fallback in both cases.
    user = (payload.get("data") or {}).get("user")
    if not user:
        return {}

    collection = user.get("contributionsCollection") or {}
    calendar = collection.get("contributionCalendar") or {}

    normalized: dict[str, int] = {}
    for week in calendar.get("weeks") or []:
        for day in (week or {}).get("contributionDays") or []:
            date_key = day.get("date")
            if not date_key:
                continue
            normalized[str(date_key)] = int(day.get("contributionCount") or 0)
    return normalized


async def fetch_github_activity(
    username: str,
    token: str | None,
    from_date: date,
    to_date: date,
    timeout_seconds: float = 20.0,
) -> dict[str, int]:
    """Fetch per-day contribution counts for a GitHub user.

    Without a token the public contributions page is used; with a token the
    GraphQL endpoint is queried.

    Args:
        username: GitHub login to look up.
        token: Token sent in the ``Authorization`` header, or ``None``/empty
            to use the public page instead.
        from_date: First day of the requested range.
        to_date: Last day of the requested range.
        timeout_seconds: Timeout handed to the HTTP client.

    Returns:
        Mapping of date string to contribution count. Empty when the GraphQL
        response contains no user.

    Raises:
        GitHubSourceError: If the response status code is 400 or above, or
            the GraphQL response includes an ``errors`` entry.
    """
    if not token:
        return await _fetch_github_activity_public(
            username=username,
            from_date=from_date,
            to_date=to_date,
            timeout_seconds=timeout_seconds,
        )

    query = """
    query($login: String!, $from: DateTime!, $to: DateTime!) {
      user(login: $login) {
        contributionsCollection(from: $from, to: $to) {
          contributionCalendar {
            weeks {
              contributionDays {
                date
                contributionCount
              }
            }
          }
        }
      }
    }
    """
    # The range spans from the first instant of from_date to the last instant
    # of to_date, both expressed in UTC.
    variables: dict[str, Any] = {
        "login": username,
        "from": datetime.combine(from_date, datetime.min.time(), tzinfo=timezone.utc).isoformat(),
        "to": datetime.combine(to_date, datetime.max.time(), tzinfo=timezone.utc).isoformat(),
    }

    # A token is always present here; the token-less case returned above.
    headers: dict[str, str] = {
        "Accept": "application/json",
        "Authorization": f"bearer {token}",
    }

    async with httpx.AsyncClient(timeout=timeout_seconds) as client:
        response = await client.post(
            GITHUB_GRAPHQL_URL,
            headers=headers,
            json={"query": query, "variables": variables},
        )

    if response.status_code >= 400:
        raise GitHubSourceError(f"GitHub GraphQL request failed with status {response.status_code}")

    payload = response.json()
    if payload.get("errors"):
        raise GitHubSourceError("GitHub GraphQL response included errors")

    return _collect_contribution_days(payload)
|