fix: harden pagination auth and runtime headers
This commit is contained in:
@@ -237,35 +237,59 @@ print(json.dumps({
|
||||
sys.exit(1)
|
||||
'''
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", extract_script],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=15,
|
||||
)
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
stderr = result.stderr.strip()
|
||||
if stderr:
|
||||
logger.debug("Cookie extraction stderr from current env: %s", stderr[:300])
|
||||
# Maybe browser-cookie3 not installed, try with uv.
|
||||
result2 = subprocess.run(
|
||||
["uv", "run", "--with", "browser-cookie3", "python3", "-c", extract_script],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
output = result2.stdout.strip()
|
||||
if not output:
|
||||
logger.debug("Cookie extraction stderr from uv fallback: %s", result2.stderr.strip()[:300])
|
||||
return None
|
||||
def _run_extract_command(cmd, timeout, label):
|
||||
# type: (list[str], int, str) -> tuple[Optional[dict], bool]
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.debug("Cookie extraction %s timed out", label)
|
||||
return None, False
|
||||
except FileNotFoundError as exc:
|
||||
logger.debug("Cookie extraction %s launcher missing: %s", label, exc)
|
||||
return None, False
|
||||
|
||||
output = result.stdout.strip()
|
||||
stderr = result.stderr.strip()
|
||||
if stderr:
|
||||
logger.debug("Cookie extraction stderr from %s: %s", label, stderr[:300])
|
||||
if not output:
|
||||
logger.debug("Cookie extraction from %s produced no stdout", label)
|
||||
return None, True
|
||||
|
||||
try:
|
||||
data = json.loads(output)
|
||||
except json.JSONDecodeError as exc:
|
||||
logger.debug("Cookie extraction %s returned invalid JSON: %s", label, exc)
|
||||
return None, True
|
||||
|
||||
data = json.loads(output)
|
||||
if "error" in data:
|
||||
attempts = data.get("attempts") or []
|
||||
if attempts:
|
||||
logger.debug("Subprocess extraction attempts: %s", ", ".join(str(item) for item in attempts))
|
||||
logger.debug("Subprocess extraction attempts (%s): %s", label, ", ".join(str(item) for item in attempts))
|
||||
retryable = data.get("error") == "browser-cookie3 not installed"
|
||||
return None, retryable
|
||||
|
||||
return data, False
|
||||
|
||||
try:
|
||||
data, retry_with_uv = _run_extract_command(
|
||||
[sys.executable, "-c", extract_script],
|
||||
timeout=15,
|
||||
label="current env",
|
||||
)
|
||||
if data is None and retry_with_uv:
|
||||
data, _ = _run_extract_command(
|
||||
["uv", "run", "--with", "browser-cookie3", "python", "-c", extract_script],
|
||||
timeout=30,
|
||||
label="uv fallback",
|
||||
)
|
||||
|
||||
if data is None:
|
||||
return None
|
||||
logger.info("Found cookies in %s (subprocess)", data.get("browser", "unknown"))
|
||||
|
||||
@@ -277,18 +301,9 @@ sys.exit(1)
|
||||
cookies["cookie_string"] = cookie_str
|
||||
logger.info("Extracted %d total cookies for full browser fingerprint", len(all_cookies))
|
||||
return cookies
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.debug("Cookie extraction subprocess timed out")
|
||||
return None
|
||||
except json.JSONDecodeError as exc:
|
||||
logger.debug("Cookie extraction subprocess returned invalid JSON: %s", exc)
|
||||
return None
|
||||
except KeyError as exc:
|
||||
logger.debug("Cookie extraction subprocess returned incomplete payload: %s", exc)
|
||||
return None
|
||||
except FileNotFoundError as exc:
|
||||
logger.debug("Cookie extraction subprocess launcher missing: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def extract_from_browser() -> Optional[Dict[str, str]]:
|
||||
|
||||
@@ -196,6 +196,26 @@ def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file,
|
||||
console.print()
|
||||
|
||||
|
||||
def _run_bookmarks_command(max_count, as_json, output_file, do_filter):
|
||||
# type: (Optional[int], bool, Optional[str], bool) -> None
|
||||
config = load_config()
|
||||
|
||||
def _run():
|
||||
client = _get_client(config)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_bookmarks(count),
|
||||
"bookmarks",
|
||||
"🔖",
|
||||
max_count,
|
||||
as_json,
|
||||
output_file,
|
||||
do_filter,
|
||||
config,
|
||||
)
|
||||
|
||||
_run_guarded(_run)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
"--type",
|
||||
@@ -258,14 +278,7 @@ def feed(feed_type, max_count, as_json, input_file, output_file, do_filter):
|
||||
def favorites(max_count, as_json, output_file, do_filter):
|
||||
# type: (Optional[int], bool, Optional[str], bool) -> None
|
||||
"""Fetch bookmarked (favorite) tweets."""
|
||||
config = load_config()
|
||||
def _run():
|
||||
client = _get_client(config)
|
||||
_fetch_and_display(
|
||||
lambda count: client.fetch_bookmarks(count),
|
||||
"bookmarks", "🔖", max_count, as_json, output_file, do_filter, config,
|
||||
)
|
||||
_run_guarded(_run)
|
||||
_run_bookmarks_command(max_count, as_json, output_file, do_filter)
|
||||
|
||||
|
||||
@cli.command(name="bookmarks")
|
||||
@@ -276,7 +289,7 @@ def favorites(max_count, as_json, output_file, do_filter):
|
||||
def bookmarks(max_count, as_json, output_file, do_filter):
|
||||
# type: (Optional[int], bool, Optional[str], bool) -> None
|
||||
"""Fetch bookmarked tweets."""
|
||||
favorites.callback(max_count=max_count, as_json=as_json, output_file=output_file, do_filter=do_filter)
|
||||
_run_bookmarks_command(max_count, as_json, output_file, do_filter)
|
||||
|
||||
|
||||
@cli.command()
|
||||
|
||||
@@ -16,8 +16,14 @@ from x_client_transaction import ClientTransaction
|
||||
from x_client_transaction.utils import generate_headers as _gen_ct_headers, get_ondemand_file_url
|
||||
|
||||
from .constants import (
|
||||
BEARER_TOKEN, SEC_CH_UA_MOBILE, SEC_CH_UA_PLATFORM,
|
||||
get_sec_ch_ua, get_user_agent, sync_chrome_version,
|
||||
BEARER_TOKEN,
|
||||
SEC_CH_UA_MOBILE,
|
||||
get_accept_language,
|
||||
get_sec_ch_ua,
|
||||
get_sec_ch_ua_platform,
|
||||
get_twitter_client_language,
|
||||
get_user_agent,
|
||||
sync_chrome_version,
|
||||
)
|
||||
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
|
||||
|
||||
@@ -614,10 +620,16 @@ class TwitterClient:
|
||||
seen_ids.add(tweet.id)
|
||||
tweets.append(tweet)
|
||||
|
||||
if not next_cursor or not new_tweets:
|
||||
if not next_cursor:
|
||||
break
|
||||
if next_cursor == cursor:
|
||||
logger.debug("Timeline pagination stopped because cursor did not advance: %s", next_cursor)
|
||||
break
|
||||
cursor = next_cursor
|
||||
|
||||
if not new_tweets:
|
||||
logger.debug("Timeline page returned no tweets but exposed next cursor; continuing pagination")
|
||||
|
||||
# Rate-limit: sleep between paginated requests with jitter
|
||||
if len(tweets) < count and self._request_delay > 0:
|
||||
jitter = self._request_delay * random.uniform(0.7, 1.5)
|
||||
@@ -688,15 +700,15 @@ class TwitterClient:
|
||||
"X-Csrf-Token": self._ct0,
|
||||
"X-Twitter-Active-User": "yes",
|
||||
"X-Twitter-Auth-Type": "OAuth2Session",
|
||||
"X-Twitter-Client-Language": "en",
|
||||
"X-Twitter-Client-Language": get_twitter_client_language(),
|
||||
"User-Agent": get_user_agent(),
|
||||
"Origin": "https://x.com",
|
||||
"Referer": "https://x.com",
|
||||
"Accept": "*/*",
|
||||
"Accept-Language": "en-US,en;q=0.9",
|
||||
"Accept-Language": get_accept_language(),
|
||||
"sec-ch-ua": get_sec_ch_ua(),
|
||||
"sec-ch-ua-mobile": SEC_CH_UA_MOBILE,
|
||||
"sec-ch-ua-platform": SEC_CH_UA_PLATFORM,
|
||||
"sec-ch-ua-platform": get_sec_ch_ua_platform(),
|
||||
"Sec-Fetch-Dest": "empty",
|
||||
"Sec-Fetch-Mode": "cors",
|
||||
"Sec-Fetch-Site": "same-origin",
|
||||
@@ -902,10 +914,10 @@ class TwitterClient:
|
||||
bio=legacy.get("description", ""),
|
||||
location=legacy.get("location", ""),
|
||||
url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
|
||||
followers_count=legacy.get("followers_count", 0),
|
||||
following_count=legacy.get("friends_count", 0),
|
||||
tweets_count=legacy.get("statuses_count", 0),
|
||||
likes_count=legacy.get("favourites_count", 0),
|
||||
followers_count=_parse_int(legacy.get("followers_count"), 0),
|
||||
following_count=_parse_int(legacy.get("friends_count"), 0),
|
||||
tweets_count=_parse_int(legacy.get("statuses_count"), 0),
|
||||
likes_count=_parse_int(legacy.get("favourites_count"), 0),
|
||||
verified=user_data.get("is_blue_verified", False) or legacy.get("verified", False),
|
||||
profile_image_url=legacy.get("profile_image_url_https", ""),
|
||||
created_at=legacy.get("created_at", ""),
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
"""Shared constants for twitter-cli."""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
BEARER_TOKEN = (
|
||||
"AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs"
|
||||
@@ -26,10 +28,16 @@ def sync_chrome_version(impersonate_target):
|
||||
|
||||
def get_user_agent():
|
||||
# type: () -> str
|
||||
if sys.platform == "darwin":
|
||||
platform = "Macintosh; Intel Mac OS X 10_15_7"
|
||||
elif sys.platform.startswith("win"):
|
||||
platform = "Windows NT 10.0; Win64; x64"
|
||||
else:
|
||||
platform = "X11; Linux x86_64"
|
||||
return (
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||
"Mozilla/5.0 (%s) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
"Chrome/%s.0.0.0 Safari/537.36" % _chrome_version
|
||||
"Chrome/%s.0.0.0 Safari/537.36" % (platform, _chrome_version)
|
||||
)
|
||||
|
||||
|
||||
@@ -40,9 +48,44 @@ def get_sec_ch_ua():
|
||||
)
|
||||
|
||||
|
||||
def _get_locale_tag():
|
||||
# type: () -> str
|
||||
raw = (
|
||||
os.environ.get("LC_ALL")
|
||||
or os.environ.get("LC_MESSAGES")
|
||||
or os.environ.get("LANG")
|
||||
or "en_US.UTF-8"
|
||||
)
|
||||
tag = raw.split(".", 1)[0].replace("_", "-")
|
||||
return tag or "en-US"
|
||||
|
||||
|
||||
def get_accept_language():
|
||||
# type: () -> str
|
||||
tag = _get_locale_tag()
|
||||
language = tag.split("-", 1)[0] or "en"
|
||||
if tag == language:
|
||||
return "%s,%s;q=0.9,en;q=0.8" % (tag, language)
|
||||
return "%s,%s;q=0.9,en;q=0.8" % (tag, language)
|
||||
|
||||
|
||||
def get_twitter_client_language():
|
||||
# type: () -> str
|
||||
return _get_locale_tag().split("-", 1)[0] or "en"
|
||||
|
||||
|
||||
def get_sec_ch_ua_platform():
|
||||
# type: () -> str
|
||||
if sys.platform == "darwin":
|
||||
return '"macOS"'
|
||||
if sys.platform.startswith("win"):
|
||||
return '"Windows"'
|
||||
return '"Linux"'
|
||||
|
||||
|
||||
# Static Client Hints
|
||||
SEC_CH_UA_MOBILE = "?0"
|
||||
SEC_CH_UA_PLATFORM = '"macOS"'
|
||||
SEC_CH_UA_PLATFORM = get_sec_ch_ua_platform()
|
||||
|
||||
# Legacy aliases — modules that import these get the default value.
|
||||
# _build_headers() should use get_user_agent() / get_sec_ch_ua() instead.
|
||||
|
||||
Reference in New Issue
Block a user