Initial commit: twitter-cli v0.1.0

This commit is contained in:
jackwener
2026-03-04 17:56:42 +08:00
commit 16752c3115
14 changed files with 2133 additions and 0 deletions

3
twitter_cli/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""twitter-cli: A CLI for Twitter/X."""
__version__ = "0.1.0"

125
twitter_cli/auth.py Normal file
View File

@@ -0,0 +1,125 @@
"""Cookie authentication for Twitter/X.
Supports:
1. Environment variables: TWITTER_AUTH_TOKEN + TWITTER_CT0
2. Auto-extract from browser via browser-cookie3 (subprocess)
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from typing import Dict, Optional
def load_from_env() -> Optional[Dict[str, str]]:
    """Read the Twitter session cookies from the process environment.

    Returns:
        A dict with ``auth_token`` and ``ct0`` keys when both
        ``TWITTER_AUTH_TOKEN`` and ``TWITTER_CT0`` are set to non-empty
        values, otherwise ``None``.
    """
    cookies = {
        "auth_token": os.environ.get("TWITTER_AUTH_TOKEN", ""),
        "ct0": os.environ.get("TWITTER_CT0", ""),
    }
    # Both cookies are required; a partial pair counts as "not configured".
    if not all(cookies.values()):
        return None
    return cookies
def extract_from_browser(browser: str = "chrome") -> Optional[Dict[str, str]]:
    """Auto-extract cookies from a local browser using browser-cookie3.

    The extraction runs in a subprocess to avoid SQLite database lock issues
    when the browser is running. The script is first run with the current
    interpreter; if browser-cookie3 is not importable there (or the run
    produced only stderr output), it is retried through
    ``uv run --with browser-cookie3``.

    Args:
        browser: One of ``chrome``, ``firefox``, ``edge`` or ``brave``.

    Returns:
        A dict with ``auth_token`` and ``ct0`` keys, or ``None`` when the
        browser is unsupported, the cookies are missing, or extraction
        fails for any handled reason (timeout, bad output, ``uv`` missing).
    """
    supported_browsers = ("chrome", "firefox", "edge", "brave")
    # Reject unknown names up front: the value comes from the command line
    # and must never reach the child interpreter as source code.
    if browser not in supported_browsers:
        return None

    missing_dependency = "browser-cookie3 not installed"
    # The browser name is passed as argv[1] instead of being interpolated
    # into the script text.
    extract_script = '''
import json, sys
try:
    import browser_cookie3
except ImportError:
    print(json.dumps({"error": "browser-cookie3 not installed"}))
    sys.exit(1)
browser_funcs = {
    "chrome": browser_cookie3.chrome,
    "firefox": browser_cookie3.firefox,
    "edge": browser_cookie3.edge,
    "brave": browser_cookie3.brave,
}
browser_name = sys.argv[1]
fn = browser_funcs.get(browser_name)
if not fn:
    print(json.dumps({"error": "Unsupported browser: " + browser_name}))
    sys.exit(1)
try:
    jar = fn()
except Exception as e:
    print(json.dumps({"error": str(e)}))
    sys.exit(1)
result = {}
for cookie in jar:
    domain = cookie.domain or ""
    if domain.endswith(".x.com") or domain.endswith(".twitter.com") or domain in ("x.com", "twitter.com", ".x.com", ".twitter.com"):
        if cookie.name == "auth_token":
            result["auth_token"] = cookie.value
        elif cookie.name == "ct0":
            result["ct0"] = cookie.value
if "auth_token" in result and "ct0" in result:
    print(json.dumps(result))
else:
    print(json.dumps({"error": "Could not find auth_token and ct0 cookies. Make sure you are logged into x.com in " + browser_name + "."}))
    sys.exit(1)
'''

    def run_script(interpreter, timeout):
        """Run the extraction script; return (stdout, stderr), both stripped."""
        proc = subprocess.run(
            list(interpreter) + ["-c", extract_script, browser],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return proc.stdout.strip(), proc.stderr.strip()

    try:
        output, stderr = run_script([sys.executable], 15)
        data = json.loads(output) if output else None

        # The script reports a missing browser-cookie3 as a JSON error on
        # stdout, so that case must be detected explicitly; an empty stdout
        # with stderr output is treated the same way, as before.
        dependency_missing = (
            isinstance(data, dict) and data.get("error") == missing_dependency
        )
        if dependency_missing or (not output and stderr):
            output, _ = run_script(
                ["uv", "run", "--with", "browser-cookie3", "python3"], 30
            )
            data = json.loads(output) if output else None

        if not isinstance(data, dict) or "error" in data:
            return None
        return {"auth_token": data["auth_token"], "ct0": data["ct0"]}
    except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, FileNotFoundError):
        return None
def get_cookies(browser: str = "chrome") -> Dict[str, str]:
    """Return the Twitter session cookies, preferring the environment.

    Environment variables are consulted first; the browser is only queried
    when they do not yield a complete cookie pair.

    Args:
        browser: Browser name forwarded to ``extract_from_browser``.

    Returns:
        A dict with ``auth_token`` and ``ct0`` keys.

    Raises:
        RuntimeError: If neither source provides the cookies.
    """
    # ``or`` short-circuits, so the browser is not touched when the
    # environment already supplies the cookies.
    cookies = load_from_env() or extract_from_browser(browser)
    if cookies:
        return cookies
    raise RuntimeError(
        "No Twitter cookies found.\n"
        "Option 1: Set TWITTER_AUTH_TOKEN and TWITTER_CT0 environment variables\n"
        "Option 2: Make sure you are logged into x.com in your browser"
    )

290
twitter_cli/cli.py Normal file
View File

@@ -0,0 +1,290 @@
"""CLI entry point for twitter-cli.
Usage:
twitter feed # full pipeline: fetch → filter → AI summarize
twitter feed --count 50 # custom fetch count
twitter feed --no-summary # skip AI summary
twitter feed --no-filter # skip filtering
twitter feed --json # JSON output
twitter feed --browser firefox # specify browser for cookie extraction
twitter bookmarks # fetch bookmarks
twitter bookmarks --count 30
twitter feed --input tweets.json # summarize existing data
twitter feed --output out.json # save filtered tweets
"""
from __future__ import annotations
import json
import logging
import sys
import time
from pathlib import Path
from typing import List
import click
from rich.console import Console
from . import __version__
from .auth import get_cookies
from .client import TwitterClient
from .config import load_config
from .filter import filter_tweets
from .formatter import (
print_filter_stats,
print_tweet_table,
tweets_to_json,
)
from .models import Author, Metrics, Tweet, TweetMedia
from .summarizer import summarize
console = Console()
def _setup_logging(verbose):
# type: (bool) -> None
level = logging.DEBUG if verbose else logging.WARNING
logging.basicConfig(
level=level,
format="%(levelname)s %(name)s: %(message)s",
stream=sys.stderr,
)
def _load_tweets_from_json(path):
    # type: (str) -> List[Tweet]
    """Rebuild ``Tweet`` objects from a previously exported JSON file.

    The file is read as UTF-8 and iterated as a sequence of tweet records
    using camelCase keys (``screenName``, ``createdAt``, ``quotedTweet``...).
    Missing fields fall back to empty strings, zeros or empty lists. A
    quoted tweet is restored with only its id, text and author name /
    screen name; its metrics and timestamp are left at their defaults.
    """
    records = json.loads(Path(path).read_text(encoding="utf-8"))
    loaded = []  # type: List[Tweet]

    for record in records:
        author_fields = record.get("author", {})
        metric_fields = record.get("metrics", {})

        attachments = []
        for item in record.get("media", []):
            attachments.append(
                TweetMedia(
                    type=item.get("type", ""),
                    url=item.get("url", ""),
                    width=item.get("width"),
                    height=item.get("height"),
                )
            )

        quoted = None
        quoted_fields = record.get("quotedTweet")
        if quoted_fields:
            quoted_author = quoted_fields.get("author", {})
            quoted = Tweet(
                id=quoted_fields.get("id", ""),
                text=quoted_fields.get("text", ""),
                author=Author(
                    id="",
                    name=quoted_author.get("name", ""),
                    screen_name=quoted_author.get("screenName", ""),
                ),
                metrics=Metrics(),
                created_at="",
            )

        loaded.append(
            Tweet(
                id=record.get("id", ""),
                text=record.get("text", ""),
                author=Author(
                    id=author_fields.get("id", ""),
                    name=author_fields.get("name", ""),
                    screen_name=author_fields.get("screenName", ""),
                    profile_image_url=author_fields.get("profileImageUrl", ""),
                    verified=author_fields.get("verified", False),
                ),
                metrics=Metrics(
                    likes=metric_fields.get("likes", 0),
                    retweets=metric_fields.get("retweets", 0),
                    replies=metric_fields.get("replies", 0),
                    quotes=metric_fields.get("quotes", 0),
                    views=metric_fields.get("views", 0),
                    bookmarks=metric_fields.get("bookmarks", 0),
                ),
                created_at=record.get("createdAt", ""),
                media=attachments,
                urls=record.get("urls", []),
                is_retweet=record.get("isRetweet", False),
                lang=record.get("lang", ""),
                retweeted_by=record.get("retweetedBy"),
                quoted_tweet=quoted,
                score=record.get("score", 0.0),
            )
        )
    return loaded
@click.group()
@click.option("--verbose", "-v", is_flag=True, help="Enable debug logging.")
@click.version_option(version=__version__)
def cli(verbose):
    # type: (bool) -> None
    """twitter — Twitter/X CLI tool 🐦"""
    # Root command group; the subcommands below register on it via
    # @cli.command(). The docstring above doubles as the --help text, so it
    # is kept short. Logging is configured once here, before any subcommand
    # body runs.
    _setup_logging(verbose)
# ===== Feed =====
@cli.command()
@click.option("--count", "-n", type=int, default=None, help="Number of tweets to fetch.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
@click.option("--browser", "-b", default="chrome", help="Browser to extract cookies from.")
@click.option("--input", "-i", "input_file", type=str, default=None, help="Load tweets from JSON file.")
@click.option("--output", "-o", "output_file", type=str, default=None, help="Save filtered tweets to JSON file.")
@click.option("--no-filter", is_flag=True, help="Skip filtering.")
@click.option("--no-summary", is_flag=True, help="Skip AI summary.")
def feed(count, as_json, browser, input_file, output_file, no_filter, no_summary):
    # type: (int, bool, str, str, str, bool, bool) -> None
    """Fetch home timeline — full pipeline: fetch → filter → AI summarize."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # With --json, stdout must carry nothing but the JSON document so the
    # output can be piped into other tools; progress and status messages are
    # therefore routed to stderr in that mode.
    status = Console(stderr=True) if as_json else console
    config = load_config()

    # Step 1: Get tweets (from a previous export, or live from the API)
    if input_file:
        status.print("📂 Loading tweets from %s..." % input_file)
        tweets = _load_tweets_from_json(input_file)
        status.print(" Loaded %d tweets" % len(tweets))
    else:
        # An explicit --count wins over the configured fetch count.
        fetch_count = count or config.get("fetch", {}).get("count", 50)
        status.print("\n🔐 Getting Twitter cookies...")
        try:
            cookies = get_cookies(browser)
        except RuntimeError as e:
            status.print("[red]❌ %s[/red]" % e)
            sys.exit(1)
        client = TwitterClient(cookies["auth_token"], cookies["ct0"])
        status.print("📡 Fetching home timeline (%d tweets)...\n" % fetch_count)
        start = time.time()
        tweets = client.fetch_home_timeline(fetch_count)
        elapsed = time.time() - start
        status.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))

    # Step 2: Filter
    if no_filter:
        filtered = tweets
    else:
        filter_config = config.get("filter", {})
        original_count = len(tweets)
        filtered = filter_tweets(tweets, filter_config)
        print_filter_stats(original_count, filtered, status)
        status.print()

    # Save filtered tweets
    if output_file:
        Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8")
        status.print("💾 Saved filtered tweets to %s\n" % output_file)

    # Output
    if as_json:
        click.echo(tweets_to_json(filtered))
        return
    print_tweet_table(filtered, console)
    console.print()

    # Step 3: AI Summary
    if no_summary:
        return
    ai_config = config.get("ai", {})
    if not ai_config.get("api_key"):
        console.print(
            "[yellow]⚠️ AI summary skipped: no API key configured.[/yellow]\n"
            " Set ai.api_key in config.yaml or export AI_API_KEY=your_key"
        )
        return
    try:
        console.print("🤖 Calling AI (%s/%s)..." % (ai_config.get("provider", "openai"), ai_config.get("model", "")))
        summary = summarize(filtered, ai_config)
        # Horizontal rule around the heading (the separator previously
        # multiplied an empty string and printed nothing).
        rule = "─" * 50
        console.print("\n" + rule)
        console.print("📝 AI Summary")
        console.print(rule + "\n")
        console.print(summary)
        console.print()
    except Exception as e:
        # Best-effort step: a failed summary must not hide the tweet table
        # that was already printed.
        console.print("[red]❌ AI summary failed: %s[/red]" % e)
# ===== Bookmarks =====
@cli.command()
@click.option("--count", "-n", type=int, default=None, help="Number of tweets to fetch.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
@click.option("--browser", "-b", default="chrome", help="Browser to extract cookies from.")
@click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.")
@click.option("--no-filter", is_flag=True, help="Skip filtering.")
@click.option("--no-summary", is_flag=True, help="Skip AI summary.")
def bookmarks(count, as_json, browser, output_file, no_filter, no_summary):
    # type: (int, bool, str, str, bool, bool) -> None
    """Fetch bookmarked tweets."""
    # NOTE: the docstring above is the command's --help text; keep it short.
    #
    # With --json, stdout must carry nothing but the JSON document so the
    # output can be piped into other tools; progress and status messages are
    # therefore routed to stderr in that mode.
    status = Console(stderr=True) if as_json else console
    config = load_config()
    fetch_count = count or 50

    status.print("\n🔐 Getting Twitter cookies...")
    try:
        cookies = get_cookies(browser)
    except RuntimeError as e:
        status.print("[red]❌ %s[/red]" % e)
        sys.exit(1)
    client = TwitterClient(cookies["auth_token"], cookies["ct0"])
    status.print("🔖 Fetching bookmarks (%d tweets)...\n" % fetch_count)
    start = time.time()
    tweets = client.fetch_bookmarks(fetch_count)
    elapsed = time.time() - start
    status.print("✅ Fetched %d bookmarks in %.1fs\n" % (len(tweets), elapsed))

    # Filter
    if no_filter:
        filtered = tweets
    else:
        filter_config = config.get("filter", {})
        original_count = len(tweets)
        filtered = filter_tweets(tweets, filter_config)
        print_filter_stats(original_count, filtered, status)
        status.print()

    # Save
    if output_file:
        Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8")
        status.print("💾 Saved to %s\n" % output_file)

    # Output
    if as_json:
        click.echo(tweets_to_json(filtered))
        return
    print_tweet_table(filtered, console, title="🔖 Bookmarks — %d tweets" % len(filtered))
    console.print()

    # AI Summary
    if no_summary:
        return
    ai_config = config.get("ai", {})
    if not ai_config.get("api_key"):
        console.print(
            "[yellow]⚠️ AI summary skipped: no API key configured.[/yellow]"
        )
        return
    try:
        console.print("🤖 Calling AI...")
        summary = summarize(filtered, ai_config)
        # Horizontal rule around the heading (the separator previously
        # multiplied an empty string and printed nothing).
        rule = "─" * 50
        console.print("\n" + rule)
        console.print("📝 AI Summary")
        console.print(rule + "\n")
        console.print(summary)
    except Exception as e:
        # Best-effort step: a failed summary must not hide the tweet table
        # that was already printed.
        console.print("[red]❌ AI summary failed: %s[/red]" % e)
# Allow running this module directly as a script; this invokes the same
# click group that the subcommands above are registered on.
if __name__ == "__main__":
    cli()

470
twitter_cli/client.py Normal file
View File

@@ -0,0 +1,470 @@
"""Twitter GraphQL API client.
Uses the same internal GraphQL endpoint that the Twitter web app uses,
authenticated via cookies (auth_token + ct0). QueryId is resolved
dynamically using a three-tier strategy.
"""
from __future__ import annotations
import json
import logging
import math
import re
import ssl
import urllib.request
from typing import Any, Callable, Dict, List, Optional, Tuple
from .models import Author, Metrics, Tweet, TweetMedia
logger = logging.getLogger(__name__)
# Public bearer token shared by all Twitter web clients.
# Sent as the "Authorization: Bearer ..." header by TwitterClient._build_headers.
BEARER_TOKEN = (
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs"
"%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
)
# Last-resort fallback query IDs, keyed by GraphQL operation name.
# Only consulted by _resolve_query_id after the bundle scan and the GitHub
# lookup both fail to produce an ID.
FALLBACK_QUERY_IDS = {
"HomeTimeline": "HJFjzBgCs16TqxewQOeLNg",
"Bookmarks": "VFdMm9iVZxlU6hD86gfW_A",
}
# Community-maintained API definition (auto-updated daily).
# Fetched by _fetch_from_github and parsed as JSON keyed by operation name.
TWITTER_OPENAPI_URL = (
"https://raw.githubusercontent.com/fa0311/twitter-openapi/"
"main/src/config/placeholder.json"
)
# Default features flags required by the GraphQL endpoint.
# JSON-encoded into the "features" query parameter of every timeline request.
FEATURES = {
"rweb_tipjar_consumption_enabled": True,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_timeline_navigation_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"communities_web_enable_tweet_community_results_fetch": True,
"c9s_tweet_anatomy_moderator_badge_enabled": True,
"articles_preview_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"responsive_web_twitter_article_tweet_consumption_enabled": True,
"tweet_awards_web_tipping_enabled": False,
"creator_subscriptions_quote_tweet_preview_enabled": False,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
"rweb_video_timestamps_enabled": True,
"longform_notetweets_rich_text_read_enabled": True,
"longform_notetweets_inline_media_enabled": True,
"responsive_web_enhance_cards_enabled": False,
}
# Browser-like User-Agent used both when fetching the x.com landing page
# (bundle scan) and on authenticated API requests.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
# Module-level cache for query IDs: operation name -> queryId.
# Filled by _scan_bundles and _resolve_query_id; lives for the process lifetime.
_cached_query_ids = {} # type: Dict[str, str]
# Set to True on the first _scan_bundles() call (before any network access),
# so the bundle scan is attempted at most once per process.
_bundles_scanned = False
def _create_ssl_context():
# type: () -> ssl.SSLContext
"""Create a permissive SSL context for urllib."""
ctx = ssl.create_default_context()
return ctx
def _url_fetch(url, headers=None):
    # type: (str, Optional[Dict[str, str]]) -> str
    """GET ``url`` with urllib and return the body decoded as UTF-8.

    Args:
        url: Address to fetch.
        headers: Optional request headers to add.

    The request uses a 30 second timeout; errors raised by urllib or by the
    UTF-8 decoding propagate to the caller.
    """
    request = urllib.request.Request(url)
    for name, value in (headers or {}).items():
        request.add_header(name, value)
    with urllib.request.urlopen(
        request, context=_create_ssl_context(), timeout=30
    ) as response:
        payload = response.read()
    return payload.decode("utf-8")
def _scan_bundles():
    # type: () -> None
    """Tier 1: harvest queryId/operationName pairs from Twitter's JS bundles.

    Downloads the x.com landing page, follows every ``client-web`` script
    URL hosted on abs.twimg.com, and records each operation found in
    ``_cached_query_ids`` (the first ID seen for a name wins). The scan is
    attempted at most once per process, even if it fails.
    """
    global _bundles_scanned
    if _bundles_scanned:
        return
    # Mark as done up front so a failed scan is not retried on every lookup.
    _bundles_scanned = True

    bundle_url_re = re.compile(
        r'(?:src|href)=["\']'
        r'(https://abs\.twimg\.com/responsive-web/client-web[^"\']+\.js)'
        r'["\']'
    )
    operation_re = re.compile(
        r'queryId:\s*"([A-Za-z0-9_-]+)"[^}]{0,200}'
        r'operationName:\s*"([^"]+)"'
    )

    try:
        page = _url_fetch("https://x.com", {"user-agent": USER_AGENT})
        bundle_urls = bundle_url_re.findall(page)
        for bundle_url in bundle_urls:
            try:
                source = _url_fetch(bundle_url)
                for match in operation_re.finditer(source):
                    query_id, operation = match.group(1), match.group(2)
                    # Keep the first ID discovered for each operation.
                    _cached_query_ids.setdefault(operation, query_id)
            except Exception:
                # A single unreadable bundle must not abort the whole scan.
                continue
        logger.info(
            "Scanned %d JS bundles, found %d operations",
            len(bundle_urls),
            len(_cached_query_ids),
        )
    except Exception as e:
        logger.warning("Failed to scan JS bundles: %s", e)
def _fetch_from_github(operation_name):
    # type: (str) -> Optional[str]
    """Tier 2: look up a queryId in the community twitter-openapi file.

    Args:
        operation_name: GraphQL operation name, e.g. ``"HomeTimeline"``.

    Returns:
        The ``queryId`` listed for the operation, or ``None`` when it is
        absent or the download/parse fails (failures are logged, not raised).
    """
    try:
        logger.info("Fetching latest queryId from GitHub (twitter-openapi)...")
        definitions = json.loads(_url_fetch(TWITTER_OPENAPI_URL))
        query_id = definitions.get(operation_name, {}).get("queryId")
        if not query_id:
            return None
        logger.info("Found %s queryId from GitHub: %s", operation_name, query_id)
        return query_id
    except Exception as e:
        logger.warning("GitHub lookup failed: %s", e)
        return None
def _resolve_query_id(operation_name):
    # type: (str) -> str
    """Resolve a GraphQL queryId: cache, bundle scan, GitHub, then fallback.

    Whatever tier produces the ID, the result is stored in
    ``_cached_query_ids`` so later calls return immediately.

    Raises:
        RuntimeError: If no tier yields an ID for ``operation_name``.
    """
    if operation_name in _cached_query_ids:
        return _cached_query_ids[operation_name]

    logger.info("Auto-detecting %s queryId...", operation_name)

    # Tier 1: JS bundle scan (populates the cache as a side effect)
    _scan_bundles()
    if operation_name in _cached_query_ids:
        scanned_id = _cached_query_ids[operation_name]
        logger.info("Found %s queryId: %s", operation_name, scanned_id)
        return scanned_id

    # Tier 2: GitHub
    resolved = _fetch_from_github(operation_name)
    if not resolved:
        # Tier 3: Hardcoded fallback
        resolved = FALLBACK_QUERY_IDS.get(operation_name)
        if not resolved:
            raise RuntimeError(
                'Cannot resolve queryId for "%s" — all detection methods failed' % operation_name
            )
        logger.info("Using hardcoded fallback queryId for %s: %s", operation_name, resolved)

    _cached_query_ids[operation_name] = resolved
    return resolved
class TwitterClient:
    """Twitter GraphQL API client using cookie authentication.

    Requests go to ``https://x.com/i/api/graphql/<queryId>/<operation>`` and
    carry the ``auth_token`` and ``ct0`` cookies; ``ct0`` is also sent as the
    ``X-Csrf-Token`` header.
    """

    def __init__(self, auth_token, ct0):
        # type: (str, str) -> None
        """Store the two session cookie values used on every request."""
        self._auth_token = auth_token
        self._ct0 = ct0

    def fetch_home_timeline(self, count=20):
        # type: (int) -> List[Tweet]
        """Fetch up to ``count`` tweets from the home timeline."""
        query_id = _resolve_query_id("HomeTimeline")
        return self._fetch_timeline(
            query_id,
            "HomeTimeline",
            count,
            lambda data: _deep_get(data, "data", "home", "home_timeline_urt", "instructions"),
        )

    def fetch_bookmarks(self, count=50):
        # type: (int) -> List[Tweet]
        """Fetch up to ``count`` bookmarked tweets."""
        query_id = _resolve_query_id("Bookmarks")

        def get_instructions(data):
            # type: (Any) -> Any
            # Two response layouts are handled: "bookmark_timeline" first,
            # then "bookmark_timeline_v2".
            result = _deep_get(data, "data", "bookmark_timeline", "timeline", "instructions")
            if result is None:
                result = _deep_get(data, "data", "bookmark_timeline_v2", "timeline", "instructions")
            return result

        return self._fetch_timeline(query_id, "Bookmarks", count, get_instructions)

    def _fetch_timeline(self, query_id, operation_name, count, get_instructions, extra_variables=None):
        # type: (str, str, int, Callable, Optional[Dict[str, Any]]) -> List[Tweet]
        """Generic timeline fetcher with pagination and deduplication.

        Args:
            query_id: GraphQL queryId for ``operation_name``.
            operation_name: GraphQL operation, used in the URL path.
            count: Maximum number of tweets to return.
            get_instructions: Callable extracting the ``instructions`` list
                from a decoded response.
            extra_variables: Optional extra GraphQL variables merged over
                the defaults.

        Returns:
            At most ``count`` tweets, unique by id, in the order received.
        """
        tweets = []  # type: List[Tweet]
        seen_ids = set()  # ids already collected, kept across pages
        cursor = None  # type: Optional[str]
        attempts = 0
        # Bound the number of requests so a timeline that keeps returning
        # duplicates cannot loop forever.
        max_attempts = int(math.ceil(count / 20.0)) + 2

        while len(tweets) < count and attempts < max_attempts:
            attempts += 1
            variables = {
                # Ask for a few more than needed; cap the page size at 40.
                "count": min(count - len(tweets) + 5, 40),
                "includePromotedContent": False,
                "latestControlAvailable": True,
                "requestContext": "launch",
            }  # type: Dict[str, Any]
            if extra_variables:
                variables.update(extra_variables)
            if cursor:
                variables["cursor"] = cursor

            url = "https://x.com/i/api/graphql/%s/%s?" % (query_id, operation_name)
            url += "variables=%s&features=%s" % (
                urllib.request.quote(json.dumps(variables)),
                urllib.request.quote(json.dumps(FEATURES)),
            )
            data = self._api_get(url)
            new_tweets, next_cursor = self._parse_timeline_response(data, get_instructions)

            for tweet in new_tweets:
                if tweet.id not in seen_ids:
                    seen_ids.add(tweet.id)
                    tweets.append(tweet)

            # Stop when the server gives no way forward or an empty page.
            if not next_cursor or not new_tweets:
                break
            cursor = next_cursor

        return tweets[:count]

    def _build_headers(self):
        # type: () -> Dict[str, str]
        """Build the request headers for an authenticated API call."""
        return {
            "Authorization": "Bearer %s" % BEARER_TOKEN,
            "Cookie": "auth_token=%s; ct0=%s" % (self._auth_token, self._ct0),
            "X-Csrf-Token": self._ct0,
            "X-Twitter-Active-User": "yes",
            "X-Twitter-Auth-Type": "OAuth2Session",
            "X-Twitter-Client-Language": "en",
            "Content-Type": "application/json",
            "User-Agent": USER_AGENT,
            "Referer": "https://x.com/home",
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
        }

    def _api_get(self, url):
        # type: (str) -> Any
        """Make an authenticated GET request and return the decoded JSON.

        Raises:
            RuntimeError: On an HTTP error status; the message carries the
                status code and the first 500 characters of the body.
        """
        req = urllib.request.Request(url)
        for k, v in self._build_headers().items():
            req.add_header(k, v)
        ctx = _create_ssl_context()
        try:
            with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
                body = resp.read().decode("utf-8")
            return json.loads(body)
        except urllib.error.HTTPError as e:
            body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError("Twitter API error %d: %s" % (e.code, body[:500])) from e

    def _parse_timeline_response(self, data, get_instructions):
        # type: (Any, Callable) -> Tuple[List[Tweet], Optional[str]]
        """Parse a timeline GraphQL response into tweets + next cursor.

        Returns:
            ``(tweets, next_cursor)``. ``next_cursor`` is the value of the
            Bottom cursor entry, or ``None`` if there is none. Parsing
            errors are logged and whatever was collected so far is returned.
        """
        tweets = []  # type: List[Tweet]
        next_cursor = None  # type: Optional[str]
        try:
            instructions = get_instructions(data)
            if not isinstance(instructions, list):
                logger.warning("No instructions found in response")
                return tweets, next_cursor

            for instruction in instructions:
                entries = instruction.get("entries") or instruction.get("moduleItems") or []
                for entry in entries:
                    content = entry.get("content", {})

                    # Handle cursor entries
                    cursor_type = content.get("cursorType")
                    is_cursor_entry = content.get("entryType") == "TimelineTimelineCursor"
                    if is_cursor_entry or cursor_type == "Bottom":
                        # Only the Bottom cursor pages forward. Cursor
                        # entries of another explicit type (e.g. "Top") are
                        # skipped so they cannot overwrite it; entries with
                        # no cursorType at all are still accepted.
                        if cursor_type in (None, "Bottom"):
                            val = content.get("value")
                            if val:
                                next_cursor = val
                        continue

                    # Handle single tweet entries
                    result = (
                        content.get("itemContent", {})
                        .get("tweet_results", {})
                        .get("result")
                    )
                    if result:
                        tweet = self._parse_tweet_result(result)
                        if tweet:
                            tweets.append(tweet)

                    # Handle conversation module (tweet threads)
                    for item in content.get("items", []):
                        nested = (
                            item.get("item", {})
                            .get("itemContent", {})
                            .get("tweet_results", {})
                            .get("result")
                        )
                        if nested:
                            tweet = self._parse_tweet_result(nested)
                            if tweet:
                                tweets.append(tweet)
        except Exception as e:
            logger.warning("Error parsing timeline response: %s", e)
        return tweets, next_cursor

    def _parse_tweet_result(self, result):
        # type: (Dict[str, Any]) -> Optional[Tweet]
        """Parse a single TweetResult from a GraphQL response.

        For a retweet, the returned ``Tweet`` describes the original tweet
        (id, text, author, metrics, media) and ``retweeted_by`` holds the
        screen name of the account that retweeted it.

        Returns:
            The parsed ``Tweet``, or ``None`` for tombstones, results
            without ``legacy``/``core`` data, or results that fail to parse.
        """
        try:
            tweet_data = result
            # Handle TweetWithVisibilityResults wrapper
            if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"):
                tweet_data = result["tweet"]
            if tweet_data.get("__typename") == "TweetTombstone":
                return None
            if not tweet_data.get("legacy") or not tweet_data.get("core"):
                return None

            legacy = tweet_data["legacy"]
            user = tweet_data["core"]["user_results"]["result"]
            user_legacy = user.get("legacy", {})
            user_core = user.get("core", {})

            # Check if this is a retweet
            is_retweet = bool(legacy.get("retweeted_status_result", {}).get("result"))
            actual_data = tweet_data
            actual_legacy = legacy
            actual_user = user
            actual_user_legacy = user_legacy
            if is_retweet:
                rt_result = legacy["retweeted_status_result"]["result"]
                # Handle wrapped retweet
                if rt_result.get("__typename") == "TweetWithVisibilityResults" and rt_result.get("tweet"):
                    rt_result = rt_result["tweet"]
                if rt_result.get("legacy") and rt_result.get("core"):
                    actual_data = rt_result
                    actual_legacy = rt_result["legacy"]
                    actual_user = rt_result["core"]["user_results"]["result"]
                    actual_user_legacy = actual_user.get("legacy", {})

            # Parse media
            media = []  # type: List[TweetMedia]
            for m in actual_legacy.get("extended_entities", {}).get("media", []):
                m_type = m.get("type", "")
                if m_type == "photo":
                    media.append(TweetMedia(
                        type="photo",
                        url=m.get("media_url_https", ""),
                        width=_deep_get(m, "original_info", "width"),
                        height=_deep_get(m, "original_info", "height"),
                    ))
                elif m_type in ("video", "animated_gif"):
                    variants = m.get("video_info", {}).get("variants", [])
                    mp4_variants = [v for v in variants if v.get("content_type") == "video/mp4"]
                    # Highest bitrate first; fall back to the preview image
                    # URL when no MP4 variant exists.
                    mp4_variants.sort(key=lambda v: v.get("bitrate", 0), reverse=True)
                    video_url = mp4_variants[0]["url"] if mp4_variants else m.get("media_url_https", "")
                    media.append(TweetMedia(
                        type=m_type,
                        url=video_url,
                        width=_deep_get(m, "original_info", "width"),
                        height=_deep_get(m, "original_info", "height"),
                    ))

            # Parse URLs
            urls = [u.get("expanded_url", "") for u in actual_legacy.get("entities", {}).get("urls", [])]

            # Parse quoted tweet
            quoted_tweet = None  # type: Optional[Tweet]
            quoted_result = actual_data.get("quoted_status_result", {}).get("result")
            if quoted_result:
                quoted_tweet = self._parse_tweet_result(quoted_result)

            # Extract user info — try user.core (new API), then user.legacy (old API)
            au = actual_user
            aul = actual_user_legacy
            auc = au.get("core", {})
            user_name = auc.get("name") or aul.get("name") or au.get("name", "Unknown")
            user_screen_name = auc.get("screen_name") or aul.get("screen_name") or au.get("screen_name", "unknown")
            user_profile_image = au.get("avatar", {}).get("image_url") or aul.get("profile_image_url_https", "")
            user_verified = au.get("is_blue_verified") or aul.get("verified", False)

            # Retweeted by info (the outer tweet's author)
            rt_screen_name = None  # type: Optional[str]
            if is_retweet:
                rt_screen_name = user_core.get("screen_name") or user_legacy.get("screen_name", "unknown")

            return Tweet(
                id=actual_data.get("rest_id", ""),
                text=actual_legacy.get("full_text", ""),
                author=Author(
                    id=au.get("rest_id", ""),
                    name=user_name,
                    screen_name=user_screen_name,
                    profile_image_url=user_profile_image,
                    verified=bool(user_verified),
                ),
                metrics=Metrics(
                    likes=actual_legacy.get("favorite_count", 0),
                    retweets=actual_legacy.get("retweet_count", 0),
                    replies=actual_legacy.get("reply_count", 0),
                    quotes=actual_legacy.get("quote_count", 0),
                    # The view count is converted with int(); a missing or
                    # empty value becomes 0.
                    views=int(actual_data.get("views", {}).get("count", "0") or "0"),
                    bookmarks=actual_legacy.get("bookmark_count", 0),
                ),
                created_at=actual_legacy.get("created_at", ""),
                media=media,
                urls=urls,
                is_retweet=is_retweet,
                retweeted_by=rt_screen_name,
                quoted_tweet=quoted_tweet,
                lang=actual_legacy.get("lang", ""),
            )
        except Exception as e:
            logger.warning("Failed to parse tweet: %s", e)
            return None
def _deep_get(d, *keys):
# type: (Any, *str) -> Any
"""Safely get a nested value from a dict."""
for key in keys:
if isinstance(d, dict):
d = d.get(key)
else:
return None
return d

175
twitter_cli/config.py Normal file
View File

@@ -0,0 +1,175 @@
"""Configuration loader — reads config.yaml and merges with defaults.
Uses a simple built-in YAML parser to avoid adding PyYAML as a dependency.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Union
# Default configuration.
# load_config() merges values parsed from config.yaml over this structure;
# any key a config file omits keeps the value shown here.
DEFAULT_CONFIG = {
# "fetch": how many tweets to request when --count is not given.
"fetch": {
"count": 50,
},
# "filter": passed to filter_tweets(); see its docstring for the key meanings.
"filter": {
"mode": "topN",
"topN": 20,
"minScore": 50,
"lang": [],
"excludeRetweets": False,
# Per-metric multipliers used by the engagement score.
"weights": {
"likes": 1.0,
"retweets": 3.0,
"replies": 2.0,
"bookmarks": 5.0,
"views_log": 0.5,
},
},
# "ai": settings handed to the summarizer. An empty api_key is replaced by
# the AI_API_KEY environment variable in load_config().
"ai": {
"provider": "openai",
"api_key": "",
"model": "doubao-seed-2.0-code",
"base_url": "https://ark.cn-beijing.volces.com/api/coding",
"language": "zh-CN",
},
} # type: Dict[str, Any]
def _parse_value(s):
# type: (str) -> Union[str, int, float, bool]
"""Parse a scalar YAML value."""
if s == "true":
return True
if s == "false":
return False
# Remove surrounding quotes
if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
return s[1:-1]
# Try number
try:
if "." in s:
return float(s)
return int(s)
except ValueError:
return s
def _parse_yaml(text):
# type: (str) -> Dict[str, Any]
"""Minimal YAML parser for our flat config structure.
Supports: scalars, inline arrays [...], indented "- item" arrays,
nested objects via indentation.
"""
result = {} # type: Dict[str, Any]
lines = text.split("\n")
stack = [{"indent": -1, "obj": result}] # type: List[Dict[str, Any]]
for line in lines:
# Strip comments and trailing whitespace
trimmed = re.sub(r"#.*$", "", line).rstrip()
if not trimmed or not trimmed.strip():
continue
indent = len(line) - len(line.lstrip())
content = trimmed.strip()
# Handle "- item" array entries
if content.startswith("- "):
parent = stack[-1]["obj"]
keys = list(parent.keys())
if keys:
last_key = keys[-1]
if not isinstance(parent[last_key], list):
parent[last_key] = []
parent[last_key].append(_parse_value(content[2:].strip()))
continue
colon_idx = content.find(":")
if colon_idx == -1:
continue
key = content[:colon_idx].strip()
raw_value = content[colon_idx + 1:].strip()
# Pop stack to find parent at correct indentation
while len(stack) > 1 and stack[-1]["indent"] >= indent:
stack.pop()
parent = stack[-1]["obj"]
if raw_value == "" or raw_value == "|":
# Nested object
child = {} # type: Dict[str, Any]
parent[key] = child
stack.append({"indent": indent, "obj": child})
elif raw_value.startswith("[") and raw_value.endswith("]"):
# Inline array
inner = raw_value[1:-1].strip()
if inner == "":
parent[key] = []
else:
parent[key] = [_parse_value(s.strip()) for s in inner.split(",")]
else:
parent[key] = _parse_value(raw_value)
return result
def _deep_merge(target, source):
# type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
"""Deep merge source into target (source values override target)."""
result = dict(target)
for key in source:
if (
isinstance(source[key], dict)
and isinstance(result.get(key), dict)
):
result[key] = _deep_merge(result[key], source[key])
else:
result[key] = source[key]
return result
def load_config(config_path=None):
    # type: (str) -> Dict[str, Any]
    """Load config from config.yaml, merged with defaults.

    Args:
        config_path: Path to a YAML file. When omitted, ``config.yaml`` is
            looked up in the current directory and then in the directory
            above this package.

    Returns:
        A configuration dict that always contains ``fetch``, ``filter``
        (with ``weights``) and ``ai`` mappings. The result is independent of
        ``DEFAULT_CONFIG``: callers may modify it freely.

    A missing, unreadable or unparsable file silently yields the defaults.
    """
    import copy  # local import: only needed here

    # Work on a private deep copy so nothing below (in particular the
    # api_key fallback) can write into the module-level defaults.
    defaults = copy.deepcopy(DEFAULT_CONFIG)

    if config_path is None:
        # Look in current directory first, then script directory
        candidates = [
            Path.cwd() / "config.yaml",
            Path(__file__).parent.parent / "config.yaml",
        ]
        for p in candidates:
            if p.exists():
                config_path = str(p)
                break

    config = defaults
    if config_path and Path(config_path).exists():
        try:
            raw = Path(config_path).read_text(encoding="utf-8")
            config = _deep_merge(defaults, _parse_yaml(raw))
        except Exception:
            # Best effort: a broken config file falls back to the defaults.
            config = defaults

    # Ensure the sections exist and are mappings (a file could have replaced
    # one with a scalar, e.g. "ai: off").
    for section in ("fetch", "filter", "ai"):
        if not isinstance(config.get(section), dict):
            config[section] = defaults[section]

    # Ensure filter weights are present as a mapping
    if not isinstance(config["filter"].get("weights"), dict):
        config["filter"]["weights"] = dict(DEFAULT_CONFIG["filter"]["weights"])

    # AI API key fallback to env var
    ai = config["ai"]
    if not ai.get("api_key"):
        ai["api_key"] = os.environ.get("AI_API_KEY", "")
    return config

90
twitter_cli/filter.py Normal file
View File

@@ -0,0 +1,90 @@
"""Tweet filtering and engagement scoring.
Scores tweets by a weighted engagement formula and filters by
configurable rules (topN, min score, language, etc.).
"""
from __future__ import annotations
import math
from typing import Dict, List
from .models import Tweet
# Type alias for filter weights dict: metric name -> multiplier.
FilterWeights = Dict[str, float]
# Weights used by score_tweet() when the caller passes none, and by
# filter_tweets() when its config has no "weights" entry.
# "views_log" multiplies log10(views), not the raw view count.
DEFAULT_WEIGHTS = {
"likes": 1.0,
"retweets": 3.0,
"replies": 2.0,
"bookmarks": 5.0,
"views_log": 0.5,
}
def score_tweet(tweet, weights=None):
    # type: (Tweet, FilterWeights) -> float
    """Compute the weighted engagement score of one tweet.

    Formula:
        score = w_likes × likes
              + w_retweets × retweets
              + w_replies × replies
              + w_bookmarks × bookmarks
              + w_views_log × log10(views)

    Args:
        tweet: Object exposing ``metrics`` with ``likes``, ``retweets``,
            ``replies``, ``bookmarks`` and ``views``.
        weights: Per-metric multipliers; ``DEFAULT_WEIGHTS`` when ``None``.
            Keys missing from the dict fall back to 1.0 / 3.0 / 2.0 / 5.0 /
            0.5 respectively.
    """
    table = DEFAULT_WEIGHTS if weights is None else weights
    metrics = tweet.metrics

    total = table.get("likes", 1.0) * metrics.likes
    total += table.get("retweets", 3.0) * metrics.retweets
    total += table.get("replies", 2.0) * metrics.replies
    total += table.get("bookmarks", 5.0) * metrics.bookmarks
    # Views are clamped to at least 1 so log10 is defined for unseen tweets.
    total += table.get("views_log", 0.5) * math.log10(max(metrics.views, 1))
    return total
def filter_tweets(tweets, config):
    # type: (List[Tweet], dict) -> List[Tweet]
    """Select, score and rank tweets according to ``config``.

    Config keys:
        mode: "topN" | "score" | "all"
        topN: int
        minScore: float
        lang: list[str] (empty = no filter)
        excludeRetweets: bool
        weights: dict

    Each surviving tweet has its ``score`` attribute set (rounded to one
    decimal) as a side effect. The input list itself is not reordered; a
    new list, sorted by descending score, is returned.
    """
    allowed_langs = config.get("lang", [])
    drop_retweets = config.get("excludeRetweets", False)

    # 1-2. Language and retweet filters
    candidates = [
        tweet
        for tweet in tweets
        if (not allowed_langs or tweet.lang in allowed_langs)
        and not (drop_retweets and tweet.is_retweet)
    ]

    # 3. Score every remaining tweet
    weights = config.get("weights", DEFAULT_WEIGHTS)
    for tweet in candidates:
        tweet.score = round(score_tweet(tweet, weights), 1)

    # 4. Highest score first (stable for equal scores)
    ranked = sorted(candidates, key=lambda tweet: tweet.score, reverse=True)

    # 5. Apply the selection mode
    mode = config.get("mode", "topN")
    if mode == "topN":
        return ranked[:config.get("topN", 20)]
    if mode == "score":
        threshold = config.get("minScore", 50)
        return [tweet for tweet in ranked if tweet.score >= threshold]
    return ranked

207
twitter_cli/formatter.py Normal file
View File

@@ -0,0 +1,207 @@
"""Tweet formatter for terminal output (rich) and JSON export."""
from __future__ import annotations
import json
from typing import List, Optional
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from .models import Tweet
def format_number(n):
    # type: (int) -> str
    """Format a count compactly with a K (thousands) or M (millions) suffix.

    Values below 1,000 are returned unchanged via ``str``; larger values are
    shown with one decimal place, e.g. ``1500 -> "1.5K"`` and
    ``2_500_000 -> "2.5M"``.

    Args:
        n: The number to format.

    Returns:
        The formatted string.
    """
    if n >= 1_000_000:
        return "%.1fM" % (n / 1_000_000)
    if n >= 1_000:
        thousands = "%.1f" % (n / 1_000)
        # Values just under one million (e.g. 999_999) round up to
        # "1000.0"; promote them to the M suffix instead of emitting
        # the nonsensical "1000.0K".
        if thousands == "1000.0":
            return "1.0M"
        return thousands + "K"
    return str(n)
def print_tweet_table(tweets, console=None, title=None):
    # type: (List[Tweet], Optional[Console], Optional[str]) -> None
    """Print tweets as a rich table, one row per tweet.

    Columns are: row number, author, tweet text, stats and score.

    * Author: ``@handle`` (followed by a check mark for verified authors)
      and, for retweets, a second line naming the retweeter.
    * Tweet: newlines flattened to spaces, truncated to 120 characters,
      followed by one icon per media item and a one-line preview (first 60
      characters) of the quoted tweet, if any.
    * Score: one decimal place, or ``-`` when the score is zero/unset.

    Args:
        tweets: Tweets to display, in display order.
        console: Console to print to; a new ``Console()`` is created when
            omitted.
        title: Table title; defaults to a title with the tweet count.
    """
    if console is None:
        console = Console()
    if not title:
        title = "📱 Twitter — %d tweets" % len(tweets)

    table = Table(title=title, show_lines=True, expand=True)
    table.add_column("#", style="dim", width=3, justify="right")
    table.add_column("Author", style="cyan", width=18, no_wrap=True)
    table.add_column("Tweet", ratio=3)
    table.add_column("Stats", style="green", width=22, no_wrap=True)
    table.add_column("Score", style="yellow", width=6, justify="right")

    for i, tweet in enumerate(tweets):
        # Author. The original conditional produced "" on both branches, so
        # the verified flag never had a visible effect.
        verified = " ✓" if tweet.author.verified else ""
        author_text = "@%s%s" % (tweet.author.screen_name, verified)
        if tweet.is_retweet and tweet.retweeted_by:
            author_text += "\n🔄 @%s" % tweet.retweeted_by

        # Tweet text (truncated)
        text = tweet.text.replace("\n", " ").strip()
        if len(text) > 120:
            text = text[:117] + "..."

        # Media indicators
        if tweet.media:
            media_icons = []
            for m in tweet.media:
                if m.type == "photo":
                    media_icons.append("📷")
                elif m.type == "video":
                    media_icons.append("📹")
                else:
                    media_icons.append("🎞️")
            text += " " + " ".join(media_icons)

        # Quoted tweet
        if tweet.quoted_tweet:
            qt = tweet.quoted_tweet
            qt_text = qt.text.replace("\n", " ")[:60]
            text += "\n┌ @%s: %s" % (qt.author.screen_name, qt_text)

        # Stats
        stats = (
            "❤️ %s 🔄 %s\n💬 %s 👁️ %s"
            % (
                format_number(tweet.metrics.likes),
                format_number(tweet.metrics.retweets),
                format_number(tweet.metrics.replies),
                format_number(tweet.metrics.views),
            )
        )

        # Score
        score_str = "%.1f" % tweet.score if tweet.score else "-"

        # Author and tweet cells carry user-generated content. A plain str
        # cell is parsed as rich console markup, so a tweet containing
        # "[bold]" would be swallowed and "[/x]" would raise MarkupError.
        # Text() renders the content literally.
        table.add_row(str(i + 1), Text(author_text), Text(text), stats, score_str)

    console.print(table)
def print_tweet_detail(tweet, console=None):
    # type: (Tweet, Optional[Console]) -> None
    """Print a single tweet in detail inside a rich panel.

    The panel title is ``@handle (name)`` (with a check mark after the
    handle for verified authors). The body contains, in order: a retweet
    notice, the full tweet text, media entries, links, the first 200
    characters of the quoted tweet, the metrics line, and the creation time
    with the tweet's x.com URL.

    Args:
        tweet: The tweet to display.
        console: Console to print to; a new ``Console()`` is created when
            omitted.
    """
    if console is None:
        console = Console()

    # The original conditional produced "" on both branches, so the
    # verified flag never had a visible effect.
    verified = " ✓" if tweet.author.verified else ""
    header = "@%s%s (%s)" % (tweet.author.screen_name, verified, tweet.author.name)

    body_parts = []
    if tweet.is_retweet and tweet.retweeted_by:
        body_parts.append("🔄 Retweeted by @%s\n" % tweet.retweeted_by)
    body_parts.append(tweet.text)

    if tweet.media:
        body_parts.append("")
        for m in tweet.media:
            icon = "📷" if m.type == "photo" else ("📹" if m.type == "video" else "🎞️")
            body_parts.append("%s %s: %s" % (icon, m.type, m.url))

    if tweet.urls:
        body_parts.append("")
        for url in tweet.urls:
            body_parts.append("🔗 %s" % url)

    if tweet.quoted_tweet:
        qt = tweet.quoted_tweet
        body_parts.append("")
        body_parts.append("┌── Quoted @%s ──" % qt.author.screen_name)
        body_parts.append(qt.text[:200])

    body_parts.append("")
    body_parts.append(
        "❤️ %s 🔄 %s 💬 %s 🔖 %s 👁️ %s"
        % (
            format_number(tweet.metrics.likes),
            format_number(tweet.metrics.retweets),
            format_number(tweet.metrics.replies),
            format_number(tweet.metrics.bookmarks),
            format_number(tweet.metrics.views),
        )
    )
    body_parts.append(
        "🕐 %s · https://x.com/%s/status/%s"
        % (tweet.created_at, tweet.author.screen_name, tweet.id)
    )

    # Body and title hold user-generated text (tweet text, display name).
    # Passed as plain str they would be parsed as rich console markup:
    # "[bold]" would vanish and "[/x]" would raise MarkupError. Wrapping
    # them in Text() renders them literally.
    console.print(Panel(
        Text("\n".join(body_parts)),
        title=Text(header),
        border_style="blue",
        expand=True,
    ))
def print_filter_stats(original_count, filtered, console=None):
    # type: (int, List[Tweet], Optional[Console]) -> None
    """Print how many tweets survived filtering and their score range.

    Args:
        original_count: Number of tweets before filtering.
        filtered: Tweets remaining after filtering; each must expose
            ``score``.
        console: Console to print to; a new ``Console()`` is created when
            omitted.
    """
    if console is None:
        console = Console()

    # The two counts need a separator: "%d%d" rendered 50 and 20 as "5020".
    console.print(
        "📊 Filter: %d → %d tweets" % (original_count, len(filtered))
    )

    if filtered:
        # Use min/max rather than the first/last element so the range is
        # right even when the list is not sorted by descending score.
        scores = [t.score for t in filtered]
        console.print(
            " Score range: %.1f ~ %.1f" % (min(scores), max(scores))
        )
def tweets_to_json(tweets):
    # type: (List[Tweet]) -> str
    """Serialize tweets to a pretty-printed JSON array.

    Each tweet becomes an object with camelCase keys (``createdAt``,
    ``isRetweet``, ``retweetedBy``, ...). A ``quotedTweet`` entry holding
    the quoted tweet's id, text and author names is added only when the
    tweet quotes another one. Non-ASCII characters are written as-is and
    the output is indented by two spaces.

    Args:
        tweets: Tweets to export.

    Returns:
        The JSON document as a string.
    """

    def _as_record(tweet):
        """Build the JSON-ready dict for one tweet."""
        author = tweet.author
        metrics = tweet.metrics

        media_items = []
        for item in tweet.media:
            media_items.append(
                {"type": item.type, "url": item.url, "width": item.width, "height": item.height}
            )

        record = {
            "id": tweet.id,
            "text": tweet.text,
            "author": {
                "id": author.id,
                "name": author.name,
                "screenName": author.screen_name,
                "profileImageUrl": author.profile_image_url,
                "verified": author.verified,
            },
            "metrics": {
                "likes": metrics.likes,
                "retweets": metrics.retweets,
                "replies": metrics.replies,
                "quotes": metrics.quotes,
                "views": metrics.views,
                "bookmarks": metrics.bookmarks,
            },
            "createdAt": tweet.created_at,
            "media": media_items,
            "urls": tweet.urls,
            "isRetweet": tweet.is_retweet,
            "retweetedBy": tweet.retweeted_by,
            "lang": tweet.lang,
            "score": tweet.score,
        }

        quoted = tweet.quoted_tweet
        if quoted:
            record["quotedTweet"] = {
                "id": quoted.id,
                "text": quoted.text,
                "author": {"screenName": quoted.author.screen_name, "name": quoted.author.name},
            }
        return record

    records = [_as_record(tweet) for tweet in tweets]
    return json.dumps(records, ensure_ascii=False, indent=2)

52
twitter_cli/models.py Normal file
View File

@@ -0,0 +1,52 @@
"""Data models for twitter-cli.
Defines Tweet, Author, Metrics, and TweetMedia as simple dataclasses.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Author:
    """The account attached to a tweet."""

    # Required identity fields.
    id: str
    name: str
    screen_name: str  # handle; callers prepend "@" when displaying it

    # Optional profile details.
    profile_image_url: str = ""
    verified: bool = False
@dataclass
class Metrics:
    """Engagement counters for a tweet; every counter defaults to zero."""

    likes: int = 0
    retweets: int = 0
    replies: int = 0
    quotes: int = 0
    views: int = 0
    bookmarks: int = 0
@dataclass
class TweetMedia:
    """One media attachment of a tweet."""

    type: str  # one of "photo", "video" or "animated_gif"
    url: str

    # Pixel dimensions are optional and default to None when unknown.
    width: Optional[int] = None
    height: Optional[int] = None
@dataclass
class Tweet:
    """A single tweet together with its author, metrics and attachments."""

    # Required fields.
    id: str
    text: str
    author: Author
    metrics: Metrics
    created_at: str

    # Attachments; each instance gets its own fresh list.
    media: List[TweetMedia] = field(default_factory=list)
    urls: List[str] = field(default_factory=list)

    # Retweet / language information.
    is_retweet: bool = False
    lang: str = ""
    retweeted_by: Optional[str] = None

    # The tweet being quoted, if any.
    quoted_tweet: Optional[Tweet] = None

    # Engagement score; stays 0.0 until something assigns one.
    score: float = 0.0

164
twitter_cli/summarizer.py Normal file
View File

@@ -0,0 +1,164 @@
"""AI summarization module.
Supports OpenAI-compatible (doubao, deepseek, openai) and Anthropic APIs.
Uses urllib.request for zero extra dependencies.
"""
from __future__ import annotations
import json
import logging
import ssl
import urllib.request
from typing import Any, Dict, List
from .models import Tweet
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# System-role message sent with every summarization request. The text is
# Chinese; in English it reads roughly: "You are a professional Twitter/X
# feed analyst, skilled at distilling key information and spotting trends."
SYSTEM_MESSAGE = "你是一个专业的 Twitter/X 信息流分析师,擅长提炼关键信息和发现趋势。"
def _build_prompt(tweets, language="zh-CN"):
    # type: (List[Tweet], str) -> str
    """Build the user prompt that asks the model to summarize ``tweets``.

    Every tweet is rendered as a numbered entry: handle and name (plus
    retweeter and score when present), the first 300 characters of the text
    with newlines flattened, the metric counts, and optional media types,
    links and a 100-character preview of the quoted tweet. The entries
    follow a fixed Chinese instruction block.

    Args:
        tweets: Tweets to include, in order.
        language: Desired output language. Values starting with ``"zh"``
            request Chinese; anything else is inserted into an English
            "Please output in ..." instruction.

    Returns:
        The complete prompt text.
    """
    entry_template = (
        '%d. @%s (%s)%s%s\n'
        ' "%s"\n'
        ' ❤️%d 🔄%d 💬%d 🔖%d 👁️%d%s%s%s'
    )

    entries = []
    for number, tweet in enumerate(tweets, start=1):
        # Optional decorations; each stays empty when it does not apply.
        score_part = ""
        if tweet.score:
            score_part = " [score: %.1f]" % tweet.score

        retweet_part = ""
        if tweet.is_retweet and tweet.retweeted_by:
            retweet_part = " (RT by @%s)" % tweet.retweeted_by

        media_part = ""
        if tweet.media:
            media_part = " [%s]" % ", ".join(item.type for item in tweet.media)

        links_part = ""
        if tweet.urls:
            links_part = "\n Links: %s" % ", ".join(tweet.urls)

        quoted_part = ""
        if tweet.quoted_tweet:
            quoted = tweet.quoted_tweet
            quoted_preview = quoted.text[:100].replace("\n", " ")
            quoted_part = "\n Quoting @%s: %s..." % (quoted.author.screen_name, quoted_preview)

        preview = tweet.text.replace("\n", " ")[:300]
        metrics = tweet.metrics
        entries.append(
            entry_template
            % (
                number, tweet.author.screen_name, tweet.author.name,
                retweet_part, score_part,
                preview,
                metrics.likes, metrics.retweets, metrics.replies,
                metrics.bookmarks, metrics.views,
                media_part, links_part, quoted_part,
            )
        )

    # Instruction telling the model which language to answer in.
    if language.startswith("zh"):
        language_instruction = "请用中文输出。"
    else:
        language_instruction = "Please output in %s." % language

    prompt_template = (
        "你是一个 Twitter/X 信息流分析师。请对以下 %d 条推文进行摘要总结。\n\n"
        "要求:\n"
        "1. 按主题分组AI & 编程、Crypto、工具推荐、生活观点等\n"
        "2. 每组列出关键推文和核心观点,标注作者 @handle\n"
        "3. 标注数据亮点(高赞/高收藏推文用 🔥 标记)\n"
        "4. 最后用 2-3 句话总结今天 timeline 的整体趋势\n"
        "5. %s\n\n"
        "推文数据:\n\n%s"
    )
    return prompt_template % (len(tweets), language_instruction, "\n\n".join(entries))
def _call_openai(prompt, config):
    # type: (str, Dict[str, Any]) -> str
    """Send ``prompt`` to an OpenAI-compatible chat-completions endpoint.

    The endpoint is derived from ``config["base_url"]``: a URL already
    ending in ``/chat/completions`` is used unchanged; otherwise ``/v1``
    (unless already present) and ``/chat/completions`` are appended.

    Args:
        prompt: User message content.
        config: Reads ``base_url``, ``model`` and ``api_key``.

    Returns:
        The content of the first choice, or ``""`` when the response has no
        choices or the content is missing/null.

    Raises:
        ValueError: If ``base_url`` is missing or empty.
        urllib.error.URLError: On transport or HTTP errors (propagated from
            ``urlopen``).
    """
    url = (config.get("base_url") or "").rstrip("/")
    if not url:
        # Without this check the request would be built for the relative
        # path "/v1/chat/completions" and fail with an opaque
        # "unknown url type" ValueError.
        raise ValueError("AI base_url is not configured (e.g. https://api.openai.com)")
    if not url.endswith("/chat/completions"):
        if not url.endswith("/v1"):
            url += "/v1"
        url += "/chat/completions"

    payload = json.dumps({
        "model": config.get("model", ""),
        "messages": [
            {"role": "system", "content": SYSTEM_MESSAGE},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.3,
        "max_tokens": 4096,
    }).encode("utf-8")

    # Supplying a body makes urllib issue a POST.
    req = urllib.request.Request(url, data=payload)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", "Bearer %s" % config.get("api_key", ""))

    ctx = ssl.create_default_context()
    with urllib.request.urlopen(req, context=ctx, timeout=120) as resp:
        data = json.loads(resp.read().decode("utf-8"))

    # Any of these fields may be absent or JSON null; always honour the
    # "returns str" contract instead of leaking None or raising.
    choices = data.get("choices") or []
    if not choices:
        return ""
    message = choices[0].get("message") or {}
    return message.get("content") or ""
def _call_anthropic(prompt, config):
    # type: (str, Dict[str, Any]) -> str
    """Send ``prompt`` to the Anthropic Messages API.

    The endpoint is derived from ``config["base_url"]``: a URL already
    ending in ``/messages`` is used unchanged; otherwise ``/v1`` (unless
    already present) and ``/messages`` are appended.

    Args:
        prompt: User message content.
        config: Reads ``base_url``, ``model`` and ``api_key``.

    Returns:
        The text of all ``text`` content blocks concatenated in order, or
        ``""`` when the response contains none.

    Raises:
        ValueError: If ``base_url`` is missing or empty.
        urllib.error.URLError: On transport or HTTP errors (propagated from
            ``urlopen``).
    """
    url = (config.get("base_url") or "").rstrip("/")
    if not url:
        # Without this check the request would be built for the relative
        # path "/v1/messages" and fail with an opaque "unknown url type"
        # ValueError.
        raise ValueError("AI base_url is not configured (e.g. https://api.anthropic.com)")
    if not url.endswith("/messages"):
        if not url.endswith("/v1"):
            url += "/v1"
        url += "/messages"

    payload = json.dumps({
        "model": config.get("model", ""),
        "system": SYSTEM_MESSAGE,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 4096,
    }).encode("utf-8")

    # Supplying a body makes urllib issue a POST.
    req = urllib.request.Request(url, data=payload)
    req.add_header("Content-Type", "application/json")
    req.add_header("x-api-key", config.get("api_key", ""))
    req.add_header("anthropic-version", "2023-06-01")

    ctx = ssl.create_default_context()
    with urllib.request.urlopen(req, context=ctx, timeout=120) as resp:
        data = json.loads(resp.read().decode("utf-8"))

    # The reply is a list of content blocks and may hold several text
    # blocks; returning only the first one would truncate the answer.
    # Fields that are absent or JSON null are treated as empty.
    content_blocks = data.get("content") or []
    return "".join(
        block.get("text") or ""
        for block in content_blocks
        if block.get("type") == "text"
    )
def summarize(tweets, config):
    # type: (List[Tweet], Dict[str, Any]) -> str
    """Produce an AI-written summary of ``tweets``.

    Args:
        tweets: Tweets to summarize.
        config: Reads ``api_key``, ``language`` (default ``"zh-CN"``),
            ``provider`` (default ``"openai"``) and ``model``; the whole
            dict is forwarded to the provider call.

    Returns:
        The summary text, or ``"No tweets to summarize."`` when ``tweets``
        is empty.

    Raises:
        RuntimeError: If no API key is configured. This is checked before
            the empty-input case.
    """
    if not config.get("api_key", ""):
        raise RuntimeError(
            "AI API key not configured.\n"
            "Set ai.api_key in config.yaml or export AI_API_KEY=your_key"
        )
    if not tweets:
        return "No tweets to summarize."

    prompt = _build_prompt(tweets, config.get("language", "zh-CN"))

    provider = config.get("provider", "openai")
    logger.info("Calling AI (%s/%s)...", provider, config.get("model", ""))

    # Only the exact value "anthropic" selects the Anthropic client; every
    # other provider goes through the OpenAI-compatible one.
    if provider == "anthropic":
        return _call_anthropic(prompt, config)
    return _call_openai(prompt, config)