refactor: split client.py into graphql.py + parser.py modules

Split the monolithic client.py (1341 lines) into three focused modules:

- graphql.py (~200 lines): queryId resolution, URL building, JS bundle
  scanning, feature flag management
- parser.py (~270 lines): Tweet/User/Media/Article parsing, utility functions
  (_deep_get, _parse_int, _extract_cursor, _extract_media)
- client.py (~700 lines): TwitterClient class with HTTP engine, anti-detection,
  session management, and all public API methods

Backward compatibility: client.py re-exports all previously public symbols
so existing test imports work without modification. 88/88 tests pass.
This commit is contained in:
jackwener
2026-03-10 23:18:59 +08:00
parent 4afc4fc246
commit c2b9be4669
3 changed files with 749 additions and 646 deletions

View File

@@ -7,10 +7,9 @@ import logging
import math
import os
import random
import re
import time
import urllib.parse
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Callable, Dict, cast
import bs4
from curl_cffi import requests as _cffi_requests
@@ -34,83 +33,37 @@ from .constants import (
sync_chrome_version,
)
from .exceptions import (
AuthenticationError,
NetworkError,
NotFoundError,
QueryIdError,
RateLimitError,
TwitterAPIError,
)
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
TimelineInstructionGetter = Callable[[Any], Any]
TimelineParseResult = Tuple[List[Tweet], Optional[str]]
SeenIdSet = Set[str]
from .graphql import (
FEATURES,
_build_graphql_url,
_invalidate_query_id,
_resolve_query_id,
_update_features_from_html,
)
from .models import UserProfile
from .parser import (
_deep_get,
_parse_int,
parse_timeline_response,
parse_tweet_result,
parse_user_result,
)
logger = logging.getLogger(__name__)
# Shared curl_cffi session — impersonates Chrome 133 TLS/JA3/HTTP2 fingerprint
_cffi_session: Optional[Any] = None
FALLBACK_QUERY_IDS = {
# Read operations
"HomeTimeline": "c-CzHF1LboFilMpsx4ZCrQ",
"HomeLatestTimeline": "BKB7oi212Fi7kQtCBGE4zA",
"Bookmarks": "VFdMm9iVZxlU6hD86gfW_A",
"UserByScreenName": "1VOOyvKkiI3FMmkeDNxM9A",
"UserTweets": "E3opETHurmVJflFsUBVuUQ",
"SearchTimeline": "nWemVnGJ6A5eQAR5-oQeAg",
"Likes": "lIDpu_NWL7_VhimGGt0o6A",
"TweetDetail": "xd_EMdYvB9hfZsZ6Idri0w",
"ListLatestTweetsTimeline": "RlZzktZY_9wJynoepm8ZsA",
"Followers": "IOh4aS6UdGWGJUYTqliQ7Q",
"Following": "zx6e-TLzRkeDO_a7p4b3JQ",
# Write operations
"CreateTweet": "IID9x6WsdMnTlXnzXGq8ng",
"DeleteTweet": "VaenaVgh5q5ih7kvyVjgtg",
"FavoriteTweet": "lI07N6Otwv1PhnEgXILM7A",
"UnfavoriteTweet": "ZYKSe-w7KEslx3JhSIk5LA",
"CreateRetweet": "ojPdsZsimiJrUGLR1sjUtA",
"DeleteRetweet": "iQtK4dl5hBmXewYZuEOKVw",
"CreateBookmark": "aoDbu3RHznuiSkQ9aNM67Q",
"DeleteBookmark": "Wlmlj2-xzyS1GN3a6cj-mQ",
}
TWITTER_OPENAPI_URL = (
"https://raw.githubusercontent.com/fa0311/twitter-openapi/"
"main/src/config/placeholder.json"
)
# Essential features only — keep this list SMALL to avoid 414/431 URI Too Long.
# Twitter's API defaults missing features to False, so we only need True-valued ones
# that affect tweet data we actually consume. Each additional key adds ~60 chars to URL.
_DEFAULT_FEATURES = {
"creator_subscriptions_tweet_preview_api_enabled": True,
"communities_web_enable_tweet_community_results_fetch": True,
"c9s_tweet_anatomy_moderator_badge_enabled": True,
"articles_preview_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"responsive_web_twitter_article_tweet_consumption_enabled": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
"longform_notetweets_rich_text_read_enabled": True,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"responsive_web_graphql_timeline_navigation_enabled": True,
"responsive_web_enhance_cards_enabled": False,
}
# Features dict that gets updated dynamically from x.com JS bundles
FEATURES = dict(_DEFAULT_FEATURES)
# Module-level caches (not thread-safe — CLI is single-threaded)
_cached_query_ids: Dict[str, str] = {}
_bundles_scanned = False
# Shared curl_cffi session (single-threaded CLI)
_cffi_session = None
TimelineInstructionGetter = Callable[[Any], Any]
# Hard ceiling to prevent accidental massive fetches
_ABSOLUTE_MAX_COUNT = 500
# ── Session management ───────────────────────────────────────────────────
def _best_chrome_target():
@@ -144,7 +97,6 @@ def _get_cffi_session():
"""Return shared curl_cffi session with Chrome impersonation and optional proxy."""
global _cffi_session
if _cffi_session is None:
import os
proxy = os.environ.get("TWITTER_PROXY", "")
target = _best_chrome_target()
sync_chrome_version(target) # align UA/sec-ch-ua with impersonate target
@@ -167,145 +119,7 @@ def _url_fetch(url, headers=None):
return resp.text
def _build_graphql_url(query_id, operation_name, variables, features, field_toggles=None):
# type: (str, str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> str
"""Build GraphQL GET URL with encoded variables/features/fieldToggles.
Only includes True-valued feature flags in the URL to avoid 414 URI Too Long.
Twitter's API defaults missing features to False.
"""
# Compact features: omit False values to keep URL under server limits
compact_features = {k: v for k, v in features.items() if v is not False}
url = "https://x.com/i/api/graphql/%s/%s?variables=%s&features=%s" % (
query_id,
operation_name,
urllib.parse.quote(json.dumps(variables, separators=(",", ":"))),
urllib.parse.quote(json.dumps(compact_features, separators=(",", ":"))),
)
if field_toggles:
url += "&fieldToggles=%s" % urllib.parse.quote(
json.dumps(field_toggles, separators=(",", ":"))
)
return url
def _scan_bundles():
# type: () -> None
"""Scan Twitter JS bundles and cache queryId mappings."""
global _bundles_scanned
if _bundles_scanned:
return
_bundles_scanned = True
try:
html = _url_fetch("https://x.com", {"user-agent": get_user_agent()})
script_pattern = re.compile(
r'(?:src|href)=["\']'
r'(https://abs\.twimg\.com/responsive-web/client-web[^"\']+\.js)'
r'["\']'
)
script_urls = script_pattern.findall(html)
except Exception as exc: # pragma: no cover - network-dependent branch
logger.warning("Failed to scan JS bundles: %s", exc)
return
for script_url in script_urls:
try:
bundle = _url_fetch(script_url)
op_pattern = re.compile(
r'queryId:\s*"([A-Za-z0-9_-]+)"[^}]{0,200}'
r'operationName:\s*"([^"]+)"'
)
for match in op_pattern.finditer(bundle):
query_id, operation_name = match.group(1), match.group(2)
_cached_query_ids.setdefault(operation_name, query_id)
except Exception:
continue
logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids))
def _update_features_from_html(html):
# type: (str) -> None
"""Extract live feature flags from x.com HTML and update the global FEATURES dict.
Twitter embeds feature switch config in inline scripts on the homepage.
We parse these to keep FEATURES in sync with the current frontend.
Only UPDATES existing keys — never adds new ones to avoid URL bloat.
"""
try:
feature_pattern = re.compile(
r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)',
re.IGNORECASE,
)
found = 0
for match in feature_pattern.finditer(html):
key = match.group(1)
value = match.group(2).lower() == "true"
# Only update keys already in FEATURES — never add new ones
# Adding new keys inflates URL length, causing 414/431 errors
if key in FEATURES and FEATURES[key] != value:
logger.debug("Feature flag updated: %s = %s -> %s", key, FEATURES[key], value)
FEATURES[key] = value
found += 1
if found:
logger.info("Updated %d feature flags from x.com", found)
except Exception as exc:
logger.debug("Feature extraction from HTML failed: %s", exc)
def _fetch_from_github(operation_name):
# type: (str) -> Optional[str]
"""Fetch queryId from community-maintained twitter-openapi file."""
try:
payload = _url_fetch(TWITTER_OPENAPI_URL)
parsed = json.loads(payload)
operation = parsed.get(operation_name, {})
query_id = operation.get("queryId")
if isinstance(query_id, str) and query_id:
return query_id
except Exception as exc: # pragma: no cover - network-dependent branch
logger.debug("GitHub queryId lookup failed: %s", exc)
return None
def _invalidate_query_id(operation_name):
# type: (str) -> None
"""Remove a cached queryId for an operation."""
_cached_query_ids.pop(operation_name, None)
def _resolve_query_id(operation_name, prefer_fallback=True):
# type: (str, bool) -> str
"""Resolve queryId using cache, remote sources, and fallback constants."""
cached = _cached_query_ids.get(operation_name)
if cached:
return cached
fallback = FALLBACK_QUERY_IDS.get(operation_name)
if prefer_fallback and fallback:
_cached_query_ids[operation_name] = fallback
return fallback
github_query_id = _fetch_from_github(operation_name)
if github_query_id:
_cached_query_ids[operation_name] = github_query_id
return github_query_id
_scan_bundles()
cached = _cached_query_ids.get(operation_name)
if cached:
return cached
if fallback:
_cached_query_ids[operation_name] = fallback
return fallback
raise QueryIdError('Cannot resolve queryId for "%s"' % operation_name)
# Hard ceiling to prevent accidental massive fetches
_ABSOLUTE_MAX_COUNT = 500
# ── TwitterClient ────────────────────────────────────────────────────────
class TwitterClient:
@@ -326,6 +140,8 @@ class TwitterClient:
# Eagerly initialize ClientTransaction on construction
self._ensure_client_transaction()
# ── Read operations ──────────────────────────────────────────────
def fetch_home_timeline(self, count=20):
# type: (int) -> List[Tweet]
"""Fetch home timeline tweets."""
@@ -520,7 +336,7 @@ class TwitterClient:
lambda data: _deep_get(data, "data", "user", "result", "timeline", "timeline", "instructions"),
)
# ── Write operations ────────────────────────────────────────────────
# ── Write operations ─────────────────────────────────────────────
def _write_delay(self):
# type: () -> None
@@ -670,6 +486,8 @@ class TwitterClient:
self._write_delay()
return True
# ── Internal: timeline / user list fetchers ──────────────────────
def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None):
# type: (str, int, Callable[[Any], Any], Optional[Dict[str, Any]], bool, Optional[Dict[str, Any]]) -> List[Tweet]
"""Generic timeline fetcher with pagination and deduplication.
@@ -734,10 +552,73 @@ class TwitterClient:
return tweets[:count]
def _fetch_user_list(self, operation_name, user_id, count, get_instructions):
# type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile]
"""Generic user list fetcher (for followers/following) with pagination."""
if count <= 0:
return []
count = min(count, self._max_count)
users = [] # type: List[UserProfile]
seen_ids = set() # type: Set[str]
cursor = None # type: Optional[str]
attempts = 0
max_attempts = int(math.ceil(count / 20.0)) + 2
while len(users) < count and attempts < max_attempts:
attempts += 1
variables = {
"userId": user_id,
"count": min(count - len(users) + 5, 40),
"includePromotedContent": False,
} # type: Dict[str, Any]
if cursor:
variables["cursor"] = cursor
data = self._graphql_get(operation_name, variables, FEATURES)
instructions = get_instructions(data)
if not instructions:
logger.warning("No user list instructions found")
break
new_users = [] # type: List[UserProfile]
next_cursor = None # type: Optional[str]
for instruction in instructions:
entries = instruction.get("entries", [])
for entry in entries:
content = entry.get("content", {})
entry_type = content.get("entryType", "")
if entry_type == "TimelineTimelineItem":
item = content.get("itemContent", {})
user_results = _deep_get(item, "user_results", "result")
if user_results:
user = self._parse_user_result(user_results)
if user:
new_users.append(user)
elif entry_type == "TimelineTimelineCursor":
if content.get("cursorType") == "Bottom":
next_cursor = content.get("value")
for user in new_users:
if user.id and user.id not in seen_ids:
seen_ids.add(user.id)
users.append(user)
if not next_cursor or not new_users:
break
cursor = next_cursor
if len(users) < count and self._request_delay > 0:
time.sleep(self._request_delay * random.uniform(0.7, 1.5))
return users[:count]
# ── Internal: GraphQL request methods ────────────────────────────
def _graphql_get(self, operation_name, variables, features, field_toggles=None):
# type: (str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Issue GraphQL GET request with automatic stale-fallback retry."""
query_id = _resolve_query_id(operation_name, prefer_fallback=True)
query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch)
using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name)
url = _build_graphql_url(query_id, operation_name, variables, features, field_toggles)
@@ -748,11 +629,125 @@ class TwitterClient:
if exc.status_code == 404 and using_fallback:
logger.info("Retrying %s with live queryId after 404", operation_name)
_invalidate_query_id(operation_name)
refreshed_query_id = _resolve_query_id(operation_name, prefer_fallback=False)
refreshed_query_id = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch)
retry_url = _build_graphql_url(refreshed_query_id, operation_name, variables, features, field_toggles)
return self._api_get(retry_url)
raise RuntimeError(str(exc))
def _graphql_post(self, operation_name, variables, features=None):
# type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Issue GraphQL POST request with automatic stale-fallback retry."""
query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch)
using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name)
def _do_post(qid):
# type: (str) -> Dict[str, Any]
url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name)
body = {"variables": variables, "queryId": qid} # type: Dict[str, Any]
if features:
body["features"] = features
return self._api_request(url, method="POST", body=body)
try:
return _do_post(query_id)
except TwitterAPIError as exc:
if exc.status_code == 404 and using_fallback:
logger.info("Retrying POST %s with live queryId after 404", operation_name)
_invalidate_query_id(operation_name)
refreshed = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch)
return _do_post(refreshed)
raise RuntimeError(str(exc))
# ── Internal: HTTP request engine ────────────────────────────────
def _api_get(self, url):
# type: (str) -> Dict[str, Any]
"""Make authenticated GET request to Twitter API."""
return self._api_request(url, method="GET")
def _api_request(self, url, method="GET", body=None):
# type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Make authenticated request to Twitter API with retry on rate limits.
Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation.
Handles both GET and POST. Retries on HTTP 429 and JSON error code 88.
"""
headers = self._build_headers(url=url, method=method)
session = _get_cffi_session()
json_body = body # curl_cffi handles JSON serialization
for attempt in range(self._max_retries + 1):
try:
if method == "POST":
response = session.post(
url, headers=headers, json=json_body, timeout=30,
)
else:
response = session.get(url, headers=headers, timeout=30)
status_code = response.status_code
if status_code == 429 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (429), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
if status_code >= 400:
message = "Twitter API error %d: %s" % (status_code, response.text[:500])
raise TwitterAPIError(status_code, message)
payload = response.text
except TwitterAPIError:
raise
except Exception as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc)
try:
parsed = json.loads(payload)
except (json.JSONDecodeError, ValueError):
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
if isinstance(parsed, dict) and parsed.get("errors"):
err_msg = parsed["errors"][0].get("message", "Unknown error")
# Rate limit can also surface as a JSON error (code 88)
err_code = parsed["errors"][0].get("code", 0)
if err_code == 88 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (code 88), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
# Write operation rate limits (retweet/like/bookmark limits)
# Code 348 = "retweet limit", 327 = "already retweeted"
# Provide user-friendly message
if err_code in (348, 349):
raise TwitterAPIError(
429, "Rate limited: %s (try again later, recommended wait: 15+ minutes)" % err_msg
)
raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg)
# GraphQL write mutations return errors in data.errors (separate from top-level)
if isinstance(parsed, dict) and "data" in parsed:
data_obj = parsed["data"]
if isinstance(data_obj, dict):
for key, val in data_obj.items():
if isinstance(val, dict) and val.get("errors"):
inner_errors = val["errors"]
if inner_errors:
inner_msg = inner_errors[0].get("message", "Unknown error")
raise TwitterAPIError(0, "Twitter API: %s" % inner_msg)
return parsed
# Should not be reached, but just in case
raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries)
# ── Internal: Anti-detection / headers ───────────────────────────
@staticmethod
def _ct_cache_path():
# type: () -> str
@@ -893,448 +888,27 @@ class TwitterClient:
logger.debug("Failed to generate transaction id: %s", exc)
return headers
def _api_get(self, url):
# type: (str) -> Dict[str, Any]
"""Make authenticated GET request to Twitter API."""
return self._api_request(url, method="GET")
def _graphql_post(self, operation_name, variables, features=None):
# type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Issue GraphQL POST request with automatic stale-fallback retry."""
query_id = _resolve_query_id(operation_name, prefer_fallback=True)
using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name)
def _do_post(qid):
# type: (str) -> Dict[str, Any]
url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name)
body = {"variables": variables, "queryId": qid} # type: Dict[str, Any]
if features:
body["features"] = features
return self._api_request(url, method="POST", body=body)
try:
return _do_post(query_id)
except TwitterAPIError as exc:
if exc.status_code == 404 and using_fallback:
logger.info("Retrying POST %s with live queryId after 404", operation_name)
_invalidate_query_id(operation_name)
refreshed = _resolve_query_id(operation_name, prefer_fallback=False)
return _do_post(refreshed)
raise RuntimeError(str(exc))
def _api_request(self, url, method="GET", body=None):
# type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Make authenticated request to Twitter API with retry on rate limits.
Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation.
Handles both GET and POST. Retries on HTTP 429 and JSON error code 88.
"""
headers = self._build_headers(url=url, method=method)
session = _get_cffi_session()
json_body = body # curl_cffi handles JSON serialization
for attempt in range(self._max_retries + 1):
try:
if method == "POST":
response = session.post(
url, headers=headers, json=json_body, timeout=30,
)
else:
response = session.get(url, headers=headers, timeout=30)
status_code = response.status_code
if status_code == 429 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (429), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
if status_code >= 400:
message = "Twitter API error %d: %s" % (status_code, response.text[:500])
raise TwitterAPIError(status_code, message)
payload = response.text
except TwitterAPIError:
raise
except Exception as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc)
try:
parsed = json.loads(payload)
except (json.JSONDecodeError, ValueError):
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
if isinstance(parsed, dict) and parsed.get("errors"):
err_msg = parsed["errors"][0].get("message", "Unknown error")
# Rate limit can also surface as a JSON error (code 88)
err_code = parsed["errors"][0].get("code", 0)
if err_code == 88 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (code 88), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
# Write operation rate limits (retweet/like/bookmark limits)
# Code 348 = "retweet limit", 327 = "already retweeted"
# Provide user-friendly message
if err_code in (348, 349):
raise TwitterAPIError(
429, "Rate limited: %s (try again later, recommended wait: 15+ minutes)" % err_msg
)
raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg)
# GraphQL write mutations return errors in data.errors (separate from top-level)
if isinstance(parsed, dict) and "data" in parsed:
data_obj = parsed["data"]
if isinstance(data_obj, dict):
for key, val in data_obj.items():
if isinstance(val, dict) and val.get("errors"):
inner_errors = val["errors"]
if inner_errors:
inner_msg = inner_errors[0].get("message", "Unknown error")
raise TwitterAPIError(0, "Twitter API: %s" % inner_msg)
return parsed
# Should not be reached, but just in case
raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries)
def _fetch_user_list(self, operation_name, user_id, count, get_instructions):
# type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile]
"""Generic user list fetcher (for followers/following) with pagination."""
if count <= 0:
return []
count = min(count, self._max_count)
users = [] # type: List[UserProfile]
seen_ids = set() # type: Set[str]
cursor = None # type: Optional[str]
attempts = 0
max_attempts = int(math.ceil(count / 20.0)) + 2
while len(users) < count and attempts < max_attempts:
attempts += 1
variables = {
"userId": user_id,
"count": min(count - len(users) + 5, 40),
"includePromotedContent": False,
} # type: Dict[str, Any]
if cursor:
variables["cursor"] = cursor
data = self._graphql_get(operation_name, variables, FEATURES)
instructions = get_instructions(data)
if not instructions:
logger.warning("No user list instructions found")
break
new_users = [] # type: List[UserProfile]
next_cursor = None # type: Optional[str]
for instruction in instructions:
entries = instruction.get("entries", [])
for entry in entries:
content = entry.get("content", {})
entry_type = content.get("entryType", "")
if entry_type == "TimelineTimelineItem":
item = content.get("itemContent", {})
user_results = _deep_get(item, "user_results", "result")
if user_results:
user = self._parse_user_result(user_results)
if user:
new_users.append(user)
elif entry_type == "TimelineTimelineCursor":
if content.get("cursorType") == "Bottom":
next_cursor = content.get("value")
for user in new_users:
if user.id and user.id not in seen_ids:
seen_ids.add(user.id)
users.append(user)
if not next_cursor or not new_users:
break
cursor = next_cursor
if len(users) < count and self._request_delay > 0:
time.sleep(self._request_delay * random.uniform(0.7, 1.5))
return users[:count]
# ── Backward-compatible delegation to parser module ──────────────
@staticmethod
def _parse_user_result(user_data):
# type: (Dict[str, Any]) -> Optional[UserProfile]
"""Parse a user result object into UserProfile."""
if user_data.get("__typename") == "UserUnavailable":
return None
legacy = user_data.get("legacy", {})
if not legacy:
return None
return UserProfile(
id=user_data.get("rest_id", ""),
name=legacy.get("name", ""),
screen_name=legacy.get("screen_name", ""),
bio=legacy.get("description", ""),
location=legacy.get("location", ""),
url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
followers_count=_parse_int(legacy.get("followers_count"), 0),
following_count=_parse_int(legacy.get("friends_count"), 0),
tweets_count=_parse_int(legacy.get("statuses_count"), 0),
likes_count=_parse_int(legacy.get("favourites_count"), 0),
verified=user_data.get("is_blue_verified", False) or legacy.get("verified", False),
profile_image_url=legacy.get("profile_image_url_https", ""),
created_at=legacy.get("created_at", ""),
)
def _parse_timeline_response(self, data, get_instructions):
# type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]]
"""Parse timeline GraphQL response into tweets and next cursor."""
tweets = [] # type: List[Tweet]
next_cursor = None # type: Optional[str]
instructions = get_instructions(data)
if not isinstance(instructions, list):
logger.warning("No timeline instructions found")
return tweets, next_cursor
for instruction in instructions:
entries = instruction.get("entries") or instruction.get("moduleItems") or []
for entry in entries:
content = entry.get("content", {})
next_cursor = _extract_cursor(content) or next_cursor
item_content = content.get("itemContent", {})
result = _deep_get(item_content, "tweet_results", "result")
if result:
tweet = self._parse_tweet_result(result)
if tweet:
tweets.append(tweet)
for nested_item in content.get("items", []):
nested_result = _deep_get(
nested_item,
"item",
"itemContent",
"tweet_results",
"result",
)
if nested_result:
tweet = self._parse_tweet_result(nested_result)
if tweet:
tweets.append(tweet)
return tweets, next_cursor
return parse_user_result(user_data)
def _parse_tweet_result(self, result, depth=0):
# type: (Dict[str, Any], int) -> Optional[Tweet]
"""Parse a single TweetResult into a Tweet dataclass."""
if depth > 2:
return None
return parse_tweet_result(result, depth)
tweet_data = result
if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"):
tweet_data = result["tweet"]
if tweet_data.get("__typename") == "TweetTombstone":
return None
legacy = tweet_data.get("legacy")
core = tweet_data.get("core")
if not isinstance(legacy, dict) or not isinstance(core, dict):
return None
user = _deep_get(core, "user_results", "result") or {}
user_legacy = user.get("legacy", {})
user_core = user.get("core", {})
is_retweet = bool(_deep_get(legacy, "retweeted_status_result", "result"))
actual_data = tweet_data
actual_legacy = legacy
actual_user = user
actual_user_legacy = user_legacy
if is_retweet:
retweet_result = _deep_get(legacy, "retweeted_status_result", "result") or {}
if retweet_result.get("__typename") == "TweetWithVisibilityResults" and retweet_result.get("tweet"):
retweet_result = retweet_result["tweet"]
rt_legacy = retweet_result.get("legacy")
rt_core = retweet_result.get("core")
if isinstance(rt_legacy, dict) and isinstance(rt_core, dict):
actual_data = retweet_result
actual_legacy = rt_legacy
actual_user = _deep_get(rt_core, "user_results", "result") or {}
actual_user_legacy = actual_user.get("legacy", {})
media = _extract_media(actual_legacy)
urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []]
quoted = _deep_get(actual_data, "quoted_status_result", "result")
quoted_tweet = self._parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None
author = _extract_author(actual_user, actual_user_legacy)
retweeted_by = None # type: Optional[str]
if is_retweet:
retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown")
return Tweet(
id=actual_data.get("rest_id", ""),
text=actual_legacy.get("full_text", ""),
author=author,
metrics=Metrics(
likes=_parse_int(actual_legacy.get("favorite_count"), 0),
retweets=_parse_int(actual_legacy.get("retweet_count"), 0),
replies=_parse_int(actual_legacy.get("reply_count"), 0),
quotes=_parse_int(actual_legacy.get("quote_count"), 0),
views=_parse_int(_deep_get(actual_data, "views", "count"), 0),
bookmarks=_parse_int(actual_legacy.get("bookmark_count"), 0),
),
created_at=actual_legacy.get("created_at", ""),
media=media,
urls=urls,
is_retweet=is_retweet,
retweeted_by=retweeted_by,
quoted_tweet=quoted_tweet,
lang=actual_legacy.get("lang", ""),
**_parse_article(actual_data),
)
def _parse_timeline_response(self, data, get_instructions):
# type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]]
"""Parse timeline GraphQL response into tweets and next cursor."""
return parse_timeline_response(data, get_instructions)
def _parse_article(tweet_data):
# type: (Dict[str, Any]) -> Dict[str, Any]
"""Extract Twitter Article data (long-form content) from a tweet.
# ── Backward compatibility re-exports ────────────────────────────────────
# These keep existing test imports working without modification.
Returns dict with 'article_title' and 'article_text' keys (None if not an article).
Converts draft.js content blocks to Markdown.
"""
article_results = _deep_get(tweet_data, "article", "article_results", "result")
if not article_results:
return {"article_title": None, "article_text": None}
title = article_results.get("title") # type: Optional[str]
content_state = article_results.get("content_state", {})
blocks = content_state.get("blocks", [])
if not blocks:
return {"article_title": title, "article_text": None}
# Convert draft.js blocks to Markdown
parts = [] # type: List[str]
ordered_counter = 0
for block in blocks:
block_type = block.get("type", "unstyled") # type: str
if block_type == "atomic":
continue
text = block.get("text", "") # type: str
if not text:
continue
if block_type != "ordered-list-item":
ordered_counter = 0
if block_type == "header-one":
parts.append("# %s" % text)
elif block_type == "header-two":
parts.append("## %s" % text)
elif block_type == "header-three":
parts.append("### %s" % text)
elif block_type == "blockquote":
parts.append("> %s" % text)
elif block_type == "unordered-list-item":
parts.append("- %s" % text)
elif block_type == "ordered-list-item":
ordered_counter += 1
parts.append("%d. %s" % (ordered_counter, text))
elif block_type == "code-block":
parts.append("```\n%s\n```" % text)
else:
parts.append(text)
return {
"article_title": title,
"article_text": "\n\n".join(parts) if parts else None,
}
def _extract_media(legacy):
# type: (Dict[str, Any]) -> List[TweetMedia]
"""Extract media items from tweet legacy data."""
media = [] # type: List[TweetMedia]
for media_item in _deep_get(legacy, "extended_entities", "media") or []:
media_type = media_item.get("type", "")
if media_type == "photo":
media.append(
TweetMedia(
type="photo",
url=media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
elif media_type in {"video", "animated_gif"}:
variants = media_item.get("video_info", {}).get("variants", [])
mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"]
mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True)
media.append(
TweetMedia(
type=media_type,
url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
return media
def _extract_author(user_data, user_legacy):
# type: (Dict[str, Any], Dict[str, Any]) -> Author
"""Extract Author from user result data."""
user_core = user_data.get("core", {})
return Author(
id=user_data.get("rest_id", ""),
name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"),
screen_name=(
user_core.get("screen_name")
or user_legacy.get("screen_name")
or user_data.get("screen_name", "unknown")
),
profile_image_url=(
user_data.get("avatar", {}).get("image_url")
or user_legacy.get("profile_image_url_https", "")
),
verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)),
)
def _deep_get(data, *keys):
# type: (Any, *Any) -> Any
"""Safely get nested dict/list values. Supports int keys for list access."""
current = data
for key in keys:
if isinstance(key, int):
if isinstance(current, list) and 0 <= key < len(current):
current = current[key]
else:
return None
elif isinstance(current, dict):
current = current.get(key)
else:
return None
return current
def _extract_cursor(content):
# type: (Dict[str, Any]) -> Optional[str]
"""Extract Bottom pagination cursor from timeline content."""
if content.get("cursorType") == "Bottom":
return content.get("value")
return None
def _parse_int(value, default):
# type: (Any, int) -> int
"""Best-effort integer conversion. Handles commas and float strings."""
try:
text = str(value).replace(",", "").strip()
if not text:
return default
return int(float(text))
except (TypeError, ValueError):
return default
from .graphql import FALLBACK_QUERY_IDS # noqa: E402, F401
from .parser import _extract_cursor, _extract_media # noqa: E402, F401

221
twitter_cli/graphql.py Normal file
View File

@@ -0,0 +1,221 @@
"""GraphQL infrastructure for Twitter API.
Handles queryId resolution, URL building, JS bundle scanning,
and feature flag management.
"""
from __future__ import annotations
import json
import logging
import re
import urllib.parse
from typing import Dict
from .exceptions import QueryIdError
logger = logging.getLogger(__name__)
# ── Community OpenAPI queryId source ─────────────────────────────────────
TWITTER_OPENAPI_URL = (
"https://raw.githubusercontent.com/fa0311/"
"twitter-openapi/refs/heads/main/src/config/placeholder.json"
)
# ── Fallback (hardcoded) queryIds ────────────────────────────────────────
FALLBACK_QUERY_IDS = {
"HomeTimeline": "HCosKfLNW1AcOo3la3mMgg",
"HomeLatestTimeline": "U0cdisy7QFIoTfu3-Okw0A",
"UserByScreenName": "qRednkZG-rn1P6b48NINmQ",
"UserTweets": "E3opETHurmVJflFsUBVuUQ",
"TweetDetail": "nBS-WpgA6ZG0CyNHD517JQ",
"Likes": "aeJWz7GtGNHHO2Z3GrjCWg",
"SearchTimeline": "MJpyQGqgklrVl_0X9gNy3A",
"Bookmarks": "uzboyXSHSJrR-mGJqep0TQ",
"ListLatestTweetsTimeline": "ZBbXrl0FVnTqp7K6EAADog",
"Followers": "t-BPOrMIduGUJWO_LxcvNQ",
"Following": "iSicc7LrzWGBgDPL0tM_TQ",
"CreateTweet": "bDE2rBtZb3uyrczSZ_pI9g",
"DeleteTweet": "VaenaVgh5q5ih7kvyVjgtg",
"FavoriteTweet": "lI07N6Otwv1PhnEgXILM7A",
"UnfavoriteTweet": "ZYKSe-w7KEslx3JhSIk5LA",
"CreateRetweet": "ojPdsZsimiJrUGLR1sjVsA",
"DeleteRetweet": "iQtK4dl5hBmXewYZuEOKVw",
"CreateBookmark": "aoDbu3RHznuiSkQ9aNM67Q",
"DeleteBookmark": "Wlmlj2-xISYCixDmuS8KNg",
}
# ── Default feature flags ────────────────────────────────────────────────
_DEFAULT_FEATURES = {
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_timeline_navigation_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"c9s_tweet_anatomy_moderator_badge_enabled": True,
"tweetypie_unmention_optimization_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"responsive_web_twitter_article_tweet_consumption_enabled": True,
"tweet_awards_web_tipping_enabled": False,
"longform_notetweets_rich_text_read_enabled": True,
"longform_notetweets_inline_media_enabled": True,
"rweb_video_timestamps_enabled": True,
"responsive_web_media_download_video_enabled": True,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"responsive_web_enhance_cards_enabled": False,
}
# Features dict that gets updated dynamically from x.com JS bundles
FEATURES = dict(_DEFAULT_FEATURES)
# Module-level caches (not thread-safe — CLI is single-threaded)
_cached_query_ids: Dict[str, str] = {}
_bundles_scanned = False
def _build_graphql_url(query_id, operation_name, variables, features, field_toggles=None):
# type: (str, str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> str
"""Build GraphQL GET URL with encoded variables/features/fieldToggles.
Only includes True-valued feature flags in the URL to avoid 414 URI Too Long.
Twitter's API defaults missing features to False.
"""
# Compact features: omit False values to keep URL under server limits
compact_features = {k: v for k, v in features.items() if v is not False}
url = "https://x.com/i/api/graphql/%s/%s?variables=%s&features=%s" % (
query_id,
operation_name,
urllib.parse.quote(json.dumps(variables, separators=(",", ":"))),
urllib.parse.quote(json.dumps(compact_features, separators=(",", ":"))),
)
if field_toggles:
url += "&fieldToggles=%s" % urllib.parse.quote(
json.dumps(field_toggles, separators=(",", ":"))
)
return url
def _scan_bundles(url_fetch_fn):
# type: (Any) -> None
"""Scan Twitter JS bundles and cache queryId mappings.
Args:
url_fetch_fn: Function to fetch URLs (injected to avoid circular import).
"""
global _bundles_scanned
if _bundles_scanned:
return
_bundles_scanned = True
try:
from .constants import get_user_agent
html = url_fetch_fn("https://x.com", {"user-agent": get_user_agent()})
script_pattern = re.compile(
r'(?:src|href)=["\']'
r'(https://abs\.twimg\.com/responsive-web/client-web[^"\']+'
r'\.js)'
r'["\']'
)
script_urls = script_pattern.findall(html)
except Exception as exc: # pragma: no cover - network-dependent branch
logger.warning("Failed to scan JS bundles: %s", exc)
return
for script_url in script_urls:
try:
bundle = url_fetch_fn(script_url)
op_pattern = re.compile(
r'queryId:\s*"([A-Za-z0-9_-]+)"[^}]{0,200}'
r'operationName:\s*"([^"]+)"'
)
for match in op_pattern.finditer(bundle):
query_id, operation_name = match.group(1), match.group(2)
_cached_query_ids.setdefault(operation_name, query_id)
except Exception:
continue
logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids))
def _update_features_from_html(html):
# type: (str) -> None
"""Extract live feature flags from x.com HTML and update the global FEATURES dict.
Twitter embeds feature switch config in inline scripts on the homepage.
We parse these to keep FEATURES in sync with the current frontend.
Only UPDATES existing keys — never adds new ones to avoid URL bloat.
"""
try:
feature_pattern = re.compile(
r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)',
re.IGNORECASE,
)
found = 0
for match in feature_pattern.finditer(html):
key = match.group(1)
value = match.group(2).lower() == "true"
# Only update keys already in FEATURES — never add new ones
# Adding new keys inflates URL length, causing 414/431 errors
if key in FEATURES and FEATURES[key] != value:
logger.debug("Feature flag updated: %s = %s -> %s", key, FEATURES[key], value)
FEATURES[key] = value
found += 1
if found:
logger.info("Updated %d feature flags from x.com", found)
except Exception as exc:
logger.debug("Feature extraction from HTML failed: %s", exc)
def _fetch_from_github(url_fetch_fn, operation_name):
# type: (Any, str) -> Optional[str]
"""Fetch queryId from community-maintained twitter-openapi file."""
try:
payload = url_fetch_fn(TWITTER_OPENAPI_URL)
parsed = json.loads(payload)
operation = parsed.get(operation_name, {})
query_id = operation.get("queryId")
if isinstance(query_id, str) and query_id:
return query_id
except Exception as exc: # pragma: no cover - network-dependent branch
logger.debug("GitHub queryId lookup failed: %s", exc)
return None
def _invalidate_query_id(operation_name):
# type: (str) -> None
"""Remove a cached queryId for an operation."""
_cached_query_ids.pop(operation_name, None)
def _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=None):
# type: (str, bool, Any) -> str
"""Resolve queryId using cache, remote sources, and fallback constants."""
cached = _cached_query_ids.get(operation_name)
if cached:
return cached
fallback = FALLBACK_QUERY_IDS.get(operation_name)
if prefer_fallback and fallback:
_cached_query_ids[operation_name] = fallback
return fallback
if url_fetch_fn:
github_query_id = _fetch_from_github(url_fetch_fn, operation_name)
if github_query_id:
_cached_query_ids[operation_name] = github_query_id
return github_query_id
_scan_bundles(url_fetch_fn)
cached = _cached_query_ids.get(operation_name)
if cached:
return cached
if fallback:
_cached_query_ids[operation_name] = fallback
return fallback
raise QueryIdError('Cannot resolve queryId for "%s"' % operation_name)

308
twitter_cli/parser.py Normal file
View File

@@ -0,0 +1,308 @@
"""Response parsing for Twitter GraphQL API.
Converts raw GraphQL response JSON into domain model objects
(Tweet, UserProfile, Author, etc.).
"""
from __future__ import annotations
import logging
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
logger = logging.getLogger(__name__)
# ── Utility helpers ──────────────────────────────────────────────────────
def _deep_get(data, *keys):
# type: (Any, *Any) -> Any
"""Safely get nested dict/list values. Supports int keys for list access."""
current = data
for key in keys:
if isinstance(key, int):
if isinstance(current, list) and 0 <= key < len(current):
current = current[key]
else:
return None
elif isinstance(current, dict):
current = current.get(key)
else:
return None
return current
def _parse_int(value, default):
# type: (Any, int) -> int
"""Best-effort integer conversion. Handles commas and float strings."""
try:
text = str(value).replace(",", "").strip()
if not text:
return default
return int(float(text))
except (TypeError, ValueError):
return default
def _extract_cursor(content):
# type: (Dict[str, Any]) -> Optional[str]
"""Extract Bottom pagination cursor from timeline content."""
if content.get("cursorType") == "Bottom":
return content.get("value")
return None
# ── Media / Author extraction ────────────────────────────────────────────
def _extract_media(legacy):
# type: (Dict[str, Any]) -> List[TweetMedia]
"""Extract media items from tweet legacy data."""
media = [] # type: List[TweetMedia]
for media_item in _deep_get(legacy, "extended_entities", "media") or []:
media_type = media_item.get("type", "")
if media_type == "photo":
media.append(
TweetMedia(
type="photo",
url=media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
elif media_type in {"video", "animated_gif"}:
variants = media_item.get("video_info", {}).get("variants", [])
mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"]
mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True)
media.append(
TweetMedia(
type=media_type,
url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
return media
def _extract_author(user_data, user_legacy):
# type: (Dict[str, Any], Dict[str, Any]) -> Author
"""Extract Author from user result data."""
user_core = user_data.get("core", {})
return Author(
id=user_data.get("rest_id", ""),
name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"),
screen_name=(
user_core.get("screen_name")
or user_legacy.get("screen_name")
or user_data.get("screen_name", "unknown")
),
profile_image_url=(
user_data.get("avatar", {}).get("image_url")
or user_legacy.get("profile_image_url_https", "")
),
verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)),
)
# ── Article parsing ──────────────────────────────────────────────────────
def _parse_article(tweet_data):
# type: (Dict[str, Any]) -> Dict[str, Any]
"""Extract Twitter Article data (long-form content) from a tweet.
Returns dict with 'article_title' and 'article_text' keys (None if not an article).
Converts draft.js content blocks to Markdown.
"""
article_results = _deep_get(tweet_data, "article", "article_results", "result")
if not article_results:
return {"article_title": None, "article_text": None}
title = article_results.get("title") # type: Optional[str]
content_state = article_results.get("content_state", {})
blocks = content_state.get("blocks", [])
if not blocks:
return {"article_title": title, "article_text": None}
# Convert draft.js blocks to Markdown
parts = [] # type: List[str]
ordered_counter = 0
for block in blocks:
block_type = block.get("type", "unstyled") # type: str
if block_type == "atomic":
continue
text = block.get("text", "") # type: str
if not text:
continue
if block_type != "ordered-list-item":
ordered_counter = 0
if block_type == "header-one":
parts.append("# %s" % text)
elif block_type == "header-two":
parts.append("## %s" % text)
elif block_type == "header-three":
parts.append("### %s" % text)
elif block_type == "blockquote":
parts.append("> %s" % text)
elif block_type == "unordered-list-item":
parts.append("- %s" % text)
elif block_type == "ordered-list-item":
ordered_counter += 1
parts.append("%d. %s" % (ordered_counter, text))
elif block_type == "code-block":
parts.append("```\n%s\n```" % text)
else:
parts.append(text)
return {
"article_title": title,
"article_text": "\n\n".join(parts) if parts else None,
}
# ── User parsing ─────────────────────────────────────────────────────────
def parse_user_result(user_data):
# type: (Dict[str, Any]) -> Optional[UserProfile]
"""Parse a user result object into UserProfile."""
if user_data.get("__typename") == "UserUnavailable":
return None
legacy = user_data.get("legacy", {})
if not legacy:
return None
return UserProfile(
id=user_data.get("rest_id", ""),
name=legacy.get("name", ""),
screen_name=legacy.get("screen_name", ""),
bio=legacy.get("description", ""),
location=legacy.get("location", ""),
url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
followers_count=_parse_int(legacy.get("followers_count"), 0),
following_count=_parse_int(legacy.get("friends_count"), 0),
tweets_count=_parse_int(legacy.get("statuses_count"), 0),
likes_count=_parse_int(legacy.get("favourites_count"), 0),
verified=user_data.get("is_blue_verified", False) or legacy.get("verified", False),
profile_image_url=legacy.get("profile_image_url_https", ""),
created_at=legacy.get("created_at", ""),
)
# ── Tweet parsing ────────────────────────────────────────────────────────
def parse_tweet_result(result, depth=0):
# type: (Dict[str, Any], int) -> Optional[Tweet]
"""Parse a single TweetResult into a Tweet dataclass."""
if depth > 2:
return None
tweet_data = result
if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"):
tweet_data = result["tweet"]
if tweet_data.get("__typename") == "TweetTombstone":
return None
legacy = tweet_data.get("legacy")
core = tweet_data.get("core")
if not isinstance(legacy, dict) or not isinstance(core, dict):
return None
user = _deep_get(core, "user_results", "result") or {}
user_legacy = user.get("legacy", {})
user_core = user.get("core", {})
is_retweet = bool(_deep_get(legacy, "retweeted_status_result", "result"))
actual_data = tweet_data
actual_legacy = legacy
actual_user = user
actual_user_legacy = user_legacy
if is_retweet:
retweet_result = _deep_get(legacy, "retweeted_status_result", "result") or {}
if retweet_result.get("__typename") == "TweetWithVisibilityResults" and retweet_result.get("tweet"):
retweet_result = retweet_result["tweet"]
rt_legacy = retweet_result.get("legacy")
rt_core = retweet_result.get("core")
if isinstance(rt_legacy, dict) and isinstance(rt_core, dict):
actual_data = retweet_result
actual_legacy = rt_legacy
actual_user = _deep_get(rt_core, "user_results", "result") or {}
actual_user_legacy = actual_user.get("legacy", {})
media = _extract_media(actual_legacy)
urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []]
quoted = _deep_get(actual_data, "quoted_status_result", "result")
quoted_tweet = parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None
author = _extract_author(actual_user, actual_user_legacy)
retweeted_by = None # type: Optional[str]
if is_retweet:
retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown")
return Tweet(
id=actual_data.get("rest_id", ""),
text=actual_legacy.get("full_text", ""),
author=author,
metrics=Metrics(
likes=_parse_int(actual_legacy.get("favorite_count"), 0),
retweets=_parse_int(actual_legacy.get("retweet_count"), 0),
replies=_parse_int(actual_legacy.get("reply_count"), 0),
quotes=_parse_int(actual_legacy.get("quote_count"), 0),
views=_parse_int(_deep_get(actual_data, "views", "count"), 0),
bookmarks=_parse_int(actual_legacy.get("bookmark_count"), 0),
),
created_at=actual_legacy.get("created_at", ""),
media=media,
urls=urls,
is_retweet=is_retweet,
retweeted_by=retweeted_by,
quoted_tweet=quoted_tweet,
lang=actual_legacy.get("lang", ""),
**_parse_article(actual_data),
)
# ── Timeline response parsing ───────────────────────────────────────────
def parse_timeline_response(data, get_instructions):
# type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]]
"""Parse timeline GraphQL response into tweets and next cursor."""
tweets = [] # type: List[Tweet]
next_cursor = None # type: Optional[str]
instructions = get_instructions(data)
if not isinstance(instructions, list):
logger.warning("No timeline instructions found")
return tweets, next_cursor
for instruction in instructions:
entries = instruction.get("entries") or instruction.get("moduleItems") or []
for entry in entries:
content = entry.get("content", {})
next_cursor = _extract_cursor(content) or next_cursor
item_content = content.get("itemContent", {})
result = _deep_get(item_content, "tweet_results", "result")
if result:
tweet = parse_tweet_result(result)
if tweet:
tweets.append(tweet)
for nested_item in content.get("items", []):
nested_result = _deep_get(
nested_item,
"item",
"itemContent",
"tweet_results",
"result",
)
if nested_result:
tweet = parse_tweet_result(nested_result)
if tweet:
tweets.append(tweet)
return tweets, next_cursor