feat: write operation delays, dynamic FEATURES update, 30+ client.py tests, fix README proxy wording

This commit is contained in:
jackwener
2026-03-09 20:45:51 +08:00
parent b9c226b804
commit 12f425abea
5 changed files with 453 additions and 6 deletions

View File

@@ -139,7 +139,7 @@ export TWITTER_PROXY=http://127.0.0.1:7890
export TWITTER_PROXY=socks5://127.0.0.1:1080 export TWITTER_PROXY=socks5://127.0.0.1:1080
``` ```
Using a proxy is **strongly recommended** to avoid IP-based rate limiting. Using a proxy can help reduce IP-based rate limiting risks.
### Configuration ### Configuration
@@ -361,7 +361,7 @@ export TWITTER_PROXY=http://127.0.0.1:7890
export TWITTER_PROXY=socks5://127.0.0.1:1080 export TWITTER_PROXY=socks5://127.0.0.1:1080
``` ```
**强烈建议使用代理**,避免 IP 维度的风控。 使用代理可以降低 IP 维度的风控风险
### 筛选算法 ### 筛选算法

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "twitter-cli" name = "twitter-cli"
version = "0.4.0" version = "0.4.1"
description = "A CLI for Twitter/X — feed, bookmarks, and user timeline in terminal" description = "A CLI for Twitter/X — feed, bookmarks, and user timeline in terminal"
readme = "README.md" readme = "README.md"
license = "Apache-2.0" license = "Apache-2.0"

394
tests/test_client.py Normal file
View File

@@ -0,0 +1,394 @@
"""Unit tests for core client.py functions.
Tests the parsing, header building, media extraction, Chrome target detection,
and feature flag update logic — all without requiring network access.
"""
from __future__ import annotations
import copy
from unittest.mock import MagicMock, patch
from twitter_cli.client import (
FEATURES,
_best_chrome_target,
_build_graphql_url,
_deep_get,
_extract_cursor,
_extract_media,
_parse_int,
_update_features_from_html,
TwitterAPIError,
TwitterClient,
)
# ── _deep_get ────────────────────────────────────────────────────────────
class TestDeepGet:
def test_nested_dict(self):
data = {"a": {"b": {"c": 42}}}
assert _deep_get(data, "a", "b", "c") == 42
def test_missing_key(self):
assert _deep_get({"a": 1}, "b") is None
def test_deeply_missing(self):
assert _deep_get({"a": {"b": 1}}, "a", "c", "d") is None
def test_list_access(self):
data = {"items": [10, 20, 30]}
assert _deep_get(data, "items", 1) == 20
def test_list_out_of_bounds(self):
data = {"items": [10]}
assert _deep_get(data, "items", 5) is None
def test_none_input(self):
assert _deep_get(None, "a") is None
def test_empty_keys(self):
data = {"x": 1}
assert _deep_get(data) == data
# ── _parse_int ───────────────────────────────────────────────────────────
class TestParseInt:
def test_normal_int(self):
assert _parse_int(42, 0) == 42
def test_string_int(self):
assert _parse_int("123", 0) == 123
def test_float_string(self):
assert _parse_int("99.9", 0) == 99
def test_comma_separated(self):
assert _parse_int("1,234", 0) == 1234
def test_empty_string(self):
assert _parse_int("", 0) == 0
def test_none(self):
assert _parse_int(None, -1) == -1
def test_invalid(self):
assert _parse_int("abc", 5) == 5
# ── _extract_cursor ──────────────────────────────────────────────────────
class TestExtractCursor:
def test_bottom_cursor(self):
content = {"cursorType": "Bottom", "value": "cursor_abc"}
assert _extract_cursor(content) == "cursor_abc"
def test_top_cursor_ignored(self):
content = {"cursorType": "Top", "value": "cursor_top"}
assert _extract_cursor(content) is None
def test_no_cursor(self):
assert _extract_cursor({}) is None
# ── _extract_media ───────────────────────────────────────────────────────
class TestExtractMedia:
def test_photo(self):
legacy = {
"extended_entities": {
"media": [
{
"type": "photo",
"media_url_https": "https://pbs.twimg.com/img.jpg",
"original_info": {"width": 1200, "height": 800},
}
]
}
}
media = _extract_media(legacy)
assert len(media) == 1
assert media[0].type == "photo"
assert media[0].url == "https://pbs.twimg.com/img.jpg"
assert media[0].width == 1200
def test_video_picks_highest_bitrate(self):
legacy = {
"extended_entities": {
"media": [
{
"type": "video",
"media_url_https": "https://pbs.twimg.com/thumb.jpg",
"original_info": {"width": 1920, "height": 1080},
"video_info": {
"variants": [
{"content_type": "video/mp4", "bitrate": 832000, "url": "https://low.mp4"},
{"content_type": "video/mp4", "bitrate": 2176000, "url": "https://high.mp4"},
{"content_type": "application/x-mpegURL", "url": "https://stream.m3u8"},
]
},
}
]
}
}
media = _extract_media(legacy)
assert len(media) == 1
assert media[0].type == "video"
assert media[0].url == "https://high.mp4"
def test_no_media(self):
assert _extract_media({}) == []
def test_animated_gif(self):
legacy = {
"extended_entities": {
"media": [
{
"type": "animated_gif",
"media_url_https": "https://pbs.twimg.com/gif.mp4",
"original_info": {"width": 480, "height": 270},
"video_info": {
"variants": [
{"content_type": "video/mp4", "bitrate": 0, "url": "https://gif.mp4"},
]
},
}
]
}
}
media = _extract_media(legacy)
assert len(media) == 1
assert media[0].type == "animated_gif"
# ── _build_graphql_url ───────────────────────────────────────────────────
class TestBuildGraphqlUrl:
def test_basic_url(self):
url = _build_graphql_url("abc123", "HomeTimeline", {"count": 20}, {"f1": True})
assert "graphql/abc123/HomeTimeline" in url
assert "variables=" in url
assert "features=" in url
def test_field_toggles(self):
url = _build_graphql_url("x", "Op", {}, {}, {"toggle": True})
assert "fieldToggles=" in url
# ── _best_chrome_target ──────────────────────────────────────────────────
class TestBestChromeTarget:
def test_returns_string(self):
target = _best_chrome_target()
assert isinstance(target, str)
assert "chrome" in target
def test_fallback_when_no_browser_type(self):
with patch.dict("sys.modules", {"curl_cffi.requests": MagicMock(BrowserType=MagicMock(side_effect=TypeError))}):
# Force re-evaluation by clearing cached result
# When BrowserType iteration fails, should still return a fallback
target = _best_chrome_target()
assert isinstance(target, str)
# ── _update_features_from_html ───────────────────────────────────────────
class TestUpdateFeaturesFromHtml:
def test_extracts_feature_flags(self):
# Save original state
original = dict(FEATURES)
try:
html = '''
"responsive_web_test_feature":{"value":true},
"responsive_web_another_feature":{"value":false},
"rweb_some_flag":{"value":true}
'''
_update_features_from_html(html)
assert FEATURES["responsive_web_test_feature"] is True
assert FEATURES["responsive_web_another_feature"] is False
assert FEATURES["rweb_some_flag"] is True
finally:
# Restore original state
FEATURES.clear()
FEATURES.update(original)
def test_ignores_non_feature_keys(self):
original = dict(FEATURES)
try:
html = '"some_random_key":{"value":true}'
_update_features_from_html(html)
assert "some_random_key" not in FEATURES
finally:
FEATURES.clear()
FEATURES.update(original)
def test_handles_empty_html(self):
_update_features_from_html("")
def test_handles_malformed_html(self):
_update_features_from_html("not json at all {{{")
# ── TwitterClient._build_headers ─────────────────────────────────────────
class TestBuildHeaders:
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_required_headers_present(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip init"))
client = TwitterClient.__new__(TwitterClient)
client._auth_token = "test_token"
client._ct0 = "test_ct0"
client._cookie_string = None
client._request_delay = 2.5
client._max_retries = 3
client._retry_base_delay = 5.0
client._max_count = 200
client._client_transaction = None
client._ct_init_attempted = True
headers = client._build_headers("https://x.com/i/api/graphql/test", "GET")
assert "Authorization" in headers
assert "Bearer" in headers["Authorization"]
assert headers["X-Csrf-Token"] == "test_ct0"
assert headers["X-Twitter-Auth-Type"] == "OAuth2Session"
assert "User-Agent" in headers
assert "sec-ch-ua" in headers
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_cookie_string_used_when_available(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))
client = TwitterClient.__new__(TwitterClient)
client._auth_token = "token"
client._ct0 = "ct0"
client._cookie_string = "auth_token=x; ct0=y; other=z"
client._request_delay = 2.5
client._max_retries = 3
client._retry_base_delay = 5.0
client._max_count = 200
client._client_transaction = None
client._ct_init_attempted = True
headers = client._build_headers()
assert headers["Cookie"] == "auth_token=x; ct0=y; other=z"
# ── TwitterClient._parse_tweet_result ─────────────────────────────────────
class TestParseTweetResult:
SAMPLE_TWEET_RESULT = {
"__typename": "Tweet",
"rest_id": "1234567890",
"core": {
"user_results": {
"result": {
"rest_id": "user123",
"core": {"name": "Test User", "screen_name": "testuser"},
"legacy": {
"name": "Test User",
"screen_name": "testuser",
"verified": False,
"profile_image_url_https": "https://img.com/avatar.jpg",
},
"is_blue_verified": True,
}
}
},
"legacy": {
"full_text": "Hello world! This is a test tweet.",
"created_at": "Sat Mar 08 12:00:00 +0000 2026",
"favorite_count": 100,
"retweet_count": 25,
"reply_count": 5,
"quote_count": 3,
"bookmark_count": 10,
"lang": "en",
"entities": {"urls": []},
},
"views": {"count": "5000"},
}
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_parses_basic_tweet(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))
client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None
tweet = client._parse_tweet_result(copy.deepcopy(self.SAMPLE_TWEET_RESULT))
assert tweet is not None
assert tweet.id == "1234567890"
assert tweet.text == "Hello world! This is a test tweet."
assert tweet.author.screen_name == "testuser"
assert tweet.author.verified is True # is_blue_verified
assert tweet.metrics.likes == 100
assert tweet.metrics.views == 5000
assert tweet.lang == "en"
assert tweet.is_retweet is False
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_parses_tombstone_returns_none(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))
client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None
result = {"__typename": "TweetTombstone"}
assert client._parse_tweet_result(result) is None
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_parses_visibility_wrapper(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))
client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None
wrapped = {
"__typename": "TweetWithVisibilityResults",
"tweet": copy.deepcopy(self.SAMPLE_TWEET_RESULT),
}
tweet = client._parse_tweet_result(wrapped)
assert tweet is not None
assert tweet.id == "1234567890"
@patch("twitter_cli.client._get_cffi_session")
@patch("twitter_cli.client._gen_ct_headers", return_value={})
def test_depth_limit(self, mock_ct_headers, mock_session):
mock_session.return_value = MagicMock()
mock_session.return_value.get = MagicMock(side_effect=Exception("skip"))
client = TwitterClient.__new__(TwitterClient)
client._ct_init_attempted = True
client._client_transaction = None
assert client._parse_tweet_result(self.SAMPLE_TWEET_RESULT, depth=3) is None
# ── TwitterAPIError ──────────────────────────────────────────────────────
class TestTwitterAPIError:
def test_stores_status_code(self):
err = TwitterAPIError(429, "Rate limited")
assert err.status_code == 429
assert str(err) == "Rate limited"
def test_is_runtime_error(self):
err = TwitterAPIError(500, "Server error")
assert isinstance(err, RuntimeError)

View File

@@ -53,7 +53,7 @@ TWITTER_OPENAPI_URL = (
"main/src/config/placeholder.json" "main/src/config/placeholder.json"
) )
FEATURES = { _DEFAULT_FEATURES = {
"rweb_video_screen_enabled": False, "rweb_video_screen_enabled": False,
"profile_label_improvements_pcf_label_in_post_enabled": True, "profile_label_improvements_pcf_label_in_post_enabled": True,
"responsive_web_profile_redirect_enabled": False, "responsive_web_profile_redirect_enabled": False,
@@ -93,6 +93,9 @@ FEATURES = {
"responsive_web_enhance_cards_enabled": False, "responsive_web_enhance_cards_enabled": False,
} }
# Features dict that gets updated dynamically from x.com JS bundles
FEATURES = dict(_DEFAULT_FEATURES)
# Module-level caches (not thread-safe — CLI is single-threaded) # Module-level caches (not thread-safe — CLI is single-threaded)
_cached_query_ids = {} # type: Dict[str, str] _cached_query_ids = {} # type: Dict[str, str]
_bundles_scanned = False _bundles_scanned = False
@@ -211,6 +214,35 @@ def _scan_bundles():
logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids)) logger.info("Scanned %d JS bundles, cached %d query IDs", len(script_urls), len(_cached_query_ids))
def _update_features_from_html(html):
# type: (str) -> None
"""Extract live feature flags from x.com HTML and update the global FEATURES dict.
Twitter embeds feature switch config in inline scripts on the homepage.
We parse these to keep FEATURES in sync with the current frontend.
"""
try:
# Look for feature flags in inline script content
# Pattern: "featureSwitch":{"...":{"value":true/false},...}
# Also try: features:{key:!0, key2:!1, ...} in JS bundles
feature_pattern = re.compile(
r'"([a-z][a-z0-9_]+)":\s*\{\s*"value"\s*:\s*(true|false)',
re.IGNORECASE,
)
found = 0
for match in feature_pattern.finditer(html):
key = match.group(1)
value = match.group(2).lower() == "true"
# Only update keys that look like feature flags
if any(prefix in key for prefix in ("responsive_web_", "rweb_", "longform_", "creator_", "communities_", "c9s_")):
FEATURES[key] = value
found += 1
if found:
logger.info("Updated %d feature flags from x.com", found)
except Exception as exc:
logger.debug("Feature extraction from HTML failed: %s", exc)
def _fetch_from_github(operation_name): def _fetch_from_github(operation_name):
# type: (str) -> Optional[str] # type: (str) -> Optional[str]
"""Fetch queryId from community-maintained twitter-openapi file.""" """Fetch queryId from community-maintained twitter-openapi file."""
@@ -468,6 +500,13 @@ class TwitterClient:
# ── Write operations ──────────────────────────────────────────────── # ── Write operations ────────────────────────────────────────────────
def _write_delay(self):
# type: () -> None
"""Sleep a random interval after write operations to avoid rate limits."""
delay = random.uniform(1.5, 4.0)
logger.debug("Write operation delay: %.1fs", delay)
time.sleep(delay)
def create_tweet(self, text, reply_to_id=None): def create_tweet(self, text, reply_to_id=None):
# type: (str, Optional[str]) -> str # type: (str, Optional[str]) -> str
"""Post a new tweet. Returns the new tweet ID.""" """Post a new tweet. Returns the new tweet ID."""
@@ -483,6 +522,7 @@ class TwitterClient:
"exclude_reply_user_ids": [], "exclude_reply_user_ids": [],
} }
data = self._graphql_post("CreateTweet", variables, FEATURES) data = self._graphql_post("CreateTweet", variables, FEATURES)
self._write_delay()
result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") result = _deep_get(data, "data", "create_tweet", "tweet_results", "result")
if result: if result:
return result.get("rest_id", "") return result.get("rest_id", "")
@@ -493,42 +533,49 @@ class TwitterClient:
"""Delete a tweet. Returns True on success.""" """Delete a tweet. Returns True on success."""
variables = {"tweet_id": tweet_id, "dark_request": False} variables = {"tweet_id": tweet_id, "dark_request": False}
self._graphql_post("DeleteTweet", variables) self._graphql_post("DeleteTweet", variables)
self._write_delay()
return True return True
def like_tweet(self, tweet_id): def like_tweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Like a tweet. Returns True on success.""" """Like a tweet. Returns True on success."""
self._graphql_post("FavoriteTweet", {"tweet_id": tweet_id}) self._graphql_post("FavoriteTweet", {"tweet_id": tweet_id})
self._write_delay()
return True return True
def unlike_tweet(self, tweet_id): def unlike_tweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Unlike a tweet. Returns True on success.""" """Unlike a tweet. Returns True on success."""
self._graphql_post("UnfavoriteTweet", {"tweet_id": tweet_id, "dark_request": False}) self._graphql_post("UnfavoriteTweet", {"tweet_id": tweet_id, "dark_request": False})
self._write_delay()
return True return True
def retweet(self, tweet_id): def retweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Retweet a tweet. Returns True on success.""" """Retweet a tweet. Returns True on success."""
self._graphql_post("CreateRetweet", {"tweet_id": tweet_id, "dark_request": False}) self._graphql_post("CreateRetweet", {"tweet_id": tweet_id, "dark_request": False})
self._write_delay()
return True return True
def unretweet(self, tweet_id): def unretweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Undo a retweet. Returns True on success.""" """Undo a retweet. Returns True on success."""
self._graphql_post("DeleteRetweet", {"source_tweet_id": tweet_id, "dark_request": False}) self._graphql_post("DeleteRetweet", {"source_tweet_id": tweet_id, "dark_request": False})
self._write_delay()
return True return True
def bookmark_tweet(self, tweet_id): def bookmark_tweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Bookmark a tweet. Returns True on success.""" """Bookmark a tweet. Returns True on success."""
self._graphql_post("CreateBookmark", {"tweet_id": tweet_id}) self._graphql_post("CreateBookmark", {"tweet_id": tweet_id})
self._write_delay()
return True return True
def unbookmark_tweet(self, tweet_id): def unbookmark_tweet(self, tweet_id):
# type: (str) -> bool # type: (str) -> bool
"""Remove a tweet from bookmarks. Returns True on success.""" """Remove a tweet from bookmarks. Returns True on success."""
self._graphql_post("DeleteBookmark", {"tweet_id": tweet_id}) self._graphql_post("DeleteBookmark", {"tweet_id": tweet_id})
self._write_delay()
return True return True
def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None): def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None):
@@ -609,7 +656,10 @@ class TwitterClient:
def _ensure_client_transaction(self): def _ensure_client_transaction(self):
# type: () -> None # type: () -> None
"""Initialize ClientTransaction for x-client-transaction-id header.""" """Initialize ClientTransaction for x-client-transaction-id header.
Also attempts to extract live feature flags from JS bundles.
"""
if self._ct_init_attempted: if self._ct_init_attempted:
return return
self._ct_init_attempted = True self._ct_init_attempted = True
@@ -632,6 +682,9 @@ class TwitterClient:
ondemand_file_response=ondemand_file.text, ondemand_file_response=ondemand_file.text,
) )
logger.info("ClientTransaction initialized for x-client-transaction-id") logger.info("ClientTransaction initialized for x-client-transaction-id")
# Try to extract live FEATURES from the homepage JS bundles
_update_features_from_html(home_page.text)
except Exception as exc: except Exception as exc:
logger.warning("Failed to init ClientTransaction: %s", exc) logger.warning("Failed to init ClientTransaction: %s", exc)

2
uv.lock generated
View File

@@ -950,7 +950,7 @@ wheels = [
[[package]] [[package]]
name = "twitter-cli" name = "twitter-cli"
version = "0.3.4" version = "0.4.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "beautifulsoup4" }, { name = "beautifulsoup4" },