refactor: deep review fixes round 3
- client.py:
- Remove dead _extract_cursor second branch (unreachable code)
- Cache SSL context as module-level _SSL_CTX (avoid re-reading CA certs)
- Add 404 stale-fallback retry to _graphql_post (parity with GET)
- Remove dead core.get('name')/core.get('screen_name') in fetch_user
- Set Content-Type: application/json only for POST requests
- Rename _to_int → _parse_int for clarity vs config._as_int
- Add 'not thread-safe' note on module-level caches
- cli.py:
- _fetch_and_display now accepts optional config param (fix double load)
- Refactor user_posts to use _fetch_and_display
- Pass config to all _fetch_and_display callers
- pyproject.toml:
- Move xclienttransaction/requests to optional [transaction] deps
- Add beautifulsoup4 to [transaction] optional deps
- README.md:
- Add rateLimit config section with comments
- Add constants.py to project structure tree
This commit is contained in:
@@ -115,10 +115,11 @@ def cli(verbose):
|
||||
_setup_logging(verbose)
|
||||
|
||||
|
||||
def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, do_filter):
|
||||
# type: (Any, str, str, Optional[int], bool, Optional[str], bool) -> None
|
||||
def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, do_filter, config=None):
|
||||
# type: (Any, str, str, Optional[int], bool, Optional[str], bool, Optional[dict]) -> None
|
||||
"""Common fetch-filter-display logic for timeline-like commands."""
|
||||
config = load_config()
|
||||
if config is None:
|
||||
config = load_config()
|
||||
try:
|
||||
fetch_count = _resolve_fetch_count(max_count, config.get("fetch", {}).get("count", 50))
|
||||
console.print("%s Fetching %s (%d tweets)...\n" % (emoji, label, fetch_count))
|
||||
@@ -211,7 +212,7 @@ def favorite(max_count, as_json, output_file, do_filter):
|
||||
client = _get_client(config)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_bookmarks(count),
|
||||
"favorites", "🔖", max_count, as_json, output_file, do_filter,
|
||||
"favorites", "🔖", max_count, as_json, output_file, do_filter, config,
|
||||
)
|
||||
|
||||
|
||||
@@ -243,26 +244,17 @@ def user_posts(screen_name, max_count, as_json):
|
||||
"""List a user's tweets. SCREEN_NAME is the @handle (without @)."""
|
||||
screen_name = screen_name.lstrip("@")
|
||||
config = load_config()
|
||||
client = _get_client(config)
|
||||
console.print("👤 Fetching @%s's profile..." % screen_name)
|
||||
try:
|
||||
fetch_count = _resolve_fetch_count(max_count, 20)
|
||||
client = _get_client(config)
|
||||
console.print("👤 Fetching @%s's profile..." % screen_name)
|
||||
profile = client.fetch_user(screen_name)
|
||||
console.print("📝 Fetching tweets (%d)...\n" % fetch_count)
|
||||
start = time.time()
|
||||
tweets = client.fetch_user_tweets(profile.id, fetch_count)
|
||||
elapsed = time.time() - start
|
||||
console.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))
|
||||
except RuntimeError as exc:
|
||||
console.print("[red]❌ %s[/red]" % exc)
|
||||
sys.exit(1)
|
||||
|
||||
if as_json:
|
||||
click.echo(tweets_to_json(tweets))
|
||||
return
|
||||
|
||||
print_tweet_table(tweets, console, title="📝 @%s — %d tweets" % (screen_name, len(tweets)))
|
||||
console.print()
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_user_tweets(profile.id, count),
|
||||
"@%s tweets" % screen_name, "📝", max_count, as_json, None, False, config,
|
||||
)
|
||||
|
||||
|
||||
SEARCH_PRODUCTS = ["Top", "Latest", "Photos", "Videos"]
|
||||
@@ -288,7 +280,7 @@ def search(query, product, max_count, as_json, do_filter):
|
||||
client = _get_client(config)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_search(query, count, product),
|
||||
"'%s' (%s)" % (query, product), "🔍", max_count, as_json, None, do_filter,
|
||||
"'%s' (%s)" % (query, product), "🔍", max_count, as_json, None, do_filter, config,
|
||||
)
|
||||
|
||||
|
||||
@@ -307,7 +299,7 @@ def likes(screen_name, max_count, as_json, do_filter):
|
||||
profile = client.fetch_user(screen_name)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_user_likes(profile.id, count),
|
||||
"@%s likes" % screen_name, "❤️", max_count, as_json, None, do_filter,
|
||||
"@%s likes" % screen_name, "❤️", max_count, as_json, None, do_filter, config,
|
||||
)
|
||||
|
||||
|
||||
@@ -356,7 +348,7 @@ def list_timeline(list_id, max_count, as_json, do_filter):
|
||||
client = _get_client(config)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_list_timeline(list_id, count),
|
||||
"list %s" % list_id, "📋", max_count, as_json, None, do_filter,
|
||||
"list %s" % list_id, "📋", max_count, as_json, None, do_filter, config,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -96,6 +96,7 @@ FEATURES = {
|
||||
"responsive_web_enhance_cards_enabled": False,
|
||||
}
|
||||
|
||||
# Module-level caches (not thread-safe — CLI is single-threaded)
|
||||
_cached_query_ids = {} # type: Dict[str, str]
|
||||
_bundles_scanned = False
|
||||
|
||||
@@ -109,10 +110,14 @@ class TwitterAPIError(RuntimeError):
|
||||
self.status_code = status_code
|
||||
|
||||
|
||||
# Reuse a single SSL context across all requests (avoids re-reading CA certs)
|
||||
_SSL_CTX = ssl.create_default_context()
|
||||
|
||||
|
||||
def _create_ssl_context():
|
||||
# type: () -> ssl.SSLContext
|
||||
"""Create SSL context for urllib."""
|
||||
return ssl.create_default_context()
|
||||
"""Return shared SSL context."""
|
||||
return _SSL_CTX
|
||||
|
||||
|
||||
def _url_fetch(url, headers=None):
|
||||
@@ -304,18 +309,17 @@ class TwitterClient:
|
||||
raise RuntimeError("User @%s not found" % screen_name)
|
||||
|
||||
legacy = result.get("legacy", {})
|
||||
core = result.get("core", {})
|
||||
return UserProfile(
|
||||
id=result.get("rest_id", ""),
|
||||
name=core.get("name") or legacy.get("name", ""),
|
||||
screen_name=core.get("screen_name") or legacy.get("screen_name", screen_name),
|
||||
name=legacy.get("name", ""),
|
||||
screen_name=legacy.get("screen_name", screen_name),
|
||||
bio=legacy.get("description", ""),
|
||||
location=legacy.get("location", ""),
|
||||
url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
|
||||
followers_count=_to_int(legacy.get("followers_count"), 0),
|
||||
following_count=_to_int(legacy.get("friends_count"), 0),
|
||||
tweets_count=_to_int(legacy.get("statuses_count"), 0),
|
||||
likes_count=_to_int(legacy.get("favourites_count"), 0),
|
||||
followers_count=_parse_int(legacy.get("followers_count"), 0),
|
||||
following_count=_parse_int(legacy.get("friends_count"), 0),
|
||||
tweets_count=_parse_int(legacy.get("statuses_count"), 0),
|
||||
likes_count=_parse_int(legacy.get("favourites_count"), 0),
|
||||
verified=bool(result.get("is_blue_verified") or legacy.get("verified", False)),
|
||||
profile_image_url=legacy.get("profile_image_url_https", ""),
|
||||
created_at=legacy.get("created_at", ""),
|
||||
@@ -602,12 +606,13 @@ class TwitterClient:
|
||||
"X-Twitter-Active-User": "yes",
|
||||
"X-Twitter-Auth-Type": "OAuth2Session",
|
||||
"X-Twitter-Client-Language": "en",
|
||||
"Content-Type": "application/json",
|
||||
"User-Agent": USER_AGENT,
|
||||
"Referer": "https://x.com",
|
||||
"Accept": "*/*",
|
||||
"Accept-Language": "en-US,en;q=0.9",
|
||||
}
|
||||
if method == "POST":
|
||||
headers["Content-Type"] = "application/json"
|
||||
# Generate x-client-transaction-id if available
|
||||
if self._client_transaction and url:
|
||||
try:
|
||||
@@ -627,13 +632,27 @@ class TwitterClient:
|
||||
|
||||
def _graphql_post(self, operation_name, variables, features=None):
|
||||
# type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
|
||||
"""Issue GraphQL POST request."""
|
||||
"""Issue GraphQL POST request with automatic stale-fallback retry."""
|
||||
query_id = _resolve_query_id(operation_name, prefer_fallback=True)
|
||||
url = "https://x.com/i/api/graphql/%s/%s" % (query_id, operation_name)
|
||||
body = {"variables": variables, "queryId": query_id}
|
||||
if features:
|
||||
body["features"] = features
|
||||
return self._api_request(url, method="POST", body=body)
|
||||
using_fallback = query_id == FALLBACK_QUERY_IDS.get(operation_name)
|
||||
|
||||
def _do_post(qid):
|
||||
# type: (str) -> Dict[str, Any]
|
||||
url = "https://x.com/i/api/graphql/%s/%s" % (qid, operation_name)
|
||||
body = {"variables": variables, "queryId": qid} # type: Dict[str, Any]
|
||||
if features:
|
||||
body["features"] = features
|
||||
return self._api_request(url, method="POST", body=body)
|
||||
|
||||
try:
|
||||
return _do_post(query_id)
|
||||
except TwitterAPIError as exc:
|
||||
if exc.status_code == 404 and using_fallback:
|
||||
logger.info("Retrying POST %s with live queryId after 404", operation_name)
|
||||
_invalidate_query_id(operation_name)
|
||||
refreshed = _resolve_query_id(operation_name, prefer_fallback=False)
|
||||
return _do_post(refreshed)
|
||||
raise RuntimeError(str(exc))
|
||||
|
||||
def _api_request(self, url, method="GET", body=None):
|
||||
# type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any]
|
||||
@@ -870,12 +889,12 @@ class TwitterClient:
|
||||
text=actual_legacy.get("full_text", ""),
|
||||
author=author,
|
||||
metrics=Metrics(
|
||||
likes=_to_int(actual_legacy.get("favorite_count"), 0),
|
||||
retweets=_to_int(actual_legacy.get("retweet_count"), 0),
|
||||
replies=_to_int(actual_legacy.get("reply_count"), 0),
|
||||
quotes=_to_int(actual_legacy.get("quote_count"), 0),
|
||||
views=_to_int(_deep_get(actual_data, "views", "count"), 0),
|
||||
bookmarks=_to_int(actual_legacy.get("bookmark_count"), 0),
|
||||
likes=_parse_int(actual_legacy.get("favorite_count"), 0),
|
||||
retweets=_parse_int(actual_legacy.get("retweet_count"), 0),
|
||||
replies=_parse_int(actual_legacy.get("reply_count"), 0),
|
||||
quotes=_parse_int(actual_legacy.get("quote_count"), 0),
|
||||
views=_parse_int(_deep_get(actual_data, "views", "count"), 0),
|
||||
bookmarks=_parse_int(actual_legacy.get("bookmark_count"), 0),
|
||||
),
|
||||
created_at=actual_legacy.get("created_at", ""),
|
||||
media=media,
|
||||
@@ -959,14 +978,12 @@ def _extract_cursor(content):
|
||||
"""Extract Bottom pagination cursor from timeline content."""
|
||||
if content.get("cursorType") == "Bottom":
|
||||
return content.get("value")
|
||||
if content.get("entryType") == "TimelineTimelineCursor" and content.get("cursorType") == "Bottom":
|
||||
return content.get("value")
|
||||
return None
|
||||
|
||||
|
||||
def _to_int(value, default):
|
||||
def _parse_int(value, default):
|
||||
# type: (Any, int) -> int
|
||||
"""Best-effort integer conversion."""
|
||||
"""Best-effort integer conversion. Handles commas and float strings."""
|
||||
try:
|
||||
text = str(value).replace(",", "").strip()
|
||||
if not text:
|
||||
|
||||
Reference in New Issue
Block a user