diff --git a/README.md b/README.md index 16e7b0d..380f326 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ export TWITTER_PROXY=http://127.0.0.1:7890 export TWITTER_PROXY=socks5://127.0.0.1:1080 ``` -Using a proxy is **strongly recommended** to avoid IP-based rate limiting. +Using a proxy can help reduce IP-based rate limiting risks. ### Configuration @@ -361,7 +361,7 @@ export TWITTER_PROXY=http://127.0.0.1:7890 export TWITTER_PROXY=socks5://127.0.0.1:1080 ``` -**强烈建议使用代理**,避免 IP 维度的风控。 +使用代理可以降低 IP 维度的风控风险。 ### 筛选算法 diff --git a/pyproject.toml b/pyproject.toml index d438f42..8f6832b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "twitter-cli" -version = "0.4.0" +version = "0.4.1" description = "A CLI for Twitter/X — feed, bookmarks, and user timeline in terminal" readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..a707a37 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,394 @@ +"""Unit tests for core client.py functions. + +Tests the parsing, header building, media extraction, Chrome target detection, +and feature flag update logic — all without requiring network access. +""" + +from __future__ import annotations + +import copy +from unittest.mock import MagicMock, patch + + +from twitter_cli.client import ( + FEATURES, + _best_chrome_target, + _build_graphql_url, + _deep_get, + _extract_cursor, + _extract_media, + _parse_int, + _update_features_from_html, + TwitterAPIError, + TwitterClient, +) + + +# ── _deep_get ──────────────────────────────────────────────────────────── + +class TestDeepGet: + def test_nested_dict(self): + data = {"a": {"b": {"c": 42}}} + assert _deep_get(data, "a", "b", "c") == 42 + + def test_missing_key(self): + assert _deep_get({"a": 1}, "b") is None + + def test_deeply_missing(self): + assert _deep_get({"a": {"b": 1}}, "a", "c", "d") is None + + def test_list_access(self): + data = {"items": [10, 20, 30]} + assert _deep_get(data, "items", 1) == 20 + + def test_list_out_of_bounds(self): + data = {"items": [10]} + assert _deep_get(data, "items", 5) is None + + def test_none_input(self): + assert _deep_get(None, "a") is None + + def test_empty_keys(self): + data = {"x": 1} + assert _deep_get(data) == data + + +# ── _parse_int ─────────────────────────────────────────────────────────── + +class TestParseInt: + def test_normal_int(self): + assert _parse_int(42, 0) == 42 + + def test_string_int(self): + assert _parse_int("123", 0) == 123 + + def test_float_string(self): + assert _parse_int("99.9", 0) == 99 + + def test_comma_separated(self): + assert _parse_int("1,234", 0) == 1234 + + def test_empty_string(self): + assert _parse_int("", 0) == 0 + + def test_none(self): + assert _parse_int(None, -1) == -1 + + def test_invalid(self): + assert _parse_int("abc", 5) == 5 + + +# ── _extract_cursor ────────────────────────────────────────────────────── + +class TestExtractCursor: + def test_bottom_cursor(self): + content = {"cursorType": "Bottom", "value": "cursor_abc"} + assert _extract_cursor(content) == "cursor_abc" + + def test_top_cursor_ignored(self): + content = {"cursorType": "Top", "value": "cursor_top"} + assert _extract_cursor(content) is None + + def test_no_cursor(self): + assert _extract_cursor({}) is None + + +# ── _extract_media ─────────────────────────────────────────────────────── + +class TestExtractMedia: + def test_photo(self): + legacy = { + "extended_entities": { + "media": [ + { + "type": "photo", + "media_url_https": "https://pbs.twimg.com/img.jpg", + "original_info": {"width": 1200, "height": 800}, + } + ] + } + } + media = _extract_media(legacy) + assert len(media) == 1 + assert media[0].type == "photo" + assert media[0].url == "https://pbs.twimg.com/img.jpg" + assert media[0].width == 1200 + + def test_video_picks_highest_bitrate(self): + legacy = { + "extended_entities": { + "media": [ + { + "type": "video", + "media_url_https": "https://pbs.twimg.com/thumb.jpg", + "original_info": {"width": 1920, "height": 1080}, + "video_info": { + "variants": [ + {"content_type": "video/mp4", "bitrate": 832000, "url": "https://low.mp4"}, + {"content_type": "video/mp4", "bitrate": 2176000, "url": "https://high.mp4"}, + {"content_type": "application/x-mpegURL", "url": "https://stream.m3u8"}, + ] + }, + } + ] + } + } + media = _extract_media(legacy) + assert len(media) == 1 + assert media[0].type == "video" + assert media[0].url == "https://high.mp4" + + def test_no_media(self): + assert _extract_media({}) == [] + + def test_animated_gif(self): + legacy = { + "extended_entities": { + "media": [ + { + "type": "animated_gif", + "media_url_https": "https://pbs.twimg.com/gif.mp4", + "original_info": {"width": 480, "height": 270}, + "video_info": { + "variants": [ + {"content_type": "video/mp4", "bitrate": 0, "url": "https://gif.mp4"}, + ] + }, + } + ] + } + } + media = _extract_media(legacy) + assert len(media) == 1 + assert media[0].type == "animated_gif" + + +# ── _build_graphql_url ─────────────────────────────────────────────────── + +class TestBuildGraphqlUrl: + def test_basic_url(self): + url = _build_graphql_url("abc123", "HomeTimeline", {"count": 20}, {"f1": True}) + assert "graphql/abc123/HomeTimeline" in url + assert "variables=" in url + assert "features=" in url + + def test_field_toggles(self): + url = _build_graphql_url("x", "Op", {}, {}, {"toggle": True}) + assert "fieldToggles=" in url + + +# ── _best_chrome_target ────────────────────────────────────────────────── + +class TestBestChromeTarget: + def test_returns_string(self): + target = _best_chrome_target() + assert isinstance(target, str) + assert "chrome" in target + + def test_fallback_when_no_browser_type(self): + with patch.dict("sys.modules", {"curl_cffi.requests": MagicMock(BrowserType=MagicMock(side_effect=TypeError))}): + # Force re-evaluation by clearing cached result + # When BrowserType iteration fails, should still return a fallback + target = _best_chrome_target() + assert isinstance(target, str) + + +# ── _update_features_from_html ─────────────────────────────────────────── + +class TestUpdateFeaturesFromHtml: + def test_extracts_feature_flags(self): + # Save original state + original = dict(FEATURES) + try: + html = ''' + "responsive_web_test_feature":{"value":true}, + "responsive_web_another_feature":{"value":false}, + "rweb_some_flag":{"value":true} + ''' + _update_features_from_html(html) + assert FEATURES["responsive_web_test_feature"] is True + assert FEATURES["responsive_web_another_feature"] is False + assert FEATURES["rweb_some_flag"] is True + finally: + # Restore original state + FEATURES.clear() + FEATURES.update(original) + + def test_ignores_non_feature_keys(self): + original = dict(FEATURES) + try: + html = '"some_random_key":{"value":true}' + _update_features_from_html(html) + assert "some_random_key" not in FEATURES + finally: + FEATURES.clear() + FEATURES.update(original) + + def test_handles_empty_html(self): + _update_features_from_html("") + + def test_handles_malformed_html(self): + _update_features_from_html("not json at all {{{") + + +# ── TwitterClient._build_headers ───────────────────────────────────────── + +class TestBuildHeaders: + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_required_headers_present(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip init")) + + client = TwitterClient.__new__(TwitterClient) + client._auth_token = "test_token" + client._ct0 = "test_ct0" + client._cookie_string = None + client._request_delay = 2.5 + client._max_retries = 3 + client._retry_base_delay = 5.0 + client._max_count = 200 + client._client_transaction = None + client._ct_init_attempted = True + + headers = client._build_headers("https://x.com/i/api/graphql/test", "GET") + + assert "Authorization" in headers + assert "Bearer" in headers["Authorization"] + assert headers["X-Csrf-Token"] == "test_ct0" + assert headers["X-Twitter-Auth-Type"] == "OAuth2Session" + assert "User-Agent" in headers + assert "sec-ch-ua" in headers + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_cookie_string_used_when_available(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._auth_token = "token" + client._ct0 = "ct0" + client._cookie_string = "auth_token=x; ct0=y; other=z" + client._request_delay = 2.5 + client._max_retries = 3 + client._retry_base_delay = 5.0 + client._max_count = 200 + client._client_transaction = None + client._ct_init_attempted = True + + headers = client._build_headers() + assert headers["Cookie"] == "auth_token=x; ct0=y; other=z" + + +# ── TwitterClient._parse_tweet_result ───────────────────────────────────── + +class TestParseTweetResult: + SAMPLE_TWEET_RESULT = { + "__typename": "Tweet", + "rest_id": "1234567890", + "core": { + "user_results": { + "result": { + "rest_id": "user123", + "core": {"name": "Test User", "screen_name": "testuser"}, + "legacy": { + "name": "Test User", + "screen_name": "testuser", + "verified": False, + "profile_image_url_https": "https://img.com/avatar.jpg", + }, + "is_blue_verified": True, + } + } + }, + "legacy": { + "full_text": "Hello world! This is a test tweet.", + "created_at": "Sat Mar 08 12:00:00 +0000 2026", + "favorite_count": 100, + "retweet_count": 25, + "reply_count": 5, + "quote_count": 3, + "bookmark_count": 10, + "lang": "en", + "entities": {"urls": []}, + }, + "views": {"count": "5000"}, + } + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_parses_basic_tweet(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + tweet = client._parse_tweet_result(copy.deepcopy(self.SAMPLE_TWEET_RESULT)) + assert tweet is not None + assert tweet.id == "1234567890" + assert tweet.text == "Hello world! This is a test tweet." + assert tweet.author.screen_name == "testuser" + assert tweet.author.verified is True # is_blue_verified + assert tweet.metrics.likes == 100 + assert tweet.metrics.views == 5000 + assert tweet.lang == "en" + assert tweet.is_retweet is False + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_parses_tombstone_returns_none(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + result = {"__typename": "TweetTombstone"} + assert client._parse_tweet_result(result) is None + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_parses_visibility_wrapper(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + wrapped = { + "__typename": "TweetWithVisibilityResults", + "tweet": copy.deepcopy(self.SAMPLE_TWEET_RESULT), + } + tweet = client._parse_tweet_result(wrapped) + assert tweet is not None + assert tweet.id == "1234567890" + + @patch("twitter_cli.client._get_cffi_session") + @patch("twitter_cli.client._gen_ct_headers", return_value={}) + def test_depth_limit(self, mock_ct_headers, mock_session): + mock_session.return_value = MagicMock() + mock_session.return_value.get = MagicMock(side_effect=Exception("skip")) + + client = TwitterClient.__new__(TwitterClient) + client._ct_init_attempted = True + client._client_transaction = None + + assert client._parse_tweet_result(self.SAMPLE_TWEET_RESULT, depth=3) is None + + +# ── TwitterAPIError ────────────────────────────────────────────────────── + +class TestTwitterAPIError: + def test_stores_status_code(self): + err = TwitterAPIError(429, "Rate limited") + assert err.status_code == 429 + assert str(err) == "Rate limited" + + def test_is_runtime_error(self): + err = TwitterAPIError(500, "Server error") + assert isinstance(err, RuntimeError) diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 25dd38b..3cb9848 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -53,7 +53,7 @@ TWITTER_OPENAPI_URL = ( "main/src/config/placeholder.json" ) -FEATURES = { +_DEFAULT_FEATURES = { "rweb_video_screen_enabled": False, "profile_label_improvements_pcf_label_in_post_enabled": True, "responsive_web_profile_redirect_enabled": False, @@ -93,6 +93,9 @@ FEATURES = { "responsive_web_enhance_cards_enabled": False, } +# Features dict that gets updated dynamically from x.com JS bundles +FEATURES = dict(_DEFAULT_FEATURES) + # Module-level caches (not thread-safe — CLI is single-threaded) _cached_query_ids = {} # type: Dict[str, str] _bundles_scanned = False @@ -211,6 +214,35 @@ def _scan_bundles(): logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids)) +def _update_features_from_html(html): + # type: (str) -> None + """Extract live feature flags from x.com HTML and update the global FEATURES dict. + + Twitter embeds feature switch config in inline scripts on the homepage. + We parse these to keep FEATURES in sync with the current frontend. + """ + try: + # Look for feature flags in inline script content + # Pattern: "featureSwitch":{"...":{"value":true/false},...} + # Also try: features:{key:!0, key2:!1, ...} in JS bundles + feature_pattern = re.compile( + r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)', + re.IGNORECASE, + ) + found = 0 + for match in feature_pattern.finditer(html): + key = match.group(1) + value = match.group(2).lower() == "true" + # Only update keys that look like feature flags + if any(prefix in key for prefix in ("responsive_web_", "rweb_", "longform_", "creator_", "communities_", "c9s_")): + FEATURES[key] = value + found += 1 + if found: + logger.info("Updated %d feature flags from x.com", found) + except Exception as exc: + logger.debug("Feature extraction from HTML failed: %s", exc) + + def _fetch_from_github(operation_name): # type: (str) -> Optional[str] """Fetch queryId from community-maintained twitter-openapi file.""" @@ -468,6 +500,13 @@ class TwitterClient: # ── Write operations ──────────────────────────────────────────────── + def _write_delay(self): + # type: () -> None + """Sleep a random interval after write operations to avoid rate limits.""" + delay = random.uniform(1.5, 4.0) + logger.debug("Write operation delay: %.1fs", delay) + time.sleep(delay) + def create_tweet(self, text, reply_to_id=None): # type: (str, Optional[str]) -> str """Post a new tweet. Returns the new tweet ID.""" @@ -483,6 +522,7 @@ class TwitterClient: "exclude_reply_user_ids": [], } data = self._graphql_post("CreateTweet", variables, FEATURES) + self._write_delay() result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") if result: return result.get("rest_id", "") @@ -493,42 +533,49 @@ class TwitterClient: """Delete a tweet. Returns True on success.""" variables = {"tweet_id": tweet_id, "dark_request": False} self._graphql_post("DeleteTweet", variables) + self._write_delay() return True def like_tweet(self, tweet_id): # type: (str) -> bool """Like a tweet. Returns True on success.""" self._graphql_post("FavoriteTweet", {"tweet_id": tweet_id}) + self._write_delay() return True def unlike_tweet(self, tweet_id): # type: (str) -> bool """Unlike a tweet. Returns True on success.""" self._graphql_post("UnfavoriteTweet", {"tweet_id": tweet_id, "dark_request": False}) + self._write_delay() return True def retweet(self, tweet_id): # type: (str) -> bool """Retweet a tweet. Returns True on success.""" self._graphql_post("CreateRetweet", {"tweet_id": tweet_id, "dark_request": False}) + self._write_delay() return True def unretweet(self, tweet_id): # type: (str) -> bool """Undo a retweet. Returns True on success.""" self._graphql_post("DeleteRetweet", {"source_tweet_id": tweet_id, "dark_request": False}) + self._write_delay() return True def bookmark_tweet(self, tweet_id): # type: (str) -> bool """Bookmark a tweet. Returns True on success.""" self._graphql_post("CreateBookmark", {"tweet_id": tweet_id}) + self._write_delay() return True def unbookmark_tweet(self, tweet_id): # type: (str) -> bool """Remove a tweet from bookmarks. Returns True on success.""" self._graphql_post("DeleteBookmark", {"tweet_id": tweet_id}) + self._write_delay() return True def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None): @@ -609,7 +656,10 @@ class TwitterClient: def _ensure_client_transaction(self): # type: () -> None - """Initialize ClientTransaction for x-client-transaction-id header.""" + """Initialize ClientTransaction for x-client-transaction-id header. + + Also attempts to extract live feature flags from JS bundles. + """ if self._ct_init_attempted: return self._ct_init_attempted = True @@ -632,6 +682,9 @@ class TwitterClient: ondemand_file_response=ondemand_file.text, ) logger.info("ClientTransaction initialized for x-client-transaction-id") + + # Try to extract live FEATURES from the homepage JS bundles + _update_features_from_html(home_page.text) except Exception as exc: logger.warning("Failed to init ClientTransaction: %s", exc) diff --git a/uv.lock b/uv.lock index 3cf9922..21eec6a 100644 --- a/uv.lock +++ b/uv.lock @@ -950,7 +950,7 @@ wheels = [ [[package]] name = "twitter-cli" -version = "0.3.4" +version = "0.4.0" source = { editable = "." } dependencies = [ { name = "beautifulsoup4" },