From df39a15d00cb63eb9fb9e221d9050d1f3748c30f Mon Sep 17 00:00:00 2001 From: jackwener Date: Sat, 7 Mar 2026 20:30:59 +0800 Subject: [PATCH] refactor: code review fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes: - _extract_cursor: only extract Bottom cursors, preventing Top cursor from corrupting pagination state - _api_request: merge _api_get/_api_post into unified method — POST now has rate-limit code 88 retry (was missing) - fetch_user_likes: add override_base_variables=True Code quality: - Extract BEARER_TOKEN and USER_AGENT into constants.py (was duped in auth.py and client.py) - Add user_profile_to_dict/users_to_json for proper UserProfile serialization (followers/following JSON output was ad-hoc) - Refactor 6 CLI write commands via _write_action helper - Extract _extract_media and _extract_author from _parse_tweet_result - Update CLI module docstring with all 18 commands --- twitter_cli/auth.py | 17 +--- twitter_cli/cli.py | 126 ++++++++--------------- twitter_cli/client.py | 192 +++++++++++++++-------------------- twitter_cli/constants.py | 12 +++ twitter_cli/serialization.py | 32 +++++- 5 files changed, 170 insertions(+), 209 deletions(-) create mode 100644 twitter_cli/constants.py diff --git a/twitter_cli/auth.py b/twitter_cli/auth.py index 8db7c6a..73bcb25 100644 --- a/twitter_cli/auth.py +++ b/twitter_cli/auth.py @@ -17,15 +17,10 @@ import urllib.error import urllib.request from typing import Dict, Optional +from .constants import BEARER_TOKEN, USER_AGENT + logger = logging.getLogger(__name__) -# Public bearer token (same as in client.py) -_BEARER_TOKEN = ( - "AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs" - "%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" -) - - def load_from_env() -> Optional[Dict[str, str]]: """Load cookies from environment variables.""" auth_token = os.environ.get("TWITTER_AUTH_TOKEN", "") @@ -49,16 +44,12 @@ def verify_cookies(auth_token, ct0): ] headers = { - "Authorization": "Bearer %s" % _BEARER_TOKEN, + "Authorization": "Bearer %s" % BEARER_TOKEN, "Cookie": "auth_token=%s; ct0=%s" % (auth_token, ct0), "X-Csrf-Token": ct0, "X-Twitter-Active-User": "yes", "X-Twitter-Auth-Type": "OAuth2Session", - "User-Agent": ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/131.0.0.0 Safari/537.36" - ), + "User-Agent": USER_AGENT, } for url in urls: diff --git a/twitter_cli/cli.py b/twitter_cli/cli.py index ec6f2f7..6c2fc97 100644 --- a/twitter_cli/cli.py +++ b/twitter_cli/cli.py @@ -1,16 +1,24 @@ """CLI entry point for twitter-cli. -Usage: - twitter feed # fetch home timeline (For You) - twitter feed -t following # fetch following feed - twitter feed --max 50 # custom fetch count - twitter feed --filter # enable score-based filtering - twitter feed --json # JSON output - twitter favorite # fetch bookmarks - twitter feed --input tweets.json # load existing data - twitter feed --output out.json # save filtered tweets - twitter user elonmusk # view user profile - twitter user-posts elonmusk # list user tweets +Read commands: + twitter feed # home timeline (For You) + twitter feed -t following # following feed + twitter favorite # bookmarks + twitter search "query" # search tweets + twitter user elonmusk # user profile + twitter user-posts elonmusk # user tweets + twitter likes elonmusk # user likes + twitter tweet # tweet detail + replies + twitter list # list timeline + twitter followers # followers list + twitter following # following list + +Write commands: + twitter post "text" # post a tweet + twitter delete # delete a tweet + twitter like/unlike # like/unlike + twitter rt/unrt # retweet/unretweet + twitter bookmark-add/rm # bookmark management """ from __future__ import annotations @@ -35,7 +43,7 @@ from .formatter import ( print_user_profile, print_user_table, ) -from .serialization import tweets_from_json, tweets_to_json +from .serialization import tweets_from_json, tweets_to_json, users_to_json console = Console() @@ -413,10 +421,7 @@ def followers(screen_name, max_count, as_json): sys.exit(1) if as_json: - import json - click.echo(json.dumps([{"id": u.id, "name": u.name, "screen_name": u.screen_name, - "bio": u.bio, "followers": u.followers_count, - "following": u.following_count} for u in users], indent=2, ensure_ascii=False)) + click.echo(users_to_json(users)) return print_user_table(users, console, title="👥 @%s followers — %d" % (screen_name, len(users))) @@ -446,10 +451,7 @@ def following(screen_name, max_count, as_json): sys.exit(1) if as_json: - import json - click.echo(json.dumps([{"id": u.id, "name": u.name, "screen_name": u.screen_name, - "bio": u.bio, "followers": u.followers_count, - "following": u.following_count} for u in users], indent=2, ensure_ascii=False)) + click.echo(users_to_json(users)) return print_user_table(users, console, title="👥 @%s following — %d" % (screen_name, len(users))) @@ -458,6 +460,20 @@ def following(screen_name, max_count, as_json): # ── Write commands ────────────────────────────────────────────────────── +def _write_action(emoji, action_desc, client_method, tweet_id): + # type: (str, str, str, str) -> None + """Generic write action helper to reduce CLI command boilerplate.""" + config = load_config() + try: + client = _get_client(config) + console.print("%s %s %s..." % (emoji, action_desc, tweet_id)) + getattr(client, client_method)(tweet_id) + console.print("[green]✅ Done.[/green]") + except RuntimeError as exc: + console.print("[red]❌ %s[/red]" % exc) + sys.exit(1) + + @cli.command() @click.argument("text") @click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.") @@ -483,15 +499,7 @@ def post(text, reply_to): def delete_tweet(tweet_id): # type: (str,) -> None """Delete a tweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("🗑️ Deleting tweet %s..." % tweet_id) - client.delete_tweet(tweet_id) - console.print("[green]✅ Tweet deleted.[/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("🗑️", "Deleting tweet", "delete_tweet", tweet_id) @cli.command() @@ -499,15 +507,7 @@ def delete_tweet(tweet_id): def like(tweet_id): # type: (str,) -> None """Like a tweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("❤️ Liking tweet %s..." % tweet_id) - client.like_tweet(tweet_id) - console.print("[green]✅ Liked![/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("❤️", "Liking tweet", "like_tweet", tweet_id) @cli.command() @@ -515,15 +515,7 @@ def like(tweet_id): def unlike(tweet_id): # type: (str,) -> None """Unlike a tweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("💔 Unliking tweet %s..." % tweet_id) - client.unlike_tweet(tweet_id) - console.print("[green]✅ Unliked.[/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("💔", "Unliking tweet", "unlike_tweet", tweet_id) @cli.command() @@ -531,15 +523,7 @@ def unlike(tweet_id): def rt(tweet_id): # type: (str,) -> None """Retweet a tweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("🔄 Retweeting %s..." % tweet_id) - client.retweet(tweet_id) - console.print("[green]✅ Retweeted![/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("🔄", "Retweeting", "retweet", tweet_id) @cli.command() @@ -547,15 +531,7 @@ def rt(tweet_id): def unrt(tweet_id): # type: (str,) -> None """Undo a retweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("🔄 Undoing retweet %s..." % tweet_id) - client.unretweet(tweet_id) - console.print("[green]✅ Retweet undone.[/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("🔄", "Undoing retweet", "unretweet", tweet_id) @cli.command(name="bookmark-add") @@ -563,15 +539,7 @@ def unrt(tweet_id): def bookmark_add(tweet_id): # type: (str,) -> None """Bookmark a tweet. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("🔖 Bookmarking tweet %s..." % tweet_id) - client.bookmark_tweet(tweet_id) - console.print("[green]✅ Bookmarked![/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("🔖", "Bookmarking tweet", "bookmark_tweet", tweet_id) @cli.command(name="bookmark-rm") @@ -579,15 +547,7 @@ def bookmark_add(tweet_id): def bookmark_rm(tweet_id): # type: (str,) -> None """Remove a tweet from bookmarks. TWEET_ID is the numeric tweet ID.""" - config = load_config() - try: - client = _get_client(config) - console.print("🔖 Removing bookmark %s..." % tweet_id) - client.unbookmark_tweet(tweet_id) - console.print("[green]✅ Bookmark removed.[/green]") - except RuntimeError as exc: - console.print("[red]❌ %s[/red]" % exc) - sys.exit(1) + _write_action("🔖", "Removing bookmark", "unbookmark_tweet", tweet_id) if __name__ == "__main__": diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 9dfba7b..5a610d7 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -12,6 +12,7 @@ import urllib.error import urllib.parse import urllib.request +from .constants import BEARER_TOKEN, USER_AGENT from .models import Author, Metrics, Tweet, TweetMedia, UserProfile try: @@ -26,11 +27,6 @@ except ImportError: # pragma: no cover logger = logging.getLogger(__name__) -BEARER_TOKEN = ( - "AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs" - "%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" -) - FALLBACK_QUERY_IDS = { # Read operations "HomeTimeline": "c-CzHF1LboFilMpsx4ZCrQ", @@ -100,12 +96,6 @@ FEATURES = { "responsive_web_enhance_cards_enabled": False, } -USER_AGENT = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/131.0.0.0 Safari/537.36" -) - _cached_query_ids = {} # type: Dict[str, str] _bundles_scanned = False @@ -364,6 +354,7 @@ class TwitterClient: "withBirdwatchNotes": False, "withVoice": True, }, + override_base_variables=True, ) def fetch_search(self, query, count=20, product="Top"): @@ -635,12 +626,31 @@ class TwitterClient: def _api_get(self, url): # type: (str) -> Dict[str, Any] - """Make authenticated GET request to Twitter API with retry on 429.""" + """Make authenticated GET request to Twitter API.""" + return self._api_request(url, method="GET") + + def _graphql_post(self, operation_name, variables, features=None): + # type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] + """Issue GraphQL POST request.""" + query_id = _resolve_query_id(operation_name, prefer_fallback=True) + url = "https://x.com/i/api/graphql/%s/%s" % (query_id, operation_name) + body = {"variables": variables, "queryId": query_id} + if features: + body["features"] = features + return self._api_request(url, method="POST", body=body) + + def _api_request(self, url, method="GET", body=None): + # type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any] + """Make authenticated request to Twitter API with retry on rate limits. + + Handles both GET and POST. Retries on HTTP 429 and JSON error code 88. + """ self._ensure_client_transaction() - headers = self._build_headers(url=url) + headers = self._build_headers(url=url, method=method) + encoded_body = json.dumps(body).encode("utf-8") if body else None for attempt in range(self._max_retries + 1): - request = urllib.request.Request(url) + request = urllib.request.Request(url, data=encoded_body, method=method) for key, value in headers.items(): request.add_header(key, value) @@ -656,8 +666,8 @@ class TwitterClient: ) time.sleep(wait) continue - body = exc.read().decode("utf-8", errors="replace") - message = "Twitter API error %d: %s" % (exc.code, body[:500]) + resp_body = exc.read().decode("utf-8", errors="replace") + message = "Twitter API error %d: %s" % (exc.code, resp_body[:500]) raise TwitterAPIError(exc.code, message) except urllib.error.URLError as exc: raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason) @@ -685,58 +695,6 @@ class TwitterClient: # Should not be reached, but just in case raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries) - def _graphql_post(self, operation_name, variables, features=None): - # type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any] - """Issue GraphQL POST request.""" - query_id = _resolve_query_id(operation_name, prefer_fallback=True) - url = "https://x.com/i/api/graphql/%s/%s" % (query_id, operation_name) - body = {"variables": variables, "queryId": query_id} - if features: - body["features"] = features - return self._api_post(url, body) - - def _api_post(self, url, body): - # type: (str, Dict[str, Any]) -> Dict[str, Any] - """Make authenticated POST request to Twitter API.""" - self._ensure_client_transaction() - headers = self._build_headers(url=url, method="POST") - data = json.dumps(body).encode("utf-8") - - for attempt in range(self._max_retries + 1): - request = urllib.request.Request(url, data=data, method="POST") - for key, value in headers.items(): - request.add_header(key, value) - - try: - with urllib.request.urlopen(request, context=_create_ssl_context(), timeout=30) as response: - payload = response.read().decode("utf-8") - except urllib.error.HTTPError as exc: - if exc.code == 429 and attempt < self._max_retries: - wait = self._retry_base_delay * (2 ** attempt) - logger.warning( - "Rate limited (429), retrying in %.1fs (attempt %d/%d)", - wait, attempt + 1, self._max_retries, - ) - time.sleep(wait) - continue - body_text = exc.read().decode("utf-8", errors="replace") - message = "Twitter API error %d: %s" % (exc.code, body_text[:500]) - raise TwitterAPIError(exc.code, message) - except urllib.error.URLError as exc: - raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason) - - try: - parsed = json.loads(payload) - except json.JSONDecodeError: - raise TwitterAPIError(0, "Twitter API returned invalid JSON") - - if isinstance(parsed, dict) and parsed.get("errors"): - err_msg = parsed["errors"][0].get("message", "Unknown error") - raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg) - return parsed - - raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries) - def _fetch_user_list(self, operation_name, user_id, count, get_instructions): # type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile] """Generic user list fetcher (for followers/following) with pagination.""" @@ -901,44 +859,12 @@ class TwitterClient: actual_user = _deep_get(rt_core, "user_results", "result") or {} actual_user_legacy = actual_user.get("legacy", {}) - media = [] # type: List[TweetMedia] - for media_item in _deep_get(actual_legacy, "extended_entities", "media") or []: - media_type = media_item.get("type", "") - if media_type == "photo": - media.append( - TweetMedia( - type="photo", - url=media_item.get("media_url_https", ""), - width=_deep_get(media_item, "original_info", "width"), - height=_deep_get(media_item, "original_info", "height"), - ) - ) - elif media_type in {"video", "animated_gif"}: - variants = media_item.get("video_info", {}).get("variants", []) - mp4_variants = [item for item in variants if item.get("content_type") == "video/mp4"] - mp4_variants.sort(key=lambda item: item.get("bitrate", 0), reverse=True) - media.append( - TweetMedia( - type=media_type, - url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""), - width=_deep_get(media_item, "original_info", "width"), - height=_deep_get(media_item, "original_info", "height"), - ) - ) - + media = _extract_media(actual_legacy) urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []] quoted = _deep_get(actual_data, "quoted_status_result", "result") quoted_tweet = self._parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None + author = _extract_author(actual_user, actual_user_legacy) - actual_user_core = actual_user.get("core", {}) - user_name = actual_user_core.get("name") or actual_user_legacy.get("name") or actual_user.get("name", "Unknown") - user_screen_name = ( - actual_user_core.get("screen_name") - or actual_user_legacy.get("screen_name") - or actual_user.get("screen_name", "unknown") - ) - user_profile_image = actual_user.get("avatar", {}).get("image_url") or actual_user_legacy.get("profile_image_url_https", "") - user_verified = bool(actual_user.get("is_blue_verified") or actual_user_legacy.get("verified", False)) retweeted_by = None # type: Optional[str] if is_retweet: retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown") @@ -946,13 +872,7 @@ class TwitterClient: return Tweet( id=actual_data.get("rest_id", ""), text=actual_legacy.get("full_text", ""), - author=Author( - id=actual_user.get("rest_id", ""), - name=user_name, - screen_name=user_screen_name, - profile_image_url=user_profile_image, - verified=user_verified, - ), + author=author, metrics=Metrics( likes=_to_int(actual_legacy.get("favorite_count"), 0), retweets=_to_int(actual_legacy.get("retweet_count"), 0), @@ -971,6 +891,56 @@ class TwitterClient: ) +def _extract_media(legacy): + # type: (Dict[str, Any]) -> List[TweetMedia] + """Extract media items from tweet legacy data.""" + media = [] # type: List[TweetMedia] + for media_item in _deep_get(legacy, "extended_entities", "media") or []: + media_type = media_item.get("type", "") + if media_type == "photo": + media.append( + TweetMedia( + type="photo", + url=media_item.get("media_url_https", ""), + width=_deep_get(media_item, "original_info", "width"), + height=_deep_get(media_item, "original_info", "height"), + ) + ) + elif media_type in {"video", "animated_gif"}: + variants = media_item.get("video_info", {}).get("variants", []) + mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"] + mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True) + media.append( + TweetMedia( + type=media_type, + url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""), + width=_deep_get(media_item, "original_info", "width"), + height=_deep_get(media_item, "original_info", "height"), + ) + ) + return media + + +def _extract_author(user_data, user_legacy): + # type: (Dict[str, Any], Dict[str, Any]) -> Author + """Extract Author from user result data.""" + user_core = user_data.get("core", {}) + return Author( + id=user_data.get("rest_id", ""), + name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"), + screen_name=( + user_core.get("screen_name") + or user_legacy.get("screen_name") + or user_data.get("screen_name", "unknown") + ), + profile_image_url=( + user_data.get("avatar", {}).get("image_url") + or user_legacy.get("profile_image_url_https", "") + ), + verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)), + ) + + def _deep_get(data, *keys): # type: (Any, *Any) -> Any """Safely get nested dict/list values. Supports int keys for list access.""" @@ -990,10 +960,10 @@ def _deep_get(data, *keys): def _extract_cursor(content): # type: (Dict[str, Any]) -> Optional[str] - """Extract pagination cursor from timeline content.""" + """Extract Bottom pagination cursor from timeline content.""" if content.get("cursorType") == "Bottom": return content.get("value") - if content.get("entryType") == "TimelineTimelineCursor": + if content.get("entryType") == "TimelineTimelineCursor" and content.get("cursorType") == "Bottom": return content.get("value") return None diff --git a/twitter_cli/constants.py b/twitter_cli/constants.py new file mode 100644 index 0000000..f624feb --- /dev/null +++ b/twitter_cli/constants.py @@ -0,0 +1,12 @@ +"""Shared constants for twitter-cli.""" + +BEARER_TOKEN = ( + "AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs" + "%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" +) + +USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" +) diff --git a/twitter_cli/serialization.py b/twitter_cli/serialization.py index 3909954..5137cc0 100644 --- a/twitter_cli/serialization.py +++ b/twitter_cli/serialization.py @@ -1,11 +1,11 @@ -"""Serialization helpers for Tweet models.""" +"""Serialization helpers for Tweet and UserProfile models.""" from __future__ import annotations import json from typing import Any, Dict, Iterable, List, Optional -from .models import Author, Metrics, Tweet, TweetMedia +from .models import Author, Metrics, Tweet, TweetMedia, UserProfile def tweet_to_dict(tweet: Tweet) -> Dict[str, Any]: @@ -129,6 +129,34 @@ def tweets_to_json(tweets: Iterable[Tweet]) -> str: return json.dumps([tweet_to_dict(tweet) for tweet in tweets], ensure_ascii=False, indent=2) +def user_profile_to_dict(user: UserProfile) -> Dict[str, Any]: + """Convert a UserProfile dataclass into a JSON-safe dict.""" + return { + "id": user.id, + "name": user.name, + "screenName": user.screen_name, + "bio": user.bio, + "location": user.location, + "url": user.url, + "followers": user.followers_count, + "following": user.following_count, + "tweets": user.tweets_count, + "likes": user.likes_count, + "verified": user.verified, + "profileImageUrl": user.profile_image_url, + "createdAt": user.created_at, + } + + +def users_to_json(users: Iterable[UserProfile]) -> str: + """Serialize UserProfile objects to pretty JSON.""" + return json.dumps( + [user_profile_to_dict(user) for user in users], + ensure_ascii=False, + indent=2, + ) + + def _optional_int(value: Any) -> Optional[int]: """Parse an optional integer value.""" if value is None: