"""Twitter GraphQL API client.""" from __future__ import annotations import json import logging import math import os import random import time import urllib.parse from typing import Any, Callable, Dict, cast import bs4 from curl_cffi import requests as _cffi_requests from x_client_transaction import ClientTransaction from x_client_transaction.utils import generate_headers as _gen_ct_headers, get_ondemand_file_url from .constants import ( BEARER_TOKEN, SEC_CH_UA_ARCH, SEC_CH_UA_BITNESS, SEC_CH_UA_MOBILE, SEC_CH_UA_MODEL, SEC_CH_UA_PLATFORM_VERSION, get_accept_language, get_sec_ch_ua, get_sec_ch_ua_full_version, get_sec_ch_ua_full_version_list, get_sec_ch_ua_platform, get_twitter_client_language, get_user_agent, sync_chrome_version, ) from .exceptions import ( NotFoundError, TwitterAPIError, ) from .graphql import ( FEATURES, _build_graphql_url, _invalidate_query_id, _resolve_query_id, _update_features_from_html, ) from .models import UserProfile from .parser import ( _deep_get, _parse_int, parse_timeline_response, parse_tweet_result, parse_user_result, ) logger = logging.getLogger(__name__) # Shared curl_cffi session (single-threaded CLI) _cffi_session = None TimelineInstructionGetter = Callable[[Any], Any] # Hard ceiling to prevent accidental massive fetches _ABSOLUTE_MAX_COUNT = 500 # ── Session management ─────────────────────────────────────────────────── def _best_chrome_target(): # type: () -> str """Detect the best available Chrome impersonation target at runtime. curl_cffi versions differ in which Chrome targets they ship. e.g. 0.14.0 has chrome133a but not chrome133. """ try: from curl_cffi.requests import BrowserType available = {e.value for e in BrowserType} except Exception: available = set() # Preference order: exact chrome versions, then suffixed variants for target in ("chrome133", "chrome133a", "chrome136", "chrome131", "chrome130"): if target in available: return target # Fallback: pick highest chrome* with a pure numeric suffix chrome_targets = sorted( [v for v in available if v.startswith("chrome") and v.replace("chrome", "").isdigit()], key=lambda x: int(x.replace("chrome", "")), reverse=True, ) return chrome_targets[0] if chrome_targets else "chrome131" def _get_cffi_session(): # type: () -> Any """Return shared curl_cffi session with Chrome impersonation and optional proxy.""" global _cffi_session if _cffi_session is None: proxy = os.environ.get("TWITTER_PROXY", "") target = _best_chrome_target() sync_chrome_version(target) # align UA/sec-ch-ua with impersonate target _cffi_session = _cffi_requests.Session( impersonate=cast(Any, target), proxies={"https": proxy, "http": proxy} if proxy else None, ) logger.info("curl_cffi impersonating %s", target) if proxy: logger.info("Using proxy: %s", proxy[:20] + "...") return _cffi_session def _url_fetch(url, headers=None): # type: (str, Optional[Dict[str, str]]) -> str """URL fetch using curl_cffi for proper TLS fingerprint.""" session = _get_cffi_session() resp = session.get(url, headers=headers or {}, timeout=30) resp.raise_for_status() return resp.text # ── TwitterClient ──────────────────────────────────────────────────────── class TwitterClient: """Twitter GraphQL API client using cookie authentication.""" def __init__(self, auth_token, ct0, rate_limit_config=None, cookie_string=None): # type: (str, str, Optional[Dict[str, Any]], Optional[str]) -> None self._auth_token = auth_token self._ct0 = ct0 self._cookie_string = cookie_string # Full browser cookie string rl = rate_limit_config or {} self._request_delay = float(rl.get("requestDelay", 2.5)) self._max_retries = int(rl.get("maxRetries", 3)) self._retry_base_delay = float(rl.get("retryBaseDelay", 5.0)) self._max_count = min(int(rl.get("maxCount", 200)), _ABSOLUTE_MAX_COUNT) self._client_transaction = None # type: Optional[Any] self._ct_init_attempted = False # Eagerly initialize ClientTransaction on construction self._ensure_client_transaction() # ── Read operations ────────────────────────────────────────────── def fetch_home_timeline(self, count=20): # type: (int) -> List[Tweet] """Fetch home timeline tweets.""" return self._fetch_timeline( "HomeTimeline", count, lambda data: _deep_get(data, "data", "home", "home_timeline_urt", "instructions"), ) def fetch_following_feed(self, count=20): # type: (int) -> List[Tweet] """Fetch chronological following feed.""" return self._fetch_timeline( "HomeLatestTimeline", count, lambda data: _deep_get(data, "data", "home", "home_timeline_urt", "instructions"), ) def fetch_bookmarks(self, count=50): # type: (int) -> List[Tweet] """Fetch bookmarked tweets.""" def get_instructions(data): # type: (Any) -> Any instructions = _deep_get(data, "data", "bookmark_timeline", "timeline", "instructions") if instructions is None: instructions = _deep_get(data, "data", "bookmark_timeline_v2", "timeline", "instructions") return instructions return self._fetch_timeline("Bookmarks", count, get_instructions) def resolve_user_id(self, identifier): # type: (str) -> str """Resolve a user identifier (screen_name or numeric user_id) to numeric user_id. If identifier is all digits, returns it as-is. Otherwise fetches the user profile. """ if identifier.isdigit(): return identifier profile = self.fetch_user(identifier) return profile.id def fetch_user(self, screen_name): # type: (str) -> UserProfile """Fetch user profile by screen name.""" variables = { "screen_name": screen_name, "withSafetyModeUserFields": True, } features = { "hidden_profile_subscriptions_enabled": True, "rweb_tipjar_consumption_enabled": True, "responsive_web_graphql_exclude_directive_enabled": True, "verified_phone_label_enabled": False, "subscriptions_verification_info_is_identity_verified_enabled": True, "subscriptions_verification_info_verified_since_enabled": True, "highlights_tweets_tab_ui_enabled": True, "responsive_web_twitter_article_notes_tab_enabled": True, "subscriptions_feature_can_gift_premium": True, "creator_subscriptions_tweet_preview_api_enabled": True, "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False, "responsive_web_graphql_timeline_navigation_enabled": True, } data = self._graphql_get("UserByScreenName", variables, features) result = _deep_get(data, "data", "user", "result") if not result: raise NotFoundError("User @%s not found" % screen_name) legacy = result.get("legacy", {}) return UserProfile( id=result.get("rest_id", ""), name=legacy.get("name", ""), screen_name=legacy.get("screen_name", screen_name), bio=legacy.get("description", ""), location=legacy.get("location", ""), url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "", followers_count=_parse_int(legacy.get("followers_count"), 0), following_count=_parse_int(legacy.get("friends_count"), 0), tweets_count=_parse_int(legacy.get("statuses_count"), 0), likes_count=_parse_int(legacy.get("favourites_count"), 0), verified=bool(result.get("is_blue_verified") or legacy.get("verified", False)), profile_image_url=legacy.get("profile_image_url_https", ""), created_at=legacy.get("created_at", ""), ) def fetch_user_tweets(self, user_id, count=20): # type: (str, int) -> List[Tweet] """Fetch tweets posted by a user.""" return self._fetch_timeline( "UserTweets", count, lambda data: _deep_get(data, "data", "user", "result", "timeline_v2", "timeline", "instructions"), extra_variables={ "userId": user_id, "withQuickPromoteEligibilityTweetFields": True, "withVoice": True, "withV2Timeline": True, }, ) def fetch_user_likes(self, user_id, count=20): # type: (str, int) -> List[Tweet] """Fetch tweets liked by a user.""" return self._fetch_timeline( "Likes", count, lambda data: _deep_get(data, "data", "user", "result", "timeline_v2", "timeline", "instructions"), extra_variables={ "userId": user_id, "includePromotedContent": False, "withClientEventToken": False, "withBirdwatchNotes": False, "withVoice": True, }, override_base_variables=True, ) def fetch_search(self, query, count=20, product="Top"): # type: (str, int, str) -> List[Tweet] """Search tweets by query. Args: query: Search query string. count: Max number of tweets to return. product: Search tab — "Top", "Latest", "People", "Photos", "Videos". """ return self._fetch_timeline( "SearchTimeline", count, lambda data: _deep_get( data, "data", "search_by_raw_query", "search_timeline", "timeline", "instructions", ), extra_variables={ "rawQuery": query, "querySource": "typed_query", "product": product, }, override_base_variables=True, ) def fetch_tweet_detail(self, tweet_id, count=20): # type: (str, int) -> List[Tweet] """Fetch a tweet and its conversation thread (replies).""" return self._fetch_timeline( "TweetDetail", count, lambda data: _deep_get(data, "data", "tweetResult", "result", "timeline", "instructions") or _deep_get(data, "data", "threaded_conversation_with_injections_v2", "instructions"), extra_variables={ "focalTweetId": tweet_id, "referrer": "tweet", "with_rux_injections": False, "includePromotedContent": True, "rankingMode": "Relevance", "withCommunity": True, "withQuickPromoteEligibilityTweetFields": True, "withBirdwatchNotes": True, "withVoice": True, }, override_base_variables=True, field_toggles={ "withArticleRichContentState": True, "withArticlePlainText": False, "withGrokAnalyze": False, "withDisallowedReplyControls": False, }, ) def fetch_list_timeline(self, list_id, count=20): # type: (str, int) -> List[Tweet] """Fetch tweets from a Twitter List.""" return self._fetch_timeline( "ListLatestTweetsTimeline", count, lambda data: _deep_get(data, "data", "list", "tweets_timeline", "timeline", "instructions"), extra_variables={"listId": list_id}, override_base_variables=True, ) def fetch_followers(self, user_id, count=20): # type: (str, int) -> List[UserProfile] """Fetch followers of a user.""" return self._fetch_user_list( "Followers", user_id, count, lambda data: _deep_get(data, "data", "user", "result", "timeline", "timeline", "instructions"), ) def fetch_following(self, user_id, count=20): # type: (str, int) -> List[UserProfile] """Fetch users that a user is following.""" return self._fetch_user_list( "Following", user_id, count, lambda data: _deep_get(data, "data", "user", "result", "timeline", "timeline", "instructions"), ) # ── Write operations ───────────────────────────────────────────── def _write_delay(self): # type: () -> None """Sleep a random interval after write operations to avoid rate limits.""" delay = random.uniform(1.5, 4.0) logger.debug("Write operation delay: %.1fs", delay) time.sleep(delay) def create_tweet(self, text, reply_to_id=None): # type: (str, Optional[str]) -> str """Post a new tweet. Returns the new tweet ID.""" variables = { "tweet_text": text, "media": {"media_entities": [], "possibly_sensitive": False}, "semantic_annotation_ids": [], "dark_request": False, } # type: Dict[str, Any] if reply_to_id: variables["reply"] = { "in_reply_to_tweet_id": reply_to_id, "exclude_reply_user_ids": [], } data = self._graphql_post("CreateTweet", variables, FEATURES) self._write_delay() result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") if result: return result.get("rest_id", "") raise TwitterAPIError(0, "Failed to create tweet") def delete_tweet(self, tweet_id): # type: (str) -> bool """Delete a tweet. Returns True on success.""" variables = {"tweet_id": tweet_id, "dark_request": False} self._graphql_post("DeleteTweet", variables) self._write_delay() return True def like_tweet(self, tweet_id): # type: (str) -> bool """Like a tweet. Returns True on success.""" self._graphql_post("FavoriteTweet", {"tweet_id": tweet_id}) self._write_delay() return True def unlike_tweet(self, tweet_id): # type: (str) -> bool """Unlike a tweet. Returns True on success.""" self._graphql_post("UnfavoriteTweet", {"tweet_id": tweet_id, "dark_request": False}) self._write_delay() return True def retweet(self, tweet_id): # type: (str) -> bool """Retweet a tweet. Returns True on success.""" self._graphql_post("CreateRetweet", {"tweet_id": tweet_id, "dark_request": False}) self._write_delay() return True def unretweet(self, tweet_id): # type: (str) -> bool """Undo a retweet. Returns True on success.""" self._graphql_post("DeleteRetweet", {"source_tweet_id": tweet_id, "dark_request": False}) self._write_delay() return True def bookmark_tweet(self, tweet_id): # type: (str) -> bool """Bookmark a tweet. Returns True on success.""" self._graphql_post("CreateBookmark", {"tweet_id": tweet_id}) self._write_delay() return True def unbookmark_tweet(self, tweet_id): # type: (str) -> bool """Remove a tweet from bookmarks. Returns True on success.""" self._graphql_post("DeleteBookmark", {"tweet_id": tweet_id}) self._write_delay() return True def fetch_me(self): # type: () -> UserProfile """Fetch the currently authenticated user's profile. Twitter's /account/multi/list.json endpoint changed its response format: - Old: list of dicts with nested "user" objects (rich fields) - New: {"users": [...]} with minimal fields (user_id, name, screen_name) When the response only has minimal fields, we use the screen_name to fetch the full profile via the GraphQL UserByScreenName endpoint. """ url = "https://x.com/i/api/1.1/account/multi/list.json" data = self._api_get(url) screen_name = None # New format: {"users": [{"user_id": ..., "screen_name": ..., ...}]} if isinstance(data, dict) and "users" in data: users = data["users"] if isinstance(users, list) and users: user_data = users[0] screen_name = user_data.get("screen_name") # Old format: [{"user": {"id_str": ..., ...}}] elif isinstance(data, list) and data: user_data = data[0].get("user", {}) if user_data: # Old format had rich fields — try to build profile directly sn = user_data.get("screen_name", "") if user_data.get("followers_count") is not None: return UserProfile( id=str(user_data.get("id_str", "")), name=user_data.get("name", ""), screen_name=sn, bio=user_data.get("description", ""), location=user_data.get("location", ""), url=_deep_get(user_data, "entities", "url", "urls", 0, "expanded_url") or "", followers_count=_parse_int(user_data.get("followers_count"), 0), following_count=_parse_int(user_data.get("friends_count"), 0), tweets_count=_parse_int(user_data.get("statuses_count"), 0), likes_count=_parse_int(user_data.get("favourites_count"), 0), verified=bool(user_data.get("verified", False)), profile_image_url=user_data.get("profile_image_url_https", ""), created_at=user_data.get("created_at", ""), ) screen_name = sn # Use screen_name to fetch full profile via GraphQL if screen_name: logger.info("Fetching full profile for @%s via GraphQL", screen_name) return self.fetch_user(screen_name) raise TwitterAPIError(0, "Failed to fetch current user info") def quote_tweet(self, tweet_id, text): # type: (str, str) -> str """Quote-tweet a tweet. Returns the new tweet ID.""" variables = { "tweet_text": text, "attachment_url": "https://x.com/i/status/%s" % tweet_id, "media": {"media_entities": [], "possibly_sensitive": False}, "semantic_annotation_ids": [], "dark_request": False, } data = self._graphql_post("CreateTweet", variables, FEATURES) self._write_delay() result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") if result: return result.get("rest_id", "") raise TwitterAPIError(0, "Failed to create quote tweet") def follow_user(self, user_id): # type: (str) -> bool """Follow a user by user ID. Returns True on success.""" url = "https://x.com/i/api/1.1/friendships/create.json" body = {"user_id": user_id, "include_profile_interstitial_type": "1"} headers = self._build_headers(url=url, method="POST") headers["Content-Type"] = "application/x-www-form-urlencoded" session = _get_cffi_session() response = session.post(url, headers=headers, data=body, timeout=30) if response.status_code >= 400: raise TwitterAPIError(response.status_code, "Failed to follow user") self._write_delay() return True def unfollow_user(self, user_id): # type: (str) -> bool """Unfollow a user by user ID. Returns True on success.""" url = "https://x.com/i/api/1.1/friendships/destroy.json" body = {"user_id": user_id, "include_profile_interstitial_type": "1"} headers = self._build_headers(url=url, method="POST") headers["Content-Type"] = "application/x-www-form-urlencoded" session = _get_cffi_session() response = session.post(url, headers=headers, data=body, timeout=30) if response.status_code >= 400: raise TwitterAPIError(response.status_code, "Failed to unfollow user") self._write_delay() return True # ── Internal: timeline / user list fetchers ────────────────────── def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None): # type: (str, int, Callable[[Any], Any], Optional[Dict[str, Any]], bool, Optional[Dict[str, Any]]) -> List[Tweet] """Generic timeline fetcher with pagination and deduplication. Args: override_base_variables: If True, use only extra_variables + count/cursor instead of the default timeline base variables. Needed for endpoints like SearchTimeline that reject unknown variables. """ if count <= 0: return [] # Enforce max count cap count = min(count, self._max_count) tweets = [] # type: List[Tweet] seen_ids = set() # type: Set[str] cursor = None # type: Optional[str] attempts = 0 max_attempts = int(math.ceil(count / 20.0)) + 2 while len(tweets) < count and attempts < max_attempts: attempts += 1 variables: Dict[str, Any] if override_base_variables: variables = {"count": min(count - len(tweets) + 5, 40)} else: variables = { "count": min(count - len(tweets) + 5, 40), "includePromotedContent": False, "latestControlAvailable": True, "requestContext": "launch", } if extra_variables: variables.update(extra_variables) if cursor: variables["cursor"] = cursor data = self._graphql_get(operation_name, variables, FEATURES, field_toggles=field_toggles) new_tweets, next_cursor = self._parse_timeline_response(data, get_instructions) for tweet in new_tweets: if tweet.id and tweet.id not in seen_ids: seen_ids.add(tweet.id) tweets.append(tweet) if not next_cursor: break if next_cursor == cursor: logger.debug("Timeline pagination stopped because cursor did not advance: %s", next_cursor) break cursor = next_cursor if not new_tweets: logger.debug("Timeline page returned no tweets but exposed next cursor; continuing pagination") # Rate-limit: sleep between paginated requests with jitter if len(tweets) < count and self._request_delay > 0: jitter = self._request_delay * random.uniform(0.7, 1.5) logger.debug("Sleeping %.1fs between requests", jitter) time.sleep(jitter) return tweets[:count] def _fetch_user_list(self, operation_name, user_id, count, get_instructions): # type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile] """Generic user list fetcher (for followers/following) with pagination.""" if count <= 0: return [] count = min(count, self._max_count) users = [] # type: List[UserProfile] seen_ids = set() # type: Set[str] cursor = None # type: Optional[str] attempts = 0 max_attempts = int(math.ceil(count / 20.0)) + 2 while len(users) < count and attempts < max_attempts: attempts += 1 variables = { "userId": user_id, "count": min(count - len(users) + 5, 40), "includePromotedContent": False, } # type: Dict[str, Any] if cursor: variables["cursor"] = cursor data = self._graphql_get(operation_name, variables, FEATURES) instructions = get_instructions(data) if not instructions: logger.warning("No user list instructions found") break new_users = [] # type: List[UserProfile] next_cursor = None # type: Optional[str] for instruction in instructions: entries = instruction.get("entries", []) for entry in entries: content = entry.get("content", {}) entry_type = content.get("entryType", "") if entry_type == "TimelineTimelineItem": item = content.get("itemContent", {}) user_results = _deep_get(item, "user_results", "result") if user_results: user = self._parse_user_result(user_results) if user: new_users.append(user) elif entry_type == "TimelineTimelineCursor": if content.get("cursorType") == "Bottom": next_cursor = content.get("value") for user in new_users: if user.id and user.id not in seen_ids: seen_ids.add(user.id) users.append(user) if not next_cursor or not new_users: break cursor = next_cursor if len(users) < count and self._request_delay > 0: time.sleep(self._request_delay * random.uniform(0.7, 1.5)) return users[:count] # ── Internal: GraphQL request methods ──────────────────────────── def _graphql_get(self, operation_name, variables, features, field_toggles=None): # type: (str, Dict[str, Any], Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] """Issue GraphQL GET request with automatic stale-fallback retry.""" query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch) using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name) url = _build_graphql_url(query_id, operation_name, variables, features, field_toggles) try: return self._api_get(url) except TwitterAPIError as exc: # Fallback query IDs can go stale. Retry with live lookup if 404. if exc.status_code == 404 and using_fallback: logger.info("Retrying %s with live queryId after 404", operation_name) _invalidate_query_id(operation_name) refreshed_query_id = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch) retry_url = _build_graphql_url(refreshed_query_id, operation_name, variables, features, field_toggles) return self._api_get(retry_url) raise def _graphql_post(self, operation_name, variables, features=None): # type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] """Issue GraphQL POST request with automatic stale-fallback retry.""" query_id = _resolve_query_id(operation_name, prefer_fallback=True, url_fetch_fn=_url_fetch) using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name) def _do_post(qid): # type: (str) -> Dict[str, Any] url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name) body = {"variables": variables, "queryId": qid} # type: Dict[str, Any] if features: body["features"] = features return self._api_request(url, method="POST", body=body) try: return _do_post(query_id) except TwitterAPIError as exc: if exc.status_code == 404 and using_fallback: logger.info("Retrying POST %s with live queryId after 404", operation_name) _invalidate_query_id(operation_name) refreshed = _resolve_query_id(operation_name, prefer_fallback=False, url_fetch_fn=_url_fetch) return _do_post(refreshed) raise # ── Internal: HTTP request engine ──────────────────────────────── def _api_get(self, url): # type: (str) -> Dict[str, Any] """Make authenticated GET request to Twitter API.""" return self._api_request(url, method="GET") def _api_request(self, url, method="GET", body=None): # type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any] """Make authenticated request to Twitter API with retry on rate limits. Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation. Handles both GET and POST. Retries on HTTP 429 and JSON error code 88. """ headers = self._build_headers(url=url, method=method) session = _get_cffi_session() json_body = body # curl_cffi handles JSON serialization for attempt in range(self._max_retries + 1): try: if method == "POST": response = session.post( url, headers=headers, json=json_body, timeout=30, ) else: response = session.get(url, headers=headers, timeout=30) status_code = response.status_code if status_code == 429 and attempt < self._max_retries: wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) logger.warning( "Rate limited (429), retrying in %.1fs (attempt %d/%d)", wait, attempt + 1, self._max_retries, ) time.sleep(wait) continue if status_code >= 400: message = "Twitter API error %d: %s" % (status_code, response.text[:500]) raise TwitterAPIError(status_code, message) payload = response.text except TwitterAPIError: raise except Exception as exc: raise TwitterAPIError(0, "Twitter API network error: %s" % exc) try: parsed = json.loads(payload) except (json.JSONDecodeError, ValueError): raise TwitterAPIError(0, "Twitter API returned invalid JSON") if isinstance(parsed, dict) and parsed.get("errors"): err_msg = parsed["errors"][0].get("message", "Unknown error") # Rate limit can also surface as a JSON error (code 88) err_code = parsed["errors"][0].get("code", 0) if err_code == 88 and attempt < self._max_retries: wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2) logger.warning( "Rate limited (code 88), retrying in %.1fs (attempt %d/%d)", wait, attempt + 1, self._max_retries, ) time.sleep(wait) continue # Write operation rate limits (retweet/like/bookmark limits) # Code 348 = "retweet limit", 327 = "already retweeted" # Provide user-friendly message if err_code in (348, 349): raise TwitterAPIError( 429, "Rate limited: %s (try again later, recommended wait: 15+ minutes)" % err_msg ) raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg) # GraphQL write mutations return errors in data.errors (separate from top-level) if isinstance(parsed, dict) and "data" in parsed: data_obj = parsed["data"] if isinstance(data_obj, dict): for key, val in data_obj.items(): if isinstance(val, dict) and val.get("errors"): inner_errors = val["errors"] if inner_errors: inner_msg = inner_errors[0].get("message", "Unknown error") raise TwitterAPIError(0, "Twitter API: %s" % inner_msg) return parsed # Should not be reached, but just in case raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries) # ── Internal: Anti-detection / headers ─────────────────────────── @staticmethod def _ct_cache_path(): # type: () -> str """Return path for transaction cache file.""" home = os.path.expanduser("~") return os.path.join(home, ".twitter-cli", "transaction_cache.json") def _load_ct_cache(self): # type: () -> bool """Try to load ClientTransaction from cache. Returns True on success.""" try: cache_path = self._ct_cache_path() if not os.path.exists(cache_path): return False with open(cache_path, "r", encoding="utf-8") as f: cache = json.load(f) # Check TTL (1 hour) if time.time() - cache.get("created_at", 0) > 3600: return False home_html = cache.get("home_html", "") ondemand_text = cache.get("ondemand_text", "") if not home_html or not ondemand_text: return False home_page_response = bs4.BeautifulSoup(home_html, "html.parser") self._client_transaction = ClientTransaction( home_page_response=home_page_response, ondemand_file_response=ondemand_text, ) _update_features_from_html(home_html) logger.info("ClientTransaction loaded from cache") return True except Exception as exc: logger.debug("Failed to load CT cache: %s", exc) return False def _save_ct_cache(self, home_html, ondemand_text): # type: (str, str) -> None """Save transaction data to cache file.""" try: cache_path = self._ct_cache_path() cache_dir = os.path.dirname(cache_path) os.makedirs(cache_dir, exist_ok=True) cache = { "home_html": home_html, "ondemand_text": ondemand_text, "created_at": time.time(), } with open(cache_path, "w", encoding="utf-8") as f: json.dump(cache, f) logger.debug("Saved CT cache to %s", cache_path) except Exception as exc: logger.debug("Failed to save CT cache: %s", exc) def _ensure_client_transaction(self): # type: () -> None """Initialize ClientTransaction for x-client-transaction-id header. Tries cache first (1h TTL), then fetches fresh data from x.com. Also attempts to extract live feature flags from JS bundles. """ if self._ct_init_attempted: return self._ct_init_attempted = True # Try loading from cache first if self._load_ct_cache(): return try: # Use curl_cffi for ClientTransaction init to maintain consistent # Chrome TLS fingerprint. Using Python requests here would leak # a different TLS fingerprint on the same IP — a detection vector. cffi_session = _get_cffi_session() ct_headers = _gen_ct_headers() home_page = cffi_session.get( "https://x.com", headers=ct_headers, timeout=10, ) home_page_response = bs4.BeautifulSoup(home_page.content, "html.parser") ondemand_url = get_ondemand_file_url(response=home_page_response) ondemand_file = cffi_session.get( ondemand_url, headers=ct_headers, timeout=10, ) self._client_transaction = ClientTransaction( home_page_response=home_page_response, ondemand_file_response=ondemand_file.text, ) logger.info("ClientTransaction initialized for x-client-transaction-id") # Try to extract live FEATURES from the homepage JS bundles _update_features_from_html(home_page.text) # Save to cache for future use self._save_ct_cache(home_page.text, ondemand_file.text) except Exception as exc: logger.warning("Failed to init ClientTransaction: %s", exc) def _build_headers(self, url="", method="GET"): # type: (str, str) -> Dict[str, str] """Build shared headers for authenticated API calls.""" headers = { "Authorization": "Bearer %s" % BEARER_TOKEN, "Cookie": self._cookie_string or "auth_token=%s; ct0=%s" % (self._auth_token, self._ct0), "X-Csrf-Token": self._ct0, "X-Twitter-Active-User": "yes", "X-Twitter-Auth-Type": "OAuth2Session", "X-Twitter-Client-Language": get_twitter_client_language(), "User-Agent": get_user_agent(), "Origin": "https://x.com", "Referer": "https://x.com/", "Accept": "*/*", "Accept-Language": get_accept_language(), "sec-ch-ua": get_sec_ch_ua(), "sec-ch-ua-mobile": SEC_CH_UA_MOBILE, "sec-ch-ua-platform": get_sec_ch_ua_platform(), "sec-ch-ua-arch": SEC_CH_UA_ARCH, "sec-ch-ua-bitness": SEC_CH_UA_BITNESS, "sec-ch-ua-full-version": get_sec_ch_ua_full_version(), "sec-ch-ua-full-version-list": get_sec_ch_ua_full_version_list(), "sec-ch-ua-model": SEC_CH_UA_MODEL, "sec-ch-ua-platform-version": SEC_CH_UA_PLATFORM_VERSION, "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", } if method == "POST": headers["Content-Type"] = "application/json" headers["Referer"] = "https://x.com/compose/post" headers["Priority"] = "u=1, i" # Generate x-client-transaction-id if available if self._client_transaction and url: try: path = urllib.parse.urlparse(url).path tid = self._client_transaction.generate_transaction_id( method=method, path=path, ) headers["X-Client-Transaction-Id"] = tid except Exception as exc: logger.debug("Failed to generate transaction id: %s", exc) return headers # ── Backward-compatible delegation to parser module ────────────── @staticmethod def _parse_user_result(user_data): # type: (Dict[str, Any]) -> Optional[UserProfile] """Parse a user result object into UserProfile.""" return parse_user_result(user_data) def _parse_tweet_result(self, result, depth=0): # type: (Dict[str, Any], int) -> Optional[Tweet] """Parse a single TweetResult into a Tweet dataclass.""" return parse_tweet_result(result, depth) def _parse_timeline_response(self, data, get_instructions): # type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]] """Parse timeline GraphQL response into tweets and next cursor.""" return parse_timeline_response(data, get_instructions) # ── Backward compatibility re-exports ──────────────────────────────────── # These keep existing test imports working without modification. from .graphql import FALLBACK_QUERY_IDS # noqa: E402, F401 from .parser import _extract_cursor, _extract_media # noqa: E402, F401