From c2b9be4669320aa9fe33ffdc9dc2a825d87e9561 Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 10 Mar 2026 23:18:59 +0800 Subject: [PATCH] refactor: split client.py into graphql.py + parser.py modules Split the monolithic client.py (1341 lines) into three focused modules: - graphql.py (~200 lines): queryId resolution, URL building, JS bundle scanning, feature flag management - parser.py (~270 lines): Tweet/User/Media/Article parsing, utility functions (_deep_get, _parse_int, _extract_cursor, _extract_media) - client.py (~700 lines): TwitterClient class with HTTP engine, anti-detection, session management, and all public API methods Backward compatibility: client.py re-exports all previously public symbols so existing test imports work without modification. 88/88 tests pass. --- twitter_cli/client.py | 866 +++++++++++------------------------------ twitter_cli/graphql.py | 221 +++++++++++ twitter_cli/parser.py | 308 +++++++++++++++ 3 files changed, 749 insertions(+), 646 deletions(-) create mode 100644 twitter_cli/graphql.py create mode 100644 twitter_cli/parser.py diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 741fbf6..98eac44 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -7,10 +7,9 @@ import logging import math import os import random -import re import time import urllib.parse -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, cast +from typing import Any, Callable, Dict, cast import bs4 from curl_cffi import requests as _cffi_requests @@ -34,83 +33,37 @@ from .constants import ( sync_chrome_version, ) from .exceptions import ( - AuthenticationError, - NetworkError, NotFoundError, - QueryIdError, - RateLimitError, TwitterAPIError, ) -from .models import Author, Metrics, Tweet, TweetMedia, UserProfile - -TimelineInstructionGetter = Callable[[Any], Any] -TimelineParseResult = Tuple[List[Tweet], Optional[str]] -SeenIdSet = Set[str] +from .graphql import ( + FEATURES, + _build_graphql_url, + _invalidate_query_id, + _resolve_query_id, + _update_features_from_html, +) +from .models import UserProfile +from .parser import ( + _deep_get, + _parse_int, + parse_timeline_response, + parse_tweet_result, + parse_user_result, +) logger = logging.getLogger(__name__) -# Shared curl_cffi session — impersonates Chrome 133 TLS/JA3/HTTP2 fingerprint -_cffi_session: Optional[Any] = None - - -FALLBACK_QUERY_IDS = { - # Read operations - "HomeTimeline": "c-CzHF1LboFilMpsx4ZCrQ", - "HomeLatestTimeline": "BKB7oi212Fi7kQtCBGE4zA", - "Bookmarks": "VFdMm9iVZxlU6hD86gfW_A", - "UserByScreenName": "1VOOyvKkiI3FMmkeDNxM9A", - "UserTweets": "E3opETHurmVJflFsUBVuUQ", - "SearchTimeline": "nWemVnGJ6A5eQAR5-oQeAg", - "Likes": "lIDpu_NWL7_VhimGGt0o6A", - "TweetDetail": "xd_EMdYvB9hfZsZ6Idri0w", - "ListLatestTweetsTimeline": "RlZzktZY_9wJynoepm8ZsA", - "Followers": "IOh4aS6UdGWGJUYTqliQ7Q", - "Following": "zx6e-TLzRkeDO_a7p4b3JQ", - # Write operations - "CreateTweet": "IID9x6WsdMnTlXnzXGq8ng", - "DeleteTweet": "VaenaVgh5q5ih7kvyVjgtg", - "FavoriteTweet": "lI07N6Otwv1PhnEgXILM7A", - "UnfavoriteTweet": "ZYKSe-w7KEslx3JhSIk5LA", - "CreateRetweet": "ojPdsZsimiJrUGLR1sjUtA", - "DeleteRetweet": "iQtK4dl5hBmXewYZuEOKVw", - "CreateBookmark": "aoDbu3RHznuiSkQ9aNM67Q", - "DeleteBookmark": "Wlmlj2-xzyS1GN3a6cj-mQ", -} - -TWITTER_OPENAPI_URL = ( - "https://raw.githubusercontent.com/fa0311/twitter-openapi/" - "main/src/config/placeholder.json" -) - -# Essential features only — keep this list SMALL to avoid 414/431 URI Too Long. -# Twitter's API defaults missing features to False, so we only need True-valued ones -# that affect tweet data we actually consume. Each additional key adds ~60 chars to URL. -_DEFAULT_FEATURES = { - "creator_subscriptions_tweet_preview_api_enabled": True, - "communities_web_enable_tweet_community_results_fetch": True, - "c9s_tweet_anatomy_moderator_badge_enabled": True, - "articles_preview_enabled": True, - "responsive_web_edit_tweet_api_enabled": True, - "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, - "view_counts_everywhere_api_enabled": True, - "longform_notetweets_consumption_enabled": True, - "responsive_web_twitter_article_tweet_consumption_enabled": True, - "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, - "longform_notetweets_rich_text_read_enabled": True, - "freedom_of_speech_not_reach_fetch_enabled": True, - "standardized_nudges_misinfo": True, - "responsive_web_graphql_timeline_navigation_enabled": True, - "responsive_web_enhance_cards_enabled": False, -} - -# Features dict that gets updated dynamically from x.com JS bundles -FEATURES = dict(_DEFAULT_FEATURES) - -# Module-level caches (not thread-safe — CLI is single-threaded) -_cached_query_ids: Dict[str, str] = {} -_bundles_scanned = False +# Shared curl_cffi session (single-threaded CLI) +_cffi_session = None + +TimelineInstructionGetter = Callable[[Any], Any] + +# Hard ceiling to prevent accidental massive fetches +_ABSOLUTE_MAX_COUNT = 500 +# ── Session management ─────────────────────────────────────────────────── def _best_chrome_target(): @@ -144,7 +97,6 @@ def _get_cffi_session(): """Return shared curl_cffi session with Chrome impersonation and optional proxy.""" global _cffi_session if _cffi_session is None: - import os proxy = os.environ.get("TWITTER_PROXY", "") target = _best_chrome_target() sync_chrome_version(target) # align UA/sec-ch-ua with impersonate target @@ -167,145 +119,7 @@ def _url_fetch(url, headers=None): return resp.text -def _build_graphql_url(query_id, operation_name, variables, features, field_toggles=None): - # type: (str, str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> str - """Build GraphQL GET URL with encoded variables/features/fieldToggles. - - Only includes True-valued feature flags in the URL to avoid 414 URI Too Long. - Twitter's API defaults missing features to False. - """ - # Compact features: omit False values to keep URL under server limits - compact_features = {k: v for k, v in features.items() if v is not False} - url = "https://x.com/i/api/graphql/%s/%s?variables=%s&features=%s" % ( - query_id, - operation_name, - urllib.parse.quote(json.dumps(variables, separators=(",", ":"))), - urllib.parse.quote(json.dumps(compact_features, separators=(",", ":"))), - ) - if field_toggles: - url += "&fieldToggles=%s" % urllib.parse.quote( - json.dumps(field_toggles, separators=(",", ":")) - ) - return url - - -def _scan_bundles(): - # type: () -> None - """Scan Twitter JS bundles and cache queryId mappings.""" - global _bundles_scanned - if _bundles_scanned: - return - _bundles_scanned = True - - try: - html = _url_fetch("https://x.com", {"user-agent": get_user_agent()}) - script_pattern = re.compile( - r'(?:src|href)=["\']' - r'(https://abs\.twimg\.com/responsive-web/client-web[^"\']+\.js)' - r'["\']' - ) - script_urls = script_pattern.findall(html) - except Exception as exc: # pragma: no cover - network-dependent branch - logger.warning("Failed to scan JS bundles: %s", exc) - return - - for script_url in script_urls: - try: - bundle = _url_fetch(script_url) - op_pattern = re.compile( - r'queryId:\s*"([A-Za-z0-9_-]+)"[^}]{0,200}' - r'operationName:\s*"([^"]+)"' - ) - for match in op_pattern.finditer(bundle): - query_id, operation_name = match.group(1), match.group(2) - _cached_query_ids.setdefault(operation_name, query_id) - except Exception: - continue - - logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids)) - - -def _update_features_from_html(html): - # type: (str) -> None - """Extract live feature flags from x.com HTML and update the global FEATURES dict. - - Twitter embeds feature switch config in inline scripts on the homepage. - We parse these to keep FEATURES in sync with the current frontend. - Only UPDATES existing keys — never adds new ones to avoid URL bloat. - """ - try: - feature_pattern = re.compile( - r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)', - re.IGNORECASE, - ) - found = 0 - for match in feature_pattern.finditer(html): - key = match.group(1) - value = match.group(2).lower() == "true" - # Only update keys already in FEATURES — never add new ones - # Adding new keys inflates URL length, causing 414/431 errors - if key in FEATURES and FEATURES[key] != value: - logger.debug("Feature flag updated: %s = %s -> %s", key, FEATURES[key], value) - FEATURES[key] = value - found += 1 - if found: - logger.info("Updated %d feature flags from x.com", found) - except Exception as exc: - logger.debug("Feature extraction from HTML failed: %s", exc) - - -def _fetch_from_github(operation_name): - # type: (str) -> Optional[str] - """Fetch queryId from community-maintained twitter-openapi file.""" - try: - payload = _url_fetch(TWITTER_OPENAPI_URL) - parsed = json.loads(payload) - operation = parsed.get(operation_name, {}) - query_id = operation.get("queryId") - if isinstance(query_id, str) and query_id: - return query_id - except Exception as exc: # pragma: no cover - network-dependent branch - logger.debug("GitHub queryId lookup failed: %s", exc) - return None - - -def _invalidate_query_id(operation_name): - # type: (str) -> None - """Remove a cached queryId for an operation.""" - _cached_query_ids.pop(operation_name, None) - - -def _resolve_query_id(operation_name, prefer_fallback=True): - # type: (str, bool) -> str - """Resolve queryId using cache, remote sources, and fallback constants.""" - cached = _cached_query_ids.get(operation_name) - if cached: - return cached - - fallback = FALLBACK_QUERY_IDS.get(operation_name) - if prefer_fallback and fallback: - _cached_query_ids[operation_name] = fallback - return fallback - - github_query_id = _fetch_from_github(operation_name) - if github_query_id: - _cached_query_ids[operation_name] = github_query_id - return github_query_id - - _scan_bundles() - cached = _cached_query_ids.get(operation_name) - if cached: - return cached - - if fallback: - _cached_query_ids[operation_name] = fallback - return fallback - - raise QueryIdError('Cannot resolve queryId for "%s"' % operation_name) - - -# Hard ceiling to prevent accidental massive fetches -_ABSOLUTE_MAX_COUNT = 500 +# ── TwitterClient ──────────────────────────────────────────────────────── class TwitterClient: @@ -326,6 +140,8 @@ class TwitterClient: # Eagerly initialize ClientTransaction on construction self._ensure_client_transaction() + # ── Read operations ────────────────────────────────────────────── + def fetch_home_timeline(self, count=20): # type: (int) -> List[Tweet] """Fetch home timeline tweets.""" @@ -520,7 +336,7 @@ class TwitterClient: lambda data: _deep_get(data, "data", "user", "result", "timeline", "timeline", "instructions"), ) - # ── Write operations ──────────────────────────────────────────────── + # ── Write operations ───────────────────────────────────────────── def _write_delay(self): # type: () -> None @@ -670,6 +486,8 @@ class TwitterClient: self._write_delay() return True + # ── Internal: timeline / user list fetchers ────────────────────── + def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None): # type: (str, int, Callable[[Any], Any], Optional[Dict[str, Any]], bool, Optional[Dict[str, Any]]) -> List[Tweet] """Generic timeline fetcher with pagination and deduplication. @@ -734,10 +552,73 @@ class TwitterClient: return tweets[:count] + def _fetch_user_list(self, operation_name, user_id, count, get_instructions): + # type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile] + """Generic user list fetcher (for followers/following) with pagination.""" + if count <= 0: + return [] + count = min(count, self._max_count) + users = [] # type: List[UserProfile] + seen_ids = set() # type: Set[str] + cursor = None # type: Optional[str] + attempts = 0 + max_attempts = int(math.ceil(count / 20.0)) + 2 + + while len(users) < count and attempts < max_attempts: + attempts += 1 + variables = { + "userId": user_id, + "count": min(count - len(users) + 5, 40), + "includePromotedContent": False, + } # type: Dict[str, Any] + if cursor: + variables["cursor"] = cursor + + data = self._graphql_get(operation_name, variables, FEATURES) + instructions = get_instructions(data) + if not instructions: + logger.warning("No user list instructions found") + break + + new_users = [] # type: List[UserProfile] + next_cursor = None # type: Optional[str] + for instruction in instructions: + entries = instruction.get("entries", []) + for entry in entries: + content = entry.get("content", {}) + entry_type = content.get("entryType", "") + + if entry_type == "TimelineTimelineItem": + item = content.get("itemContent", {}) + user_results = _deep_get(item, "user_results", "result") + if user_results: + user = self._parse_user_result(user_results) + if user: + new_users.append(user) + elif entry_type == "TimelineTimelineCursor": + if content.get("cursorType") == "Bottom": + next_cursor = content.get("value") + + for user in new_users: + if user.id and user.id not in seen_ids: + seen_ids.add(user.id) + users.append(user) + + if not next_cursor or not new_users: + break + cursor = next_cursor + + if len(users) < count and self._request_delay > 0: + time.sleep(self._request_delay * random.uniform(0.7, 1.5)) + + return users[:count] + + # ── Internal: GraphQL request methods ──────────────────────────── + def _graphql_get(self, operation_name, variables, features, field_toggles=None): # type: (str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] """Issue GraphQL GET request with automatic stale-fallback retry.""" - query_id = _resolve_query_id(operation_name, prefer_fallback=True) + query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch) using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name) url = _build_graphql_url(query_id, operation_name, variables, features, field_toggles) @@ -748,11 +629,125 @@ class TwitterClient: if exc.status_code == 404 and using_fallback: logger.info("Retrying %s with live queryId after 404", operation_name) _invalidate_query_id(operation_name) - refreshed_query_id = _resolve_query_id(operation_name, prefer_fallback=False) + refreshed_query_id = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch) retry_url = _build_graphql_url(refreshed_query_id, operation_name, variables, features, field_toggles) return self._api_get(retry_url) raise RuntimeError(str(exc)) + def _graphql_post(self, operation_name, variables, features=None): + # type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] + """Issue GraphQL POST request with automatic stale-fallback retry.""" + query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch) + using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name) + + def _do_post(qid): + # type: (str) -> Dict[str, Any] + url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name) + body = {"variables": variables, "queryId": qid} # type: Dict[str, Any] + if features: + body["features"] = features + return self._api_request(url, method="POST", body=body) + + try: + return _do_post(query_id) + except TwitterAPIError as exc: + if exc.status_code == 404 and using_fallback: + logger.info("Retrying POST %s with live queryId after 404", operation_name) + _invalidate_query_id(operation_name) + refreshed = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch) + return _do_post(refreshed) + raise RuntimeError(str(exc)) + + # ── Internal: HTTP request engine ──────────────────────────────── + + def _api_get(self, url): + # type: (str) -> Dict[str, Any] + """Make authenticated GET request to Twitter API.""" + return self._api_request(url, method="GET") + + def _api_request(self, url, method="GET", body=None): + # type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any] + """Make authenticated request to Twitter API with retry on rate limits. + + Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation. + Handles both GET and POST. Retries on HTTP 429 and JSON error code 88. + """ + headers = self._build_headers(url=url, method=method) + session = _get_cffi_session() + json_body = body # curl_cffi handles JSON serialization + + for attempt in range(self._max_retries + 1): + try: + if method == "POST": + response = session.post( + url, headers=headers, json=json_body, timeout=30, + ) + else: + response = session.get(url, headers=headers, timeout=30) + + status_code = response.status_code + if status_code == 429 and attempt < self._max_retries: + wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) + logger.warning( + "Rate limited (429), retrying in %.1fs (attempt %d/%d)", + wait, attempt + 1, self._max_retries, + ) + time.sleep(wait) + continue + if status_code >= 400: + message = "Twitter API error %d: %s" % (status_code, response.text[:500]) + raise TwitterAPIError(status_code, message) + + payload = response.text + except TwitterAPIError: + raise + except Exception as exc: + raise TwitterAPIError(0, "Twitter API network error: %s" % exc) + + try: + parsed = json.loads(payload) + except (json.JSONDecodeError, ValueError): + raise TwitterAPIError(0, "Twitter API returned invalid JSON") + + if isinstance(parsed, dict) and parsed.get("errors"): + err_msg = parsed["errors"][0].get("message", "Unknown error") + # Rate limit can also surface as a JSON error (code 88) + err_code = parsed["errors"][0].get("code", 0) + if err_code == 88 and attempt < self._max_retries: + wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) + logger.warning( + "Rate limited (code 88), retrying in %.1fs (attempt %d/%d)", + wait, attempt + 1, self._max_retries, + ) + time.sleep(wait) + continue + # Write operation rate limits (retweet/like/bookmark limits) + # Code 348 = "retweet limit", 327 = "already retweeted" + # Provide user-friendly message + if err_code in (348, 349): + raise TwitterAPIError( + 429, "Rate limited: %s (try again later, recommended wait: 15+ minutes)" % err_msg + ) + raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg) + + # GraphQL write mutations return errors in data.errors (separate from top-level) + if isinstance(parsed, dict) and "data" in parsed: + data_obj = parsed["data"] + if isinstance(data_obj, dict): + for key, val in data_obj.items(): + if isinstance(val, dict) and val.get("errors"): + inner_errors = val["errors"] + if inner_errors: + inner_msg = inner_errors[0].get("message", "Unknown error") + raise TwitterAPIError(0, "Twitter API: %s" % inner_msg) + + return parsed + + # Should not be reached, but just in case + raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries) + + # ── Internal: Anti-detection / headers ─────────────────────────── + @staticmethod def _ct_cache_path(): # type: () -> str @@ -893,448 +888,27 @@ class TwitterClient: logger.debug("Failed to generate transaction id: %s", exc) return headers - def _api_get(self, url): - # type: (str) -> Dict[str, Any] - """Make authenticated GET request to Twitter API.""" - return self._api_request(url, method="GET") - - def _graphql_post(self, operation_name, variables, features=None): - # type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] - """Issue GraphQL POST request with automatic stale-fallback retry.""" - query_id = _resolve_query_id(operation_name, prefer_fallback=True) - using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name) - - def _do_post(qid): - # type: (str) -> Dict[str, Any] - url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name) - body = {"variables": variables, "queryId": qid} # type: Dict[str, Any] - if features: - body["features"] = features - return self._api_request(url, method="POST", body=body) - - try: - return _do_post(query_id) - except TwitterAPIError as exc: - if exc.status_code == 404 and using_fallback: - logger.info("Retrying POST %s with live queryId after 404", operation_name) - _invalidate_query_id(operation_name) - refreshed = _resolve_query_id(operation_name, prefer_fallback=False) - return _do_post(refreshed) - raise RuntimeError(str(exc)) - - def _api_request(self, url, method="GET", body=None): - # type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any] - """Make authenticated request to Twitter API with retry on rate limits. - - Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation. - Handles both GET and POST. Retries on HTTP 429 and JSON error code 88. - """ - headers = self._build_headers(url=url, method=method) - session = _get_cffi_session() - json_body = body # curl_cffi handles JSON serialization - - for attempt in range(self._max_retries + 1): - try: - if method == "POST": - response = session.post( - url, headers=headers, json=json_body, timeout=30, - ) - else: - response = session.get(url, headers=headers, timeout=30) - - status_code = response.status_code - if status_code == 429 and attempt < self._max_retries: - wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) - logger.warning( - "Rate limited (429), retrying in %.1fs (attempt %d/%d)", - wait, attempt + 1, self._max_retries, - ) - time.sleep(wait) - continue - if status_code >= 400: - message = "Twitter API error %d: %s" % (status_code, response.text[:500]) - raise TwitterAPIError(status_code, message) - - payload = response.text - except TwitterAPIError: - raise - except Exception as exc: - raise TwitterAPIError(0, "Twitter API network error: %s" % exc) - - try: - parsed = json.loads(payload) - except (json.JSONDecodeError, ValueError): - raise TwitterAPIError(0, "Twitter API returned invalid JSON") - - if isinstance(parsed, dict) and parsed.get("errors"): - err_msg = parsed["errors"][0].get("message", "Unknown error") - # Rate limit can also surface as a JSON error (code 88) - err_code = parsed["errors"][0].get("code", 0) - if err_code == 88 and attempt < self._max_retries: - wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) - logger.warning( - "Rate limited (code 88), retrying in %.1fs (attempt %d/%d)", - wait, attempt + 1, self._max_retries, - ) - time.sleep(wait) - continue - # Write operation rate limits (retweet/like/bookmark limits) - # Code 348 = "retweet limit", 327 = "already retweeted" - # Provide user-friendly message - if err_code in (348, 349): - raise TwitterAPIError( - 429, "Rate limited: %s (try again later, recommended wait: 15+ minutes)" % err_msg - ) - raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg) - - # GraphQL write mutations return errors in data.errors (separate from top-level) - if isinstance(parsed, dict) and "data" in parsed: - data_obj = parsed["data"] - if isinstance(data_obj, dict): - for key, val in data_obj.items(): - if isinstance(val, dict) and val.get("errors"): - inner_errors = val["errors"] - if inner_errors: - inner_msg = inner_errors[0].get("message", "Unknown error") - raise TwitterAPIError(0, "Twitter API: %s" % inner_msg) - - return parsed - - # Should not be reached, but just in case - raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries) - - def _fetch_user_list(self, operation_name, user_id, count, get_instructions): - # type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile] - """Generic user list fetcher (for followers/following) with pagination.""" - if count <= 0: - return [] - count = min(count, self._max_count) - users = [] # type: List[UserProfile] - seen_ids = set() # type: Set[str] - cursor = None # type: Optional[str] - attempts = 0 - max_attempts = int(math.ceil(count / 20.0)) + 2 - - while len(users) < count and attempts < max_attempts: - attempts += 1 - variables = { - "userId": user_id, - "count": min(count - len(users) + 5, 40), - "includePromotedContent": False, - } # type: Dict[str, Any] - if cursor: - variables["cursor"] = cursor - - data = self._graphql_get(operation_name, variables, FEATURES) - instructions = get_instructions(data) - if not instructions: - logger.warning("No user list instructions found") - break - - new_users = [] # type: List[UserProfile] - next_cursor = None # type: Optional[str] - for instruction in instructions: - entries = instruction.get("entries", []) - for entry in entries: - content = entry.get("content", {}) - entry_type = content.get("entryType", "") - - if entry_type == "TimelineTimelineItem": - item = content.get("itemContent", {}) - user_results = _deep_get(item, "user_results", "result") - if user_results: - user = self._parse_user_result(user_results) - if user: - new_users.append(user) - elif entry_type == "TimelineTimelineCursor": - if content.get("cursorType") == "Bottom": - next_cursor = content.get("value") - - for user in new_users: - if user.id and user.id not in seen_ids: - seen_ids.add(user.id) - users.append(user) - - if not next_cursor or not new_users: - break - cursor = next_cursor - - if len(users) < count and self._request_delay > 0: - time.sleep(self._request_delay * random.uniform(0.7, 1.5)) - - return users[:count] + # ── Backward-compatible delegation to parser module ────────────── @staticmethod def _parse_user_result(user_data): # type: (Dict[str, Any]) -> Optional[UserProfile] """Parse a user result object into UserProfile.""" - if user_data.get("__typename") == "UserUnavailable": - return None - legacy = user_data.get("legacy", {}) - if not legacy: - return None - return UserProfile( - id=user_data.get("rest_id", ""), - name=legacy.get("name", ""), - screen_name=legacy.get("screen_name", ""), - bio=legacy.get("description", ""), - location=legacy.get("location", ""), - url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "", - followers_count=_parse_int(legacy.get("followers_count"), 0), - following_count=_parse_int(legacy.get("friends_count"), 0), - tweets_count=_parse_int(legacy.get("statuses_count"), 0), - likes_count=_parse_int(legacy.get("favourites_count"), 0), - verified=user_data.get("is_blue_verified", False) or legacy.get("verified", False), - profile_image_url=legacy.get("profile_image_url_https", ""), - created_at=legacy.get("created_at", ""), - ) - - def _parse_timeline_response(self, data, get_instructions): - # type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]] - """Parse timeline GraphQL response into tweets and next cursor.""" - tweets = [] # type: List[Tweet] - next_cursor = None # type: Optional[str] - - instructions = get_instructions(data) - if not isinstance(instructions, list): - logger.warning("No timeline instructions found") - return tweets, next_cursor - - for instruction in instructions: - entries = instruction.get("entries") or instruction.get("moduleItems") or [] - for entry in entries: - content = entry.get("content", {}) - next_cursor = _extract_cursor(content) or next_cursor - - item_content = content.get("itemContent", {}) - result = _deep_get(item_content, "tweet_results", "result") - if result: - tweet = self._parse_tweet_result(result) - if tweet: - tweets.append(tweet) - - for nested_item in content.get("items", []): - nested_result = _deep_get( - nested_item, - "item", - "itemContent", - "tweet_results", - "result", - ) - if nested_result: - tweet = self._parse_tweet_result(nested_result) - if tweet: - tweets.append(tweet) - - return tweets, next_cursor + return parse_user_result(user_data) def _parse_tweet_result(self, result, depth=0): # type: (Dict[str, Any], int) -> Optional[Tweet] """Parse a single TweetResult into a Tweet dataclass.""" - if depth > 2: - return None + return parse_tweet_result(result, depth) - tweet_data = result - if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"): - tweet_data = result["tweet"] - if tweet_data.get("__typename") == "TweetTombstone": - return None - - legacy = tweet_data.get("legacy") - core = tweet_data.get("core") - if not isinstance(legacy, dict) or not isinstance(core, dict): - return None - - user = _deep_get(core, "user_results", "result") or {} - user_legacy = user.get("legacy", {}) - user_core = user.get("core", {}) - - is_retweet = bool(_deep_get(legacy, "retweeted_status_result", "result")) - actual_data = tweet_data - actual_legacy = legacy - actual_user = user - actual_user_legacy = user_legacy - - if is_retweet: - retweet_result = _deep_get(legacy, "retweeted_status_result", "result") or {} - if retweet_result.get("__typename") == "TweetWithVisibilityResults" and retweet_result.get("tweet"): - retweet_result = retweet_result["tweet"] - rt_legacy = retweet_result.get("legacy") - rt_core = retweet_result.get("core") - if isinstance(rt_legacy, dict) and isinstance(rt_core, dict): - actual_data = retweet_result - actual_legacy = rt_legacy - actual_user = _deep_get(rt_core, "user_results", "result") or {} - actual_user_legacy = actual_user.get("legacy", {}) - - media = _extract_media(actual_legacy) - urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []] - quoted = _deep_get(actual_data, "quoted_status_result", "result") - quoted_tweet = self._parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None - author = _extract_author(actual_user, actual_user_legacy) - - retweeted_by = None # type: Optional[str] - if is_retweet: - retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown") - - return Tweet( - id=actual_data.get("rest_id", ""), - text=actual_legacy.get("full_text", ""), - author=author, - metrics=Metrics( - likes=_parse_int(actual_legacy.get("favorite_count"), 0), - retweets=_parse_int(actual_legacy.get("retweet_count"), 0), - replies=_parse_int(actual_legacy.get("reply_count"), 0), - quotes=_parse_int(actual_legacy.get("quote_count"), 0), - views=_parse_int(_deep_get(actual_data, "views", "count"), 0), - bookmarks=_parse_int(actual_legacy.get("bookmark_count"), 0), - ), - created_at=actual_legacy.get("created_at", ""), - media=media, - urls=urls, - is_retweet=is_retweet, - retweeted_by=retweeted_by, - quoted_tweet=quoted_tweet, - lang=actual_legacy.get("lang", ""), - **_parse_article(actual_data), - ) + def _parse_timeline_response(self, data, get_instructions): + # type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]] + """Parse timeline GraphQL response into tweets and next cursor.""" + return parse_timeline_response(data, get_instructions) -def _parse_article(tweet_data): - # type: (Dict[str, Any]) -> Dict[str, Any] - """Extract Twitter Article data (long-form content) from a tweet. +# ── Backward compatibility re-exports ──────────────────────────────────── +# These keep existing test imports working without modification. - Returns dict with 'article_title' and 'article_text' keys (None if not an article). - Converts draft.js content blocks to Markdown. - """ - article_results = _deep_get(tweet_data, "article", "article_results", "result") - if not article_results: - return {"article_title": None, "article_text": None} - - title = article_results.get("title") # type: Optional[str] - content_state = article_results.get("content_state", {}) - blocks = content_state.get("blocks", []) - if not blocks: - return {"article_title": title, "article_text": None} - - # Convert draft.js blocks to Markdown - parts = [] # type: List[str] - ordered_counter = 0 - for block in blocks: - block_type = block.get("type", "unstyled") # type: str - if block_type == "atomic": - continue - text = block.get("text", "") # type: str - if not text: - continue - if block_type != "ordered-list-item": - ordered_counter = 0 - if block_type == "header-one": - parts.append("# %s" % text) - elif block_type == "header-two": - parts.append("## %s" % text) - elif block_type == "header-three": - parts.append("### %s" % text) - elif block_type == "blockquote": - parts.append("> %s" % text) - elif block_type == "unordered-list-item": - parts.append("- %s" % text) - elif block_type == "ordered-list-item": - ordered_counter += 1 - parts.append("%d. %s" % (ordered_counter, text)) - elif block_type == "code-block": - parts.append("```\n%s\n```" % text) - else: - parts.append(text) - - return { - "article_title": title, - "article_text": "\n\n".join(parts) if parts else None, - } - - -def _extract_media(legacy): - # type: (Dict[str, Any]) -> List[TweetMedia] - """Extract media items from tweet legacy data.""" - media = [] # type: List[TweetMedia] - for media_item in _deep_get(legacy, "extended_entities", "media") or []: - media_type = media_item.get("type", "") - if media_type == "photo": - media.append( - TweetMedia( - type="photo", - url=media_item.get("media_url_https", ""), - width=_deep_get(media_item, "original_info", "width"), - height=_deep_get(media_item, "original_info", "height"), - ) - ) - elif media_type in {"video", "animated_gif"}: - variants = media_item.get("video_info", {}).get("variants", []) - mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"] - mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True) - media.append( - TweetMedia( - type=media_type, - url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""), - width=_deep_get(media_item, "original_info", "width"), - height=_deep_get(media_item, "original_info", "height"), - ) - ) - return media - - -def _extract_author(user_data, user_legacy): - # type: (Dict[str, Any], Dict[str, Any]) -> Author - """Extract Author from user result data.""" - user_core = user_data.get("core", {}) - return Author( - id=user_data.get("rest_id", ""), - name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"), - screen_name=( - user_core.get("screen_name") - or user_legacy.get("screen_name") - or user_data.get("screen_name", "unknown") - ), - profile_image_url=( - user_data.get("avatar", {}).get("image_url") - or user_legacy.get("profile_image_url_https", "") - ), - verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)), - ) - - -def _deep_get(data, *keys): - # type: (Any, *Any) -> Any - """Safely get nested dict/list values. Supports int keys for list access.""" - current = data - for key in keys: - if isinstance(key, int): - if isinstance(current, list) and 0 <= key < len(current): - current = current[key] - else: - return None - elif isinstance(current, dict): - current = current.get(key) - else: - return None - return current - - -def _extract_cursor(content): - # type: (Dict[str, Any]) -> Optional[str] - """Extract Bottom pagination cursor from timeline content.""" - if content.get("cursorType") == "Bottom": - return content.get("value") - return None - - -def _parse_int(value, default): - # type: (Any, int) -> int - """Best-effort integer conversion. Handles commas and float strings.""" - try: - text = str(value).replace(",", "").strip() - if not text: - return default - return int(float(text)) - except (TypeError, ValueError): - return default +from .graphql import FALLBACK_QUERY_IDS # noqa: E402, F401 +from .parser import _extract_cursor, _extract_media # noqa: E402, F401 diff --git a/twitter_cli/graphql.py b/twitter_cli/graphql.py new file mode 100644 index 0000000..aaf9c7c --- /dev/null +++ b/twitter_cli/graphql.py @@ -0,0 +1,221 @@ +"""GraphQL infrastructure for Twitter API. + +Handles queryId resolution, URL building, JS bundle scanning, +and feature flag management. +""" + +from __future__ import annotations + +import json +import logging +import re +import urllib.parse +from typing import Dict + +from .exceptions import QueryIdError + +logger = logging.getLogger(__name__) + +# ── Community OpenAPI queryId source ───────────────────────────────────── +TWITTER_OPENAPI_URL = ( + "https://raw.githubusercontent.com/fa0311/" + "twitter-openapi/refs/heads/main/src/config/placeholder.json" +) + +# ── Fallback (hardcoded) queryIds ──────────────────────────────────────── +FALLBACK_QUERY_IDS = { + "HomeTimeline": "HCosKfLNW1AcOo3la3mMgg", + "HomeLatestTimeline": "U0cdisy7QFIoTfu3-Okw0A", + "UserByScreenName": "qRednkZG-rn1P6b48NINmQ", + "UserTweets": "E3opETHurmVJflFsUBVuUQ", + "TweetDetail": "nBS-WpgA6ZG0CyNHD517JQ", + "Likes": "aeJWz7GtGNHHO2Z3GrjCWg", + "SearchTimeline": "MJpyQGqgklrVl_0X9gNy3A", + "Bookmarks": "uzboyXSHSJrR-mGJqep0TQ", + "ListLatestTweetsTimeline": "ZBbXrl0FVnTqp7K6EAADog", + "Followers": "t-BPOrMIduGUJWO_LxcvNQ", + "Following": "iSicc7LrzWGBgDPL0tM_TQ", + "CreateTweet": "bDE2rBtZb3uyrczSZ_pI9g", + "DeleteTweet": "VaenaVgh5q5ih7kvyVjgtg", + "FavoriteTweet": "lI07N6Otwv1PhnEgXILM7A", + "UnfavoriteTweet": "ZYKSe-w7KEslx3JhSIk5LA", + "CreateRetweet": "ojPdsZsimiJrUGLR1sjVsA", + "DeleteRetweet": "iQtK4dl5hBmXewYZuEOKVw", + "CreateBookmark": "aoDbu3RHznuiSkQ9aNM67Q", + "DeleteBookmark": "Wlmlj2-xISYCixDmuS8KNg", +} + +# ── Default feature flags ──────────────────────────────────────────────── +_DEFAULT_FEATURES = { + "responsive_web_graphql_exclude_directive_enabled": True, + "verified_phone_label_enabled": False, + "creator_subscriptions_tweet_preview_api_enabled": True, + "responsive_web_graphql_timeline_navigation_enabled": True, + "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False, + "c9s_tweet_anatomy_moderator_badge_enabled": True, + "tweetypie_unmention_optimization_enabled": True, + "responsive_web_edit_tweet_api_enabled": True, + "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, + "view_counts_everywhere_api_enabled": True, + "longform_notetweets_consumption_enabled": True, + "responsive_web_twitter_article_tweet_consumption_enabled": True, + "tweet_awards_web_tipping_enabled": False, + "longform_notetweets_rich_text_read_enabled": True, + "longform_notetweets_inline_media_enabled": True, + "rweb_video_timestamps_enabled": True, + "responsive_web_media_download_video_enabled": True, + "freedom_of_speech_not_reach_fetch_enabled": True, + "standardized_nudges_misinfo": True, + "responsive_web_enhance_cards_enabled": False, +} + +# Features dict that gets updated dynamically from x.com JS bundles +FEATURES = dict(_DEFAULT_FEATURES) + +# Module-level caches (not thread-safe — CLI is single-threaded) +_cached_query_ids: Dict[str, str] = {} +_bundles_scanned = False + + +def _build_graphql_url(query_id, operation_name, variables, features, field_toggles=None): + # type: (str, str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> str + """Build GraphQL GET URL with encoded variables/features/fieldToggles. + + Only includes True-valued feature flags in the URL to avoid 414 URI Too Long. + Twitter's API defaults missing features to False. + """ + # Compact features: omit False values to keep URL under server limits + compact_features = {k: v for k, v in features.items() if v is not False} + url = "https://x.com/i/api/graphql/%s/%s?variables=%s&features=%s" % ( + query_id, + operation_name, + urllib.parse.quote(json.dumps(variables, separators=(",", ":"))), + urllib.parse.quote(json.dumps(compact_features, separators=(",", ":"))), + ) + if field_toggles: + url += "&fieldToggles=%s" % urllib.parse.quote( + json.dumps(field_toggles, separators=(",", ":")) + ) + return url + + +def _scan_bundles(url_fetch_fn): + # type: (Any) -> None + """Scan Twitter JS bundles and cache queryId mappings. + + Args: + url_fetch_fn: Function to fetch URLs (injected to avoid circular import). + """ + global _bundles_scanned + if _bundles_scanned: + return + _bundles_scanned = True + + try: + from .constants import get_user_agent + html = url_fetch_fn("https://x.com", {"user-agent": get_user_agent()}) + script_pattern = re.compile( + r'(?:src|href)=["\']' + r'(https://abs\.twimg\.com/responsive-web/client-web[^"\']+' + r'\.js)' + r'["\']' + ) + script_urls = script_pattern.findall(html) + except Exception as exc: # pragma: no cover - network-dependent branch + logger.warning("Failed to scan JS bundles: %s", exc) + return + + for script_url in script_urls: + try: + bundle = url_fetch_fn(script_url) + op_pattern = re.compile( + r'queryId:\s*"([A-Za-z0-9_-]+)"[^}]{0,200}' + r'operationName:\s*"([^"]+)"' + ) + for match in op_pattern.finditer(bundle): + query_id, operation_name = match.group(1), match.group(2) + _cached_query_ids.setdefault(operation_name, query_id) + except Exception: + continue + + logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids)) + + +def _update_features_from_html(html): + # type: (str) -> None + """Extract live feature flags from x.com HTML and update the global FEATURES dict. + + Twitter embeds feature switch config in inline scripts on the homepage. + We parse these to keep FEATURES in sync with the current frontend. + Only UPDATES existing keys — never adds new ones to avoid URL bloat. + """ + try: + feature_pattern = re.compile( + r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)', + re.IGNORECASE, + ) + found = 0 + for match in feature_pattern.finditer(html): + key = match.group(1) + value = match.group(2).lower() == "true" + # Only update keys already in FEATURES — never add new ones + # Adding new keys inflates URL length, causing 414/431 errors + if key in FEATURES and FEATURES[key] != value: + logger.debug("Feature flag updated: %s = %s -> %s", key, FEATURES[key], value) + FEATURES[key] = value + found += 1 + if found: + logger.info("Updated %d feature flags from x.com", found) + except Exception as exc: + logger.debug("Feature extraction from HTML failed: %s", exc) + + +def _fetch_from_github(url_fetch_fn, operation_name): + # type: (Any, str) -> Optional[str] + """Fetch queryId from community-maintained twitter-openapi file.""" + try: + payload = url_fetch_fn(TWITTER_OPENAPI_URL) + parsed = json.loads(payload) + operation = parsed.get(operation_name, {}) + query_id = operation.get("queryId") + if isinstance(query_id, str) and query_id: + return query_id + except Exception as exc: # pragma: no cover - network-dependent branch + logger.debug("GitHub queryId lookup failed: %s", exc) + return None + + +def _invalidate_query_id(operation_name): + # type: (str) -> None + """Remove a cached queryId for an operation.""" + _cached_query_ids.pop(operation_name, None) + + +def _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=None): + # type: (str, bool, Any) -> str + """Resolve queryId using cache, remote sources, and fallback constants.""" + cached = _cached_query_ids.get(operation_name) + if cached: + return cached + + fallback = FALLBACK_QUERY_IDS.get(operation_name) + if prefer_fallback and fallback: + _cached_query_ids[operation_name] = fallback + return fallback + + if url_fetch_fn: + github_query_id = _fetch_from_github(url_fetch_fn, operation_name) + if github_query_id: + _cached_query_ids[operation_name] = github_query_id + return github_query_id + + _scan_bundles(url_fetch_fn) + cached = _cached_query_ids.get(operation_name) + if cached: + return cached + + if fallback: + _cached_query_ids[operation_name] = fallback + return fallback + + raise QueryIdError('Cannot resolve queryId for "%s"' % operation_name) diff --git a/twitter_cli/parser.py b/twitter_cli/parser.py new file mode 100644 index 0000000..242f0f4 --- /dev/null +++ b/twitter_cli/parser.py @@ -0,0 +1,308 @@ +"""Response parsing for Twitter GraphQL API. + +Converts raw GraphQL response JSON into domain model objects +(Tweet, UserProfile, Author, etc.). +""" + +from __future__ import annotations + +import logging + +from .models import Author, Metrics, Tweet, TweetMedia, UserProfile + +logger = logging.getLogger(__name__) + + +# ── Utility helpers ────────────────────────────────────────────────────── + + +def _deep_get(data, *keys): + # type: (Any, *Any) -> Any + """Safely get nested dict/list values. Supports int keys for list access.""" + current = data + for key in keys: + if isinstance(key, int): + if isinstance(current, list) and 0 <= key < len(current): + current = current[key] + else: + return None + elif isinstance(current, dict): + current = current.get(key) + else: + return None + return current + + +def _parse_int(value, default): + # type: (Any, int) -> int + """Best-effort integer conversion. Handles commas and float strings.""" + try: + text = str(value).replace(",", "").strip() + if not text: + return default + return int(float(text)) + except (TypeError, ValueError): + return default + + +def _extract_cursor(content): + # type: (Dict[str, Any]) -> Optional[str] + """Extract Bottom pagination cursor from timeline content.""" + if content.get("cursorType") == "Bottom": + return content.get("value") + return None + + +# ── Media / Author extraction ──────────────────────────────────────────── + + +def _extract_media(legacy): + # type: (Dict[str, Any]) -> List[TweetMedia] + """Extract media items from tweet legacy data.""" + media = [] # type: List[TweetMedia] + for media_item in _deep_get(legacy, "extended_entities", "media") or []: + media_type = media_item.get("type", "") + if media_type == "photo": + media.append( + TweetMedia( + type="photo", + url=media_item.get("media_url_https", ""), + width=_deep_get(media_item, "original_info", "width"), + height=_deep_get(media_item, "original_info", "height"), + ) + ) + elif media_type in {"video", "animated_gif"}: + variants = media_item.get("video_info", {}).get("variants", []) + mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"] + mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True) + media.append( + TweetMedia( + type=media_type, + url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""), + width=_deep_get(media_item, "original_info", "width"), + height=_deep_get(media_item, "original_info", "height"), + ) + ) + return media + + +def _extract_author(user_data, user_legacy): + # type: (Dict[str, Any], Dict[str, Any]) -> Author + """Extract Author from user result data.""" + user_core = user_data.get("core", {}) + return Author( + id=user_data.get("rest_id", ""), + name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"), + screen_name=( + user_core.get("screen_name") + or user_legacy.get("screen_name") + or user_data.get("screen_name", "unknown") + ), + profile_image_url=( + user_data.get("avatar", {}).get("image_url") + or user_legacy.get("profile_image_url_https", "") + ), + verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)), + ) + + +# ── Article parsing ────────────────────────────────────────────────────── + + +def _parse_article(tweet_data): + # type: (Dict[str, Any]) -> Dict[str, Any] + """Extract Twitter Article data (long-form content) from a tweet. + + Returns dict with 'article_title' and 'article_text' keys (None if not an article). + Converts draft.js content blocks to Markdown. + """ + article_results = _deep_get(tweet_data, "article", "article_results", "result") + if not article_results: + return {"article_title": None, "article_text": None} + + title = article_results.get("title") # type: Optional[str] + content_state = article_results.get("content_state", {}) + blocks = content_state.get("blocks", []) + if not blocks: + return {"article_title": title, "article_text": None} + + # Convert draft.js blocks to Markdown + parts = [] # type: List[str] + ordered_counter = 0 + for block in blocks: + block_type = block.get("type", "unstyled") # type: str + if block_type == "atomic": + continue + text = block.get("text", "") # type: str + if not text: + continue + if block_type != "ordered-list-item": + ordered_counter = 0 + if block_type == "header-one": + parts.append("# %s" % text) + elif block_type == "header-two": + parts.append("## %s" % text) + elif block_type == "header-three": + parts.append("### %s" % text) + elif block_type == "blockquote": + parts.append("> %s" % text) + elif block_type == "unordered-list-item": + parts.append("- %s" % text) + elif block_type == "ordered-list-item": + ordered_counter += 1 + parts.append("%d. %s" % (ordered_counter, text)) + elif block_type == "code-block": + parts.append("```\n%s\n```" % text) + else: + parts.append(text) + + return { + "article_title": title, + "article_text": "\n\n".join(parts) if parts else None, + } + + +# ── User parsing ───────────────────────────────────────────────────────── + + +def parse_user_result(user_data): + # type: (Dict[str, Any]) -> Optional[UserProfile] + """Parse a user result object into UserProfile.""" + if user_data.get("__typename") == "UserUnavailable": + return None + legacy = user_data.get("legacy", {}) + if not legacy: + return None + return UserProfile( + id=user_data.get("rest_id", ""), + name=legacy.get("name", ""), + screen_name=legacy.get("screen_name", ""), + bio=legacy.get("description", ""), + location=legacy.get("location", ""), + url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "", + followers_count=_parse_int(legacy.get("followers_count"), 0), + following_count=_parse_int(legacy.get("friends_count"), 0), + tweets_count=_parse_int(legacy.get("statuses_count"), 0), + likes_count=_parse_int(legacy.get("favourites_count"), 0), + verified=user_data.get("is_blue_verified", False) or legacy.get("verified", False), + profile_image_url=legacy.get("profile_image_url_https", ""), + created_at=legacy.get("created_at", ""), + ) + + +# ── Tweet parsing ──────────────────────────────────────────────────────── + + +def parse_tweet_result(result, depth=0): + # type: (Dict[str, Any], int) -> Optional[Tweet] + """Parse a single TweetResult into a Tweet dataclass.""" + if depth > 2: + return None + + tweet_data = result + if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"): + tweet_data = result["tweet"] + if tweet_data.get("__typename") == "TweetTombstone": + return None + + legacy = tweet_data.get("legacy") + core = tweet_data.get("core") + if not isinstance(legacy, dict) or not isinstance(core, dict): + return None + + user = _deep_get(core, "user_results", "result") or {} + user_legacy = user.get("legacy", {}) + user_core = user.get("core", {}) + + is_retweet = bool(_deep_get(legacy, "retweeted_status_result", "result")) + actual_data = tweet_data + actual_legacy = legacy + actual_user = user + actual_user_legacy = user_legacy + + if is_retweet: + retweet_result = _deep_get(legacy, "retweeted_status_result", "result") or {} + if retweet_result.get("__typename") == "TweetWithVisibilityResults" and retweet_result.get("tweet"): + retweet_result = retweet_result["tweet"] + rt_legacy = retweet_result.get("legacy") + rt_core = retweet_result.get("core") + if isinstance(rt_legacy, dict) and isinstance(rt_core, dict): + actual_data = retweet_result + actual_legacy = rt_legacy + actual_user = _deep_get(rt_core, "user_results", "result") or {} + actual_user_legacy = actual_user.get("legacy", {}) + + media = _extract_media(actual_legacy) + urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []] + quoted = _deep_get(actual_data, "quoted_status_result", "result") + quoted_tweet = parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None + author = _extract_author(actual_user, actual_user_legacy) + + retweeted_by = None # type: Optional[str] + if is_retweet: + retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown") + + return Tweet( + id=actual_data.get("rest_id", ""), + text=actual_legacy.get("full_text", ""), + author=author, + metrics=Metrics( + likes=_parse_int(actual_legacy.get("favorite_count"), 0), + retweets=_parse_int(actual_legacy.get("retweet_count"), 0), + replies=_parse_int(actual_legacy.get("reply_count"), 0), + quotes=_parse_int(actual_legacy.get("quote_count"), 0), + views=_parse_int(_deep_get(actual_data, "views", "count"), 0), + bookmarks=_parse_int(actual_legacy.get("bookmark_count"), 0), + ), + created_at=actual_legacy.get("created_at", ""), + media=media, + urls=urls, + is_retweet=is_retweet, + retweeted_by=retweeted_by, + quoted_tweet=quoted_tweet, + lang=actual_legacy.get("lang", ""), + **_parse_article(actual_data), + ) + + +# ── Timeline response parsing ─────────────────────────────────────────── + + +def parse_timeline_response(data, get_instructions): + # type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]] + """Parse timeline GraphQL response into tweets and next cursor.""" + tweets = [] # type: List[Tweet] + next_cursor = None # type: Optional[str] + + instructions = get_instructions(data) + if not isinstance(instructions, list): + logger.warning("No timeline instructions found") + return tweets, next_cursor + + for instruction in instructions: + entries = instruction.get("entries") or instruction.get("moduleItems") or [] + for entry in entries: + content = entry.get("content", {}) + next_cursor = _extract_cursor(content) or next_cursor + + item_content = content.get("itemContent", {}) + result = _deep_get(item_content, "tweet_results", "result") + if result: + tweet = parse_tweet_result(result) + if tweet: + tweets.append(tweet) + + for nested_item in content.get("items", []): + nested_result = _deep_get( + nested_item, + "item", + "itemContent", + "tweet_results", + "result", + ) + if nested_result: + tweet = parse_tweet_result(nested_result) + if tweet: + tweets.append(tweet) + + return tweets, next_cursor