refactor: code review fixes

Bug fixes:
- _extract_cursor: only extract Bottom cursors, preventing Top cursor
  from corrupting pagination state
- _api_request: merge _api_get/_api_post into unified method — POST
  now has rate-limit code 88 retry (was missing)
- fetch_user_likes: add override_base_variables=True

Code quality:
- Extract BEARER_TOKEN and USER_AGENT into constants.py (was duped
  in auth.py and client.py)
- Add user_profile_to_dict/users_to_json for proper UserProfile
  serialization (followers/following JSON output was ad-hoc)
- Refactor 6 CLI write commands via _write_action helper
- Extract _extract_media and _extract_author from _parse_tweet_result
- Update CLI module docstring with all 18 commands
This commit is contained in:
jackwener
2026-03-07 20:30:59 +08:00
parent 6c73a9f0b6
commit df39a15d00
5 changed files with 170 additions and 209 deletions

View File

@@ -17,15 +17,10 @@ import urllib.error
import urllib.request
from typing import Dict, Optional
from .constants import BEARER_TOKEN, USER_AGENT
logger = logging.getLogger(__name__)
# Public bearer token (same as in client.py)
_BEARER_TOKEN = (
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs"
"%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
)
def load_from_env() -> Optional[Dict[str, str]]:
"""Load cookies from environment variables."""
auth_token = os.environ.get("TWITTER_AUTH_TOKEN", "")
@@ -49,16 +44,12 @@ def verify_cookies(auth_token, ct0):
]
headers = {
"Authorization": "Bearer %s" % _BEARER_TOKEN,
"Authorization": "Bearer %s" % BEARER_TOKEN,
"Cookie": "auth_token=%s; ct0=%s" % (auth_token, ct0),
"X-Csrf-Token": ct0,
"X-Twitter-Active-User": "yes",
"X-Twitter-Auth-Type": "OAuth2Session",
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
"User-Agent": USER_AGENT,
}
for url in urls:

View File

@@ -1,16 +1,24 @@
"""CLI entry point for twitter-cli.
Usage:
twitter feed # fetch home timeline (For You)
twitter feed -t following # fetch following feed
twitter feed --max 50 # custom fetch count
twitter feed --filter # enable score-based filtering
twitter feed --json # JSON output
twitter favorite # fetch bookmarks
twitter feed --input tweets.json # load existing data
twitter feed --output out.json # save filtered tweets
twitter user elonmusk # view user profile
twitter user-posts elonmusk # list user tweets
Read commands:
twitter feed # home timeline (For You)
twitter feed -t following # following feed
twitter favorite # bookmarks
twitter search "query" # search tweets
twitter user elonmusk # user profile
twitter user-posts elonmusk # user tweets
twitter likes elonmusk # user likes
twitter tweet <id> # tweet detail + replies
twitter list <id> # list timeline
twitter followers <handle> # followers list
twitter following <handle> # following list
Write commands:
twitter post "text" # post a tweet
twitter delete <id> # delete a tweet
twitter like/unlike <id> # like/unlike
twitter rt/unrt <id> # retweet/unretweet
twitter bookmark-add/rm <id> # bookmark management
"""
from __future__ import annotations
@@ -35,7 +43,7 @@ from .formatter import (
print_user_profile,
print_user_table,
)
from .serialization import tweets_from_json, tweets_to_json
from .serialization import tweets_from_json, tweets_to_json, users_to_json
console = Console()
@@ -413,10 +421,7 @@ def followers(screen_name, max_count, as_json):
sys.exit(1)
if as_json:
import json
click.echo(json.dumps([{"id": u.id, "name": u.name, "screen_name": u.screen_name,
"bio": u.bio, "followers": u.followers_count,
"following": u.following_count} for u in users], indent=2, ensure_ascii=False))
click.echo(users_to_json(users))
return
print_user_table(users, console, title="👥 @%s followers — %d" % (screen_name, len(users)))
@@ -446,10 +451,7 @@ def following(screen_name, max_count, as_json):
sys.exit(1)
if as_json:
import json
click.echo(json.dumps([{"id": u.id, "name": u.name, "screen_name": u.screen_name,
"bio": u.bio, "followers": u.followers_count,
"following": u.following_count} for u in users], indent=2, ensure_ascii=False))
click.echo(users_to_json(users))
return
print_user_table(users, console, title="👥 @%s following — %d" % (screen_name, len(users)))
@@ -458,6 +460,20 @@ def following(screen_name, max_count, as_json):
# ── Write commands ──────────────────────────────────────────────────────
def _write_action(emoji, action_desc, client_method, tweet_id):
# type: (str, str, str, str) -> None
"""Generic write action helper to reduce CLI command boilerplate."""
config = load_config()
try:
client = _get_client(config)
console.print("%s %s %s..." % (emoji, action_desc, tweet_id))
getattr(client, client_method)(tweet_id)
console.print("[green]✅ Done.[/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
@cli.command()
@click.argument("text")
@click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.")
@@ -483,15 +499,7 @@ def post(text, reply_to):
def delete_tweet(tweet_id):
# type: (str,) -> None
"""Delete a tweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("🗑️ Deleting tweet %s..." % tweet_id)
client.delete_tweet(tweet_id)
console.print("[green]✅ Tweet deleted.[/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("🗑️", "Deleting tweet", "delete_tweet", tweet_id)
@cli.command()
@@ -499,15 +507,7 @@ def delete_tweet(tweet_id):
def like(tweet_id):
# type: (str,) -> None
"""Like a tweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("❤️ Liking tweet %s..." % tweet_id)
client.like_tweet(tweet_id)
console.print("[green]✅ Liked![/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("❤️", "Liking tweet", "like_tweet", tweet_id)
@cli.command()
@@ -515,15 +515,7 @@ def like(tweet_id):
def unlike(tweet_id):
# type: (str,) -> None
"""Unlike a tweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("💔 Unliking tweet %s..." % tweet_id)
client.unlike_tweet(tweet_id)
console.print("[green]✅ Unliked.[/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("💔", "Unliking tweet", "unlike_tweet", tweet_id)
@cli.command()
@@ -531,15 +523,7 @@ def unlike(tweet_id):
def rt(tweet_id):
# type: (str,) -> None
"""Retweet a tweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("🔄 Retweeting %s..." % tweet_id)
client.retweet(tweet_id)
console.print("[green]✅ Retweeted![/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("🔄", "Retweeting", "retweet", tweet_id)
@cli.command()
@@ -547,15 +531,7 @@ def rt(tweet_id):
def unrt(tweet_id):
# type: (str,) -> None
"""Undo a retweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("🔄 Undoing retweet %s..." % tweet_id)
client.unretweet(tweet_id)
console.print("[green]✅ Retweet undone.[/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("🔄", "Undoing retweet", "unretweet", tweet_id)
@cli.command(name="bookmark-add")
@@ -563,15 +539,7 @@ def unrt(tweet_id):
def bookmark_add(tweet_id):
# type: (str,) -> None
"""Bookmark a tweet. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("🔖 Bookmarking tweet %s..." % tweet_id)
client.bookmark_tweet(tweet_id)
console.print("[green]✅ Bookmarked![/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("🔖", "Bookmarking tweet", "bookmark_tweet", tweet_id)
@cli.command(name="bookmark-rm")
@@ -579,15 +547,7 @@ def bookmark_add(tweet_id):
def bookmark_rm(tweet_id):
# type: (str,) -> None
"""Remove a tweet from bookmarks. TWEET_ID is the numeric tweet ID."""
config = load_config()
try:
client = _get_client(config)
console.print("🔖 Removing bookmark %s..." % tweet_id)
client.unbookmark_tweet(tweet_id)
console.print("[green]✅ Bookmark removed.[/green]")
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
_write_action("🔖", "Removing bookmark", "unbookmark_tweet", tweet_id)
if __name__ == "__main__":

View File

@@ -12,6 +12,7 @@ import urllib.error
import urllib.parse
import urllib.request
from .constants import BEARER_TOKEN, USER_AGENT
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
try:
@@ -26,11 +27,6 @@ except ImportError: # pragma: no cover
logger = logging.getLogger(__name__)
BEARER_TOKEN = (
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs"
"%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
)
FALLBACK_QUERY_IDS = {
# Read operations
"HomeTimeline": "c-CzHF1LboFilMpsx4ZCrQ",
@@ -100,12 +96,6 @@ FEATURES = {
"responsive_web_enhance_cards_enabled": False,
}
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
_cached_query_ids = {} # type: Dict[str, str]
_bundles_scanned = False
@@ -364,6 +354,7 @@ class TwitterClient:
"withBirdwatchNotes": False,
"withVoice": True,
},
override_base_variables=True,
)
def fetch_search(self, query, count=20, product="Top"):
@@ -635,12 +626,31 @@ class TwitterClient:
def _api_get(self, url):
# type: (str) -> Dict[str, Any]
"""Make authenticated GET request to Twitter API with retry on 429."""
"""Make authenticated GET request to Twitter API."""
return self._api_request(url, method="GET")
def _graphql_post(self, operation_name, variables, features=None):
# type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Issue GraphQL POST request."""
query_id = _resolve_query_id(operation_name, prefer_fallback=True)
url = "https://x.com/i/api/graphql/%s/%s" % (query_id, operation_name)
body = {"variables": variables, "queryId": query_id}
if features:
body["features"] = features
return self._api_request(url, method="POST", body=body)
def _api_request(self, url, method="GET", body=None):
# type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Make authenticated request to Twitter API with retry on rate limits.
Handles both GET and POST. Retries on HTTP 429 and JSON error code 88.
"""
self._ensure_client_transaction()
headers = self._build_headers(url=url)
headers = self._build_headers(url=url, method=method)
encoded_body = json.dumps(body).encode("utf-8") if body else None
for attempt in range(self._max_retries + 1):
request = urllib.request.Request(url)
request = urllib.request.Request(url, data=encoded_body, method=method)
for key, value in headers.items():
request.add_header(key, value)
@@ -656,8 +666,8 @@ class TwitterClient:
)
time.sleep(wait)
continue
body = exc.read().decode("utf-8", errors="replace")
message = "Twitter API error %d: %s" % (exc.code, body[:500])
resp_body = exc.read().decode("utf-8", errors="replace")
message = "Twitter API error %d: %s" % (exc.code, resp_body[:500])
raise TwitterAPIError(exc.code, message)
except urllib.error.URLError as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason)
@@ -685,58 +695,6 @@ class TwitterClient:
# Should not be reached, but just in case
raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries)
def _graphql_post(self, operation_name, variables, features=None):
# type: (str, Dict[str, Any], Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Issue GraphQL POST request."""
query_id = _resolve_query_id(operation_name, prefer_fallback=True)
url = "https://x.com/i/api/graphql/%s/%s" % (query_id, operation_name)
body = {"variables": variables, "queryId": query_id}
if features:
body["features"] = features
return self._api_post(url, body)
def _api_post(self, url, body):
# type: (str, Dict[str, Any]) -> Dict[str, Any]
"""Make authenticated POST request to Twitter API."""
self._ensure_client_transaction()
headers = self._build_headers(url=url, method="POST")
data = json.dumps(body).encode("utf-8")
for attempt in range(self._max_retries + 1):
request = urllib.request.Request(url, data=data, method="POST")
for key, value in headers.items():
request.add_header(key, value)
try:
with urllib.request.urlopen(request, context=_create_ssl_context(), timeout=30) as response:
payload = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
if exc.code == 429 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt)
logger.warning(
"Rate limited (429), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
body_text = exc.read().decode("utf-8", errors="replace")
message = "Twitter API error %d: %s" % (exc.code, body_text[:500])
raise TwitterAPIError(exc.code, message)
except urllib.error.URLError as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason)
try:
parsed = json.loads(payload)
except json.JSONDecodeError:
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
if isinstance(parsed, dict) and parsed.get("errors"):
err_msg = parsed["errors"][0].get("message", "Unknown error")
raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg)
return parsed
raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries)
def _fetch_user_list(self, operation_name, user_id, count, get_instructions):
# type: (str, str, int, Callable[[Any], Any]) -> List[UserProfile]
"""Generic user list fetcher (for followers/following) with pagination."""
@@ -901,44 +859,12 @@ class TwitterClient:
actual_user = _deep_get(rt_core, "user_results", "result") or {}
actual_user_legacy = actual_user.get("legacy", {})
media = [] # type: List[TweetMedia]
for media_item in _deep_get(actual_legacy, "extended_entities", "media") or []:
media_type = media_item.get("type", "")
if media_type == "photo":
media.append(
TweetMedia(
type="photo",
url=media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
elif media_type in {"video", "animated_gif"}:
variants = media_item.get("video_info", {}).get("variants", [])
mp4_variants = [item for item in variants if item.get("content_type") == "video/mp4"]
mp4_variants.sort(key=lambda item: item.get("bitrate", 0), reverse=True)
media.append(
TweetMedia(
type=media_type,
url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
media = _extract_media(actual_legacy)
urls = [item.get("expanded_url", "") for item in _deep_get(actual_legacy, "entities", "urls") or []]
quoted = _deep_get(actual_data, "quoted_status_result", "result")
quoted_tweet = self._parse_tweet_result(quoted, depth=depth + 1) if isinstance(quoted, dict) else None
author = _extract_author(actual_user, actual_user_legacy)
actual_user_core = actual_user.get("core", {})
user_name = actual_user_core.get("name") or actual_user_legacy.get("name") or actual_user.get("name", "Unknown")
user_screen_name = (
actual_user_core.get("screen_name")
or actual_user_legacy.get("screen_name")
or actual_user.get("screen_name", "unknown")
)
user_profile_image = actual_user.get("avatar", {}).get("image_url") or actual_user_legacy.get("profile_image_url_https", "")
user_verified = bool(actual_user.get("is_blue_verified") or actual_user_legacy.get("verified", False))
retweeted_by = None # type: Optional[str]
if is_retweet:
retweeted_by = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown")
@@ -946,13 +872,7 @@ class TwitterClient:
return Tweet(
id=actual_data.get("rest_id", ""),
text=actual_legacy.get("full_text", ""),
author=Author(
id=actual_user.get("rest_id", ""),
name=user_name,
screen_name=user_screen_name,
profile_image_url=user_profile_image,
verified=user_verified,
),
author=author,
metrics=Metrics(
likes=_to_int(actual_legacy.get("favorite_count"), 0),
retweets=_to_int(actual_legacy.get("retweet_count"), 0),
@@ -971,6 +891,56 @@ class TwitterClient:
)
def _extract_media(legacy):
# type: (Dict[str, Any]) -> List[TweetMedia]
"""Extract media items from tweet legacy data."""
media = [] # type: List[TweetMedia]
for media_item in _deep_get(legacy, "extended_entities", "media") or []:
media_type = media_item.get("type", "")
if media_type == "photo":
media.append(
TweetMedia(
type="photo",
url=media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
elif media_type in {"video", "animated_gif"}:
variants = media_item.get("video_info", {}).get("variants", [])
mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"]
mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True)
media.append(
TweetMedia(
type=media_type,
url=mp4_variants[0]["url"] if mp4_variants else media_item.get("media_url_https", ""),
width=_deep_get(media_item, "original_info", "width"),
height=_deep_get(media_item, "original_info", "height"),
)
)
return media
def _extract_author(user_data, user_legacy):
# type: (Dict[str, Any], Dict[str, Any]) -> Author
"""Extract Author from user result data."""
user_core = user_data.get("core", {})
return Author(
id=user_data.get("rest_id", ""),
name=user_core.get("name") or user_legacy.get("name") or user_data.get("name", "Unknown"),
screen_name=(
user_core.get("screen_name")
or user_legacy.get("screen_name")
or user_data.get("screen_name", "unknown")
),
profile_image_url=(
user_data.get("avatar", {}).get("image_url")
or user_legacy.get("profile_image_url_https", "")
),
verified=bool(user_data.get("is_blue_verified") or user_legacy.get("verified", False)),
)
def _deep_get(data, *keys):
# type: (Any, *Any) -> Any
"""Safely get nested dict/list values. Supports int keys for list access."""
@@ -990,10 +960,10 @@ def _deep_get(data, *keys):
def _extract_cursor(content):
# type: (Dict[str, Any]) -> Optional[str]
"""Extract pagination cursor from timeline content."""
"""Extract Bottom pagination cursor from timeline content."""
if content.get("cursorType") == "Bottom":
return content.get("value")
if content.get("entryType") == "TimelineTimelineCursor":
if content.get("entryType") == "TimelineTimelineCursor" and content.get("cursorType") == "Bottom":
return content.get("value")
return None

12
twitter_cli/constants.py Normal file
View File

@@ -0,0 +1,12 @@
"""Shared constants for twitter-cli."""
BEARER_TOKEN = (
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs"
"%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)

View File

@@ -1,11 +1,11 @@
"""Serialization helpers for Tweet models."""
"""Serialization helpers for Tweet and UserProfile models."""
from __future__ import annotations
import json
from typing import Any, Dict, Iterable, List, Optional
from .models import Author, Metrics, Tweet, TweetMedia
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
def tweet_to_dict(tweet: Tweet) -> Dict[str, Any]:
@@ -129,6 +129,34 @@ def tweets_to_json(tweets: Iterable[Tweet]) -> str:
return json.dumps([tweet_to_dict(tweet) for tweet in tweets], ensure_ascii=False, indent=2)
def user_profile_to_dict(user: UserProfile) -> Dict[str, Any]:
"""Convert a UserProfile dataclass into a JSON-safe dict."""
return {
"id": user.id,
"name": user.name,
"screenName": user.screen_name,
"bio": user.bio,
"location": user.location,
"url": user.url,
"followers": user.followers_count,
"following": user.following_count,
"tweets": user.tweets_count,
"likes": user.likes_count,
"verified": user.verified,
"profileImageUrl": user.profile_image_url,
"createdAt": user.created_at,
}
def users_to_json(users: Iterable[UserProfile]) -> str:
"""Serialize UserProfile objects to pretty JSON."""
return json.dumps(
[user_profile_to_dict(user) for user in users],
ensure_ascii=False,
indent=2,
)
def _optional_int(value: Any) -> Optional[int]:
"""Parse an optional integer value."""
if value is None: