Files
twitter-cli-cookiefile/twitter_cli/parser.py
Lucius 7816f8d813 Feed cursor pagination (#49)
* Expose promoted tweets in feed output

* Add cursor-based feed pagination output
2026-04-10 01:20:18 +08:00

532 lines
21 KiB
Python

"""Response parsing for Twitter GraphQL API.
Converts raw GraphQL response JSON into domain model objects
(Tweet, UserProfile, Author, etc.).
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable, Dict, List, Optional, Tuple # noqa: F401
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
logger = logging.getLogger(__name__)
# ── Utility helpers ──────────────────────────────────────────────────────
def _deep_get(data, *keys):
# type: (Any, *Any) -> Any
"""Safely get nested dict/list values. Supports int keys for list access."""
current = data
for key in keys:
if isinstance(key, int):
if isinstance(current, list) and 0 <= key < len(current):
current = current[key]
else:
return None
elif isinstance(current, dict):
current = current.get(key)
else:
return None
return current
def _parse_int(value, default):
# type: (Any, int) -> int
"""Best-effort integer conversion. Handles commas and float strings."""
try:
text = str(value).replace(",", "").strip()
if not text:
return default
return int(float(text))
except (TypeError, ValueError):
return default
def _extract_cursor(content):
# type: (Dict[str, Any]) -> Optional[str]
"""Extract Bottom pagination cursor from timeline content."""
if content.get("cursorType") == "Bottom":
return content.get("value")
return None
# ── Media / Author extraction ────────────────────────────────────────────
def _extract_media(legacy):
    # type: (Dict[str, Any]) -> List[TweetMedia]
    """Build TweetMedia objects from ``legacy["extended_entities"]["media"]``.

    Photos use ``media_url_https``. Videos and animated GIFs use the URL of
    the ``video/mp4`` variant with the highest ``bitrate`` (missing bitrate
    counts as 0), falling back to ``media_url_https`` when no mp4 variant is
    listed. Items of any other type are skipped. Width and height come from
    ``original_info`` and may be None.
    """
    collected = []  # type: List[TweetMedia]
    for entry in _deep_get(legacy, "extended_entities", "media") or []:
        kind = entry.get("type", "")
        if kind == "photo":
            url = entry.get("media_url_https", "")
        elif kind in {"video", "animated_gif"}:
            mp4_only = [
                variant
                for variant in entry.get("video_info", {}).get("variants", [])
                if variant.get("content_type") == "video/mp4"
            ]
            if mp4_only:
                # max() keeps the first variant among equal bitrates.
                best = max(mp4_only, key=lambda variant: variant.get("bitrate", 0))
                url = best["url"]
            else:
                url = entry.get("media_url_https", "")
        else:
            continue
        collected.append(
            TweetMedia(
                type=kind,
                url=url,
                width=_deep_get(entry, "original_info", "width"),
                height=_deep_get(entry, "original_info", "height"),
            )
        )
    return collected
def _extract_author(user_data, user_legacy):
    # type: (Dict[str, Any], Dict[str, Any]) -> Author
    """Build an Author from a user result and its ``legacy`` sub-dict.

    Name and screen name are taken from ``user_data["core"]`` first, then
    ``user_legacy``, then ``user_data`` itself ("Unknown" / "unknown" as the
    last resort). The avatar prefers ``user_data["avatar"]["image_url"]``
    over ``profile_image_url_https``. ``verified`` is true when either
    ``is_blue_verified`` or the legacy ``verified`` flag is truthy.
    """
    core = user_data.get("core", {})

    display_name = (
        core.get("name")
        or user_legacy.get("name")
        or user_data.get("name", "Unknown")
    )
    handle = (
        core.get("screen_name")
        or user_legacy.get("screen_name")
        or user_data.get("screen_name", "unknown")
    )
    avatar_url = (
        user_data.get("avatar", {}).get("image_url")
        or user_legacy.get("profile_image_url_https", "")
    )
    is_verified = bool(
        user_data.get("is_blue_verified") or user_legacy.get("verified", False)
    )

    return Author(
        id=user_data.get("rest_id", ""),
        name=display_name,
        screen_name=handle,
        profile_image_url=avatar_url,
        verified=is_verified,
    )
# ── Article parsing ──────────────────────────────────────────────────────
def _find_article_image_url(value):
# type: (Any) -> Optional[str]
"""Best-effort extraction of the original image URL from article entity data."""
if isinstance(value, dict):
for key in (
"original_img_url",
"originalImgUrl",
"original_url",
"originalUrl",
"media_url_https",
"mediaUrlHttps",
"media_url",
"mediaUrl",
"url",
"src",
"uri",
):
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
lowered = candidate.lower()
if (
lowered.startswith("https://pbs.twimg.com/")
or lowered.endswith((".jpg", ".jpeg", ".png", ".gif", ".webp"))
or any(ext in lowered for ext in (".jpg?", ".jpeg?", ".png?", ".gif?", ".webp?"))
):
return candidate.strip()
for nested in value.values():
found = _find_article_image_url(nested)
if found:
return found
return None
if isinstance(value, list):
for item in value:
found = _find_article_image_url(item)
if found:
return found
return None
def _normalize_article_entity_map(entity_map):
# type: (Any) -> Dict[str, Any]
"""Normalize Draft.js entityMap that may arrive as dict or [{key, value}, ...]."""
if isinstance(entity_map, dict):
return {str(key): value for key, value in entity_map.items()}
if isinstance(entity_map, list):
normalized = {} # type: Dict[str, Any]
for item in entity_map:
if not isinstance(item, dict):
continue
key = item.get("key")
value = item.get("value")
if key is None or value is None:
continue
normalized[str(key)] = value
return normalized
return {}
def _extract_article_media_url_map(article_results):
    # type: (Dict[str, Any]) -> Dict[str, str]
    """Map article media identifiers to image URLs.

    Candidates are ``cover_media`` (when truthy) followed by every item of
    ``media_entities``. For each dict candidate an image URL is searched in
    its ``media_info`` first and then in the whole candidate; when one is
    found it is registered under each non-empty string among the
    candidate's ``media_id``, ``media_key`` and ``id``.
    """
    cover = article_results.get("cover_media")
    candidates = [cover] if cover else []  # type: List[Any]
    candidates.extend(article_results.get("media_entities") or [])

    url_by_identifier = {}  # type: Dict[str, str]
    for media in candidates:
        if not isinstance(media, dict):
            continue
        image_url = (
            _find_article_image_url(media.get("media_info") or {})
            or _find_article_image_url(media)
        )
        if not image_url:
            continue
        for id_field in ("media_id", "media_key", "id"):
            identifier = media.get(id_field)
            if identifier and isinstance(identifier, str):
                url_by_identifier[identifier] = image_url
    return url_by_identifier
def _extract_atomic_markdown(block, entity_map):
    # type: (Dict[str, Any], Dict[str, Any]) -> List[str]
    """Collect the markdown payloads of MARKDOWN entities referenced by ``block``.

    Each entry of ``block["entityRanges"]`` is resolved through
    ``entity_map`` (keys compared as strings). Entities whose ``type`` is
    "MARKDOWN" (case-insensitive) contribute their stripped
    ``data.markdown`` string when it is non-blank. Order follows the
    entity ranges.
    """
    snippets = []  # type: List[str]
    for reference in block.get("entityRanges", []) or []:
        if not isinstance(reference, dict):
            continue
        reference_key = reference.get("key")
        if reference_key is None:
            continue
        entity = entity_map.get(str(reference_key))
        if not isinstance(entity, dict):
            continue
        if str(entity.get("type") or "").upper() == "MARKDOWN":
            payload = _deep_get(entity, "data", "markdown")
            if isinstance(payload, str) and payload.strip():
                snippets.append(payload.strip())
    return snippets
def _render_article_text_block(block, entity_map):
    # type: (Dict[str, Any], Dict[str, Any]) -> str
    """Return the block's text with LINK entities rewritten as ``[label](url)``.

    Returns "" when ``block["text"]`` is missing, empty or not a string, and
    the text unchanged when no usable LINK range exists. A range is usable
    when its entity resolves through ``entity_map`` to a dict of type
    "LINK" (case-insensitive), its offset and length are ints with
    ``length > 0``, and ``data.url`` is a non-blank string. Ranges that fall
    outside the text are ignored.
    """
    text = block.get("text", "")
    if not (isinstance(text, str) and text):
        return ""

    links = []
    for reference in block.get("entityRanges", []) or []:
        if not isinstance(reference, dict):
            continue
        reference_key = reference.get("key")
        entity = None if reference_key is None else entity_map.get(str(reference_key))
        if not isinstance(entity, dict):
            continue
        if str(entity.get("type") or "").upper() != "LINK":
            continue
        start = reference.get("offset")
        span = reference.get("length")
        if not (isinstance(start, int) and isinstance(span, int)) or span <= 0:
            continue
        target = _deep_get(entity, "data", "url")
        if isinstance(target, str) and target.strip():
            links.append((start, span, target.strip()))

    # Rewrite from the rightmost link backwards so the offsets of links that
    # start earlier in the text are not shifted by the inserted markup.
    # NOTE(review): offsets are applied as Python string indices; confirm the
    # API does not report them in UTF-16 code units (would matter for emoji).
    links.sort(reverse=True)
    output = text
    for start, span, target in links:
        end = start + span
        if start < 0 or end > len(output):
            continue
        label = output[start:end]
        if not label:
            continue
        # Keep the Markdown link well-formed: brackets in the label are
        # backslash-escaped, ")" in the URL is percent-encoded.
        escaped_label = label.replace("[", "\\[").replace("]", "\\]")
        escaped_target = target.replace(")", "%29")
        output = "%s[%s](%s)%s" % (
            output[:start],
            escaped_label,
            escaped_target,
            output[end:],
        )
    return output
def _find_article_caption(value):
# type: (Any) -> Optional[str]
"""Best-effort extraction of image caption/alt text from article entity data."""
if isinstance(value, dict):
for key in ("caption", "alt", "alt_text", "altText", "title", "name"):
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
for nested in value.values():
found = _find_article_caption(nested)
if found:
return found
return None
if isinstance(value, list):
for item in value:
found = _find_article_caption(item)
if found:
return found
return None
def _extract_article_images(block, entity_map, media_url_map):
    # type: (Dict[str, Any], Dict[str, Any], Dict[str, str]) -> List[str]
    """Convert atomic Draft.js image entities to Markdown image lines.

    Each entry of ``block["entityRanges"]`` is resolved through
    ``entity_map`` (keys compared as strings). The image URL is searched
    inside the entity itself; failing that, the first ``data.mediaItems``
    entry whose ``mediaId`` is present in ``media_url_map`` supplies it.
    Entities without a resolvable URL are skipped.

    Returns one ``![caption](url)`` string per image, in range order. The
    caption is empty when the entity carries none.
    """
    parts = []  # type: List[str]
    for entity_range in block.get("entityRanges", []) or []:
        if not isinstance(entity_range, dict):
            continue
        entity_key = entity_range.get("key")
        entity = entity_map.get(str(entity_key)) if entity_key is not None else None
        if not isinstance(entity, dict):
            continue

        image_url = _find_article_image_url(entity)
        if not image_url:
            # The entity only references media by id; look the URL up.
            for media_item in _deep_get(entity, "data", "mediaItems") or []:
                media_id = media_item.get("mediaId") if isinstance(media_item, dict) else None
                if isinstance(media_id, str) and media_id in media_url_map:
                    image_url = media_url_map[media_id]
                    break
        if not image_url:
            continue

        caption = _find_article_caption(entity) or ""
        # Same escaping as inline links in this module: an unescaped "]" in
        # the alt text or ")" in the URL would terminate the Markdown image
        # syntax early.
        safe_caption = caption.replace("[", "\\[").replace("]", "\\]")
        safe_url = image_url.replace(")", "%29")
        parts.append("![%s](%s)" % (safe_caption, safe_url))
    return parts
def _parse_article(tweet_data):
    # type: (Dict[str, Any]) -> Dict[str, Any]
    """Render the long-form article attached to a tweet as Markdown.

    Reads ``article.article_results.result`` from ``tweet_data`` and
    converts its Draft.js ``content_state`` blocks to Markdown paragraphs
    joined by blank lines.

    Returns a dict with keys 'article_title' and 'article_text'. Both are
    None when the tweet has no article; 'article_text' is None when the
    article has no blocks or nothing could be rendered.
    """
    article = _deep_get(tweet_data, "article", "article_results", "result")
    if not article:
        return {"article_title": None, "article_text": None}

    title = article.get("title")  # type: Optional[str]
    content_state = article.get("content_state", {})
    blocks = content_state.get("blocks", [])
    if not blocks:
        return {"article_title": title, "article_text": None}

    entity_map = _normalize_article_entity_map(content_state.get("entityMap", {}))
    media_url_map = _extract_article_media_url_map(article)

    # Markdown wrapper for each styled block type; anything not listed
    # (and not handled explicitly below) is emitted as plain text.
    templates = {
        "header-one": "# %s",
        "header-two": "## %s",
        "header-three": "### %s",
        "blockquote": "> %s",
        "unordered-list-item": "- %s",
        "code-block": "```\n%s\n```",
    }

    rendered = []  # type: List[str]
    list_index = 0
    for block in blocks:
        block_type = block.get("type", "unstyled")  # type: str

        if block_type == "atomic":
            # Atomic blocks carry embedded markdown and/or images, no text.
            rendered += _extract_atomic_markdown(block, entity_map)
            rendered += _extract_article_images(block, entity_map, media_url_map)
            list_index = 0
            continue

        text = _render_article_text_block(block, entity_map)
        if not text:
            # Empty blocks neither emit output nor interrupt list numbering.
            continue

        if block_type == "ordered-list-item":
            list_index += 1
            rendered.append("%d. %s" % (list_index, text))
            continue

        # Any other non-empty block restarts ordered-list numbering.
        list_index = 0
        template = templates.get(block_type) if isinstance(block_type, str) else None
        rendered.append(template % text if template else text)

    article_text = "\n\n".join(rendered) if rendered else None
    return {"article_title": title, "article_text": article_text}
# ── User parsing ─────────────────────────────────────────────────────────
def parse_user_result(user_data):
    # type: (Dict[str, Any]) -> Optional[UserProfile]
    """Parse a user result object into UserProfile.

    Returns None when the result is typed "UserUnavailable" or has no
    (non-empty) ``legacy`` section.

    Name, screen name and avatar are read from ``legacy`` first and fall
    back to ``core`` / ``avatar.image_url``, the same locations
    ``_extract_author`` reads, so profiles are still populated when
    those fields are not present in ``legacy``. Text fields default to "",
    counters to 0, and ``verified`` is always a bool.
    """
    if user_data.get("__typename") == "UserUnavailable":
        return None
    legacy = user_data.get("legacy", {})
    if not legacy:
        return None

    core = user_data.get("core")
    if not isinstance(core, dict):
        core = {}
    avatar = user_data.get("avatar")
    if not isinstance(avatar, dict):
        avatar = {}

    return UserProfile(
        id=user_data.get("rest_id", ""),
        name=legacy.get("name") or core.get("name") or "",
        screen_name=legacy.get("screen_name") or core.get("screen_name") or "",
        bio=legacy.get("description", ""),
        location=legacy.get("location", ""),
        url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
        followers_count=_parse_int(legacy.get("followers_count"), 0),
        following_count=_parse_int(legacy.get("friends_count"), 0),
        tweets_count=_parse_int(legacy.get("statuses_count"), 0),
        likes_count=_parse_int(legacy.get("favourites_count"), 0),
        # bool(): a null flag in the payload must not leak through as None.
        verified=bool(user_data.get("is_blue_verified") or legacy.get("verified")),
        profile_image_url=(
            legacy.get("profile_image_url_https") or avatar.get("image_url") or ""
        ),
        created_at=legacy.get("created_at", ""),
    )
# ── Tweet parsing ────────────────────────────────────────────────────────
def _unwrap_visibility(result):
# type: (Dict[str, Any]) -> Tuple[Dict[str, Any], bool]
"""Unwrap TweetWithVisibilityResults, returning (inner_data, is_subscriber_only)."""
if result.get("__typename") == "TweetWithVisibilityResults" and result.get("tweet"):
return result["tweet"], bool(result.get("tweetInterstitial"))
return result, False
def parse_tweet_result(result, depth=0):
    # type: (Dict[str, Any], int) -> Optional[Tweet]
    """Turn one tweet result object into a Tweet.

    Returns None when ``depth`` exceeds 2 (quoted tweets recurse with
    ``depth + 1``), when the result is a "TweetTombstone", or when it lacks
    dict-valued ``legacy`` / ``core`` sections.

    For a retweet, the content, author and metrics come from the embedded
    original tweet (when that has usable ``legacy`` / ``core``), while
    ``retweeted_by`` carries the screen name of the account on the outer
    result.
    """
    if depth > 2:
        return None

    outer, outer_restricted = _unwrap_visibility(result)
    if outer.get("__typename") == "TweetTombstone":
        return None
    outer_legacy = outer.get("legacy")
    outer_core = outer.get("core")
    if not (isinstance(outer_legacy, dict) and isinstance(outer_core, dict)):
        return None

    poster = _deep_get(outer_core, "user_results", "result") or {}
    poster_legacy = poster.get("legacy", {})
    poster_core = poster.get("core", {})

    # By default the tweet describes itself; a retweet swaps in the original.
    source, source_legacy = outer, outer_legacy
    source_user, source_user_legacy = poster, poster_legacy
    restricted = outer_restricted

    embedded = _deep_get(outer_legacy, "retweeted_status_result", "result")
    is_retweet = bool(embedded)
    if is_retweet:
        inner, inner_restricted = _unwrap_visibility(embedded)
        restricted = outer_restricted or inner_restricted
        inner_legacy = inner.get("legacy")
        inner_core = inner.get("core")
        if isinstance(inner_legacy, dict) and isinstance(inner_core, dict):
            source, source_legacy = inner, inner_legacy
            source_user = _deep_get(inner_core, "user_results", "result") or {}
            source_user_legacy = source_user.get("legacy", {})

    media = _extract_media(source_legacy)
    urls = [
        link.get("expanded_url", "")
        for link in _deep_get(source_legacy, "entities", "urls") or []
    ]

    quoted_raw = _deep_get(source, "quoted_status_result", "result")
    quoted_tweet = None
    if isinstance(quoted_raw, dict):
        quoted_tweet = parse_tweet_result(quoted_raw, depth=depth + 1)

    author = _extract_author(source_user, source_user_legacy)

    retweeted_by = None  # type: Optional[str]
    if is_retweet:
        retweeted_by = (
            poster_core.get("screen_name")
            or poster_legacy.get("screen_name", "unknown")
        )

    # Long tweets keep their complete text under note_tweet; prefer it.
    long_text = _deep_get(source, "note_tweet", "note_tweet_results", "result", "text")

    def count(field):
        # type: (str) -> int
        return _parse_int(source_legacy.get(field), 0)

    return Tweet(
        id=source.get("rest_id", ""),
        text=long_text or source_legacy.get("full_text", ""),
        author=author,
        metrics=Metrics(
            likes=count("favorite_count"),
            retweets=count("retweet_count"),
            replies=count("reply_count"),
            quotes=count("quote_count"),
            views=_parse_int(_deep_get(source, "views", "count"), 0),
            bookmarks=count("bookmark_count"),
        ),
        created_at=source_legacy.get("created_at", ""),
        media=media,
        urls=urls,
        is_retweet=is_retweet,
        retweeted_by=retweeted_by,
        quoted_tweet=quoted_tweet,
        lang=source_legacy.get("lang", ""),
        is_subscriber_only=restricted,
        **_parse_article(source),
    )
# ── Timeline response parsing ───────────────────────────────────────────
def _append_timeline_tweet(tweets, result, entry_id, item_content):
    # type: (List[Tweet], Any, Any, Dict[str, Any]) -> None
    """Parse one tweet result and append it to ``tweets``, flagging promoted items."""
    if not result or not isinstance(result, dict):
        return
    tweet = parse_tweet_result(result)
    if not tweet:
        return
    tweet.is_promoted = bool(
        str(entry_id or "").startswith("promoted-")
        or item_content.get("promotedMetadata")
    )
    tweets.append(tweet)


def parse_timeline_response(data, get_instructions):
    # type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]]
    """Parse timeline GraphQL response into tweets and next cursor.

    ``get_instructions`` is called with ``data`` and must return the list of
    timeline instructions; if it returns anything else a warning is logged
    and ``([], None)`` is returned.

    Every instruction's ``entries`` (or, failing that, ``moduleItems``) are
    scanned. A tweet is collected from the entry's own item content and from
    each element of ``content["items"]``. A tweet is marked promoted when
    its entry id starts with "promoted-" or its item content carries
    ``promotedMetadata``.

    Returns ``(tweets, next_cursor)`` where ``next_cursor`` is the value of
    the last "Bottom" cursor entry seen, or None.
    """
    tweets = []  # type: List[Tweet]
    next_cursor = None  # type: Optional[str]

    instructions = get_instructions(data)
    if not isinstance(instructions, list):
        logger.warning("No timeline instructions found")
        return tweets, next_cursor

    for instruction in instructions:
        if not isinstance(instruction, dict):
            continue
        entries = instruction.get("entries") or instruction.get("moduleItems") or []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            # A present-but-null "content" must not crash the whole page.
            content = entry.get("content")
            if not isinstance(content, dict):
                content = {}
            next_cursor = _extract_cursor(content) or next_cursor

            item_content = content.get("itemContent")
            if not isinstance(item_content, dict):
                # Entries taken from "moduleItems" presumably use the same
                # item.itemContent layout as nested items below; without
                # this fallback they would never yield a tweet.
                item_content = _deep_get(entry, "item", "itemContent")
            if not isinstance(item_content, dict):
                item_content = {}
            _append_timeline_tweet(
                tweets,
                _deep_get(item_content, "tweet_results", "result"),
                entry.get("entryId"),
                item_content,
            )

            for nested_item in content.get("items") or []:
                nested_item_content = _deep_get(nested_item, "item", "itemContent")
                if not isinstance(nested_item_content, dict):
                    continue
                _append_timeline_tweet(
                    tweets,
                    _deep_get(nested_item_content, "tweet_results", "result"),
                    _deep_get(nested_item, "entryId"),
                    nested_item_content,
                )
    return tweets, next_cursor