fix: tighten pagination and platform-specific auth
This commit is contained in:
@@ -243,6 +243,36 @@ def test_iter_chrome_cookie_files_env_override(monkeypatch, tmp_path) -> None:
|
||||
assert "Profile 5" in paths[0]
|
||||
|
||||
|
||||
def test_iter_chrome_cookie_files_edge_linux_uses_microsoft_edge_path(monkeypatch, tmp_path) -> None:
|
||||
monkeypatch.setattr(auth.sys, "platform", "linux")
|
||||
edge_dir = tmp_path / ".config" / "microsoft-edge"
|
||||
(edge_dir / "Default").mkdir(parents=True)
|
||||
(edge_dir / "Default" / "Cookies").touch()
|
||||
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.delenv("TWITTER_CHROME_PROFILE", raising=False)
|
||||
|
||||
paths = auth._iter_chrome_cookie_files("edge")
|
||||
|
||||
assert len(paths) == 1
|
||||
assert paths[0].endswith(".config/microsoft-edge/Default/Cookies")
|
||||
|
||||
|
||||
def test_iter_chrome_cookie_files_edge_windows_uses_user_data(monkeypatch, tmp_path) -> None:
|
||||
monkeypatch.setattr(auth.sys, "platform", "win32")
|
||||
monkeypatch.setenv("LOCALAPPDATA", str(tmp_path))
|
||||
monkeypatch.delenv("TWITTER_CHROME_PROFILE", raising=False)
|
||||
|
||||
edge_dir = tmp_path / "Microsoft" / "Edge" / "User Data" / "Default"
|
||||
edge_dir.mkdir(parents=True)
|
||||
(edge_dir / "Cookies").touch()
|
||||
|
||||
paths = auth._iter_chrome_cookie_files("edge")
|
||||
|
||||
assert len(paths) == 1
|
||||
assert "Microsoft/Edge/User Data/Default/Cookies".replace("/", os.sep) in paths[0]
|
||||
|
||||
|
||||
def test_extract_in_process_tries_multiple_profiles(monkeypatch, tmp_path) -> None:
|
||||
"""When Default has no Twitter cookies but Profile 1 does, it should find them."""
|
||||
|
||||
@@ -366,4 +396,3 @@ def test_extract_in_process_returns_diagnostics_on_failure(monkeypatch) -> None:
|
||||
|
||||
assert cookies is None
|
||||
assert any("cookie decryption" in d for d in diagnostics)
|
||||
|
||||
|
||||
@@ -20,6 +20,18 @@ def test_cli_user_command_works_with_client_factory(monkeypatch) -> None:
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_get_client_for_output_does_not_swallow_real_type_error(monkeypatch) -> None:
|
||||
def _broken_get_client(config=None, quiet=False):
|
||||
raise TypeError("real bug")
|
||||
|
||||
monkeypatch.setattr("twitter_cli.cli._get_client", _broken_get_client)
|
||||
|
||||
with pytest.raises(TypeError, match="real bug"):
|
||||
from twitter_cli.cli import _get_client_for_output
|
||||
|
||||
_get_client_for_output({})
|
||||
|
||||
|
||||
def test_cli_feed_json_input_path(tmp_path, tweet_factory) -> None:
|
||||
json_path = tmp_path / "tweets.json"
|
||||
json_path.write_text(tweets_to_json([tweet_factory("1")]), encoding="utf-8")
|
||||
|
||||
@@ -276,6 +276,8 @@ class TestBuildHeaders:
|
||||
assert "sec-ch-ua" in headers
|
||||
|
||||
@patch("twitter_cli.client.get_sec_ch_ua_platform", return_value='"Linux"')
|
||||
@patch("twitter_cli.client.get_sec_ch_ua_platform_version", return_value='""')
|
||||
@patch("twitter_cli.client.get_sec_ch_ua_arch", return_value='"x86"')
|
||||
@patch("twitter_cli.client.get_accept_language", return_value="zh-CN,zh;q=0.9,en;q=0.8")
|
||||
@patch("twitter_cli.client.get_twitter_client_language", return_value="zh")
|
||||
@patch("twitter_cli.client._get_cffi_session")
|
||||
@@ -286,6 +288,8 @@ class TestBuildHeaders:
|
||||
mock_session,
|
||||
mock_client_language,
|
||||
mock_accept_language,
|
||||
mock_arch,
|
||||
mock_platform_version,
|
||||
mock_platform,
|
||||
):
|
||||
mock_session.return_value = MagicMock()
|
||||
@@ -307,6 +311,8 @@ class TestBuildHeaders:
|
||||
assert headers["X-Twitter-Client-Language"] == "zh"
|
||||
assert headers["Accept-Language"] == "zh-CN,zh;q=0.9,en;q=0.8"
|
||||
assert headers["sec-ch-ua-platform"] == '"Linux"'
|
||||
assert headers["sec-ch-ua-arch"] == '"x86"'
|
||||
assert headers["sec-ch-ua-platform-version"] == '""'
|
||||
|
||||
|
||||
class TestPaginationBehavior:
|
||||
@@ -356,6 +362,49 @@ class TestPaginationBehavior:
|
||||
assert tweets == []
|
||||
assert calls == [None, "cursor-same"]
|
||||
|
||||
def test_user_list_continues_when_cursor_advances_without_new_users(self):
|
||||
client = TwitterClient.__new__(TwitterClient)
|
||||
client._request_delay = 0.0
|
||||
client._max_count = 200
|
||||
|
||||
responses = iter(
|
||||
[
|
||||
{"page": 1},
|
||||
{"page": 2},
|
||||
]
|
||||
)
|
||||
|
||||
def _graphql_get(operation_name, variables, features):
|
||||
return next(responses)
|
||||
|
||||
def _parse_user_result(data):
|
||||
return MagicMock(id=data["id"], screen_name=data["screen_name"])
|
||||
|
||||
def _get_instructions(data):
|
||||
if data["page"] == 1:
|
||||
return [
|
||||
{"entries": [{"content": {"entryType": "TimelineTimelineCursor", "cursorType": "Bottom", "value": "cursor-2"}}]}
|
||||
]
|
||||
return [
|
||||
{
|
||||
"entries": [
|
||||
{
|
||||
"content": {
|
||||
"entryType": "TimelineTimelineItem",
|
||||
"itemContent": {"user_results": {"result": {"id": "user-1", "screen_name": "alice"}}},
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
client._graphql_get = _graphql_get
|
||||
client._parse_user_result = _parse_user_result
|
||||
|
||||
users = client._fetch_user_list("Followers", "1", 1, _get_instructions)
|
||||
|
||||
assert [user.screen_name for user in users] == ["alice"]
|
||||
|
||||
|
||||
# ── TwitterClient._parse_tweet_result ─────────────────────────────────────
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def _extract_cookies_from_jar(jar: Any, source: str = "unknown") -> Optional[Dic
|
||||
_CHROMIUM_BASE_DIRS: Dict[str, str] = {
|
||||
"chrome": os.path.join("Google", "Chrome"),
|
||||
"arc": os.path.join("Arc", "User Data"),
|
||||
"edge": os.path.join("Microsoft Edge"),
|
||||
"edge": "Microsoft Edge",
|
||||
"brave": os.path.join("BraveSoftware", "Brave-Browser"),
|
||||
}
|
||||
|
||||
@@ -207,7 +207,13 @@ def _iter_chrome_cookie_files(browser_name: str) -> List[str]:
|
||||
if sys.platform == "darwin":
|
||||
root = os.path.join(os.path.expanduser("~"), "Library", "Application Support", base_dir)
|
||||
elif sys.platform == "win32":
|
||||
root = os.path.join(os.environ.get("LOCALAPPDATA", ""), base_dir, "User Data")
|
||||
if browser_name == "edge":
|
||||
root = os.path.join(os.environ.get("LOCALAPPDATA", ""), "Microsoft", "Edge", "User Data")
|
||||
else:
|
||||
root = os.path.join(os.environ.get("LOCALAPPDATA", ""), base_dir)
|
||||
else:
|
||||
if browser_name == "edge":
|
||||
root = os.path.join(os.path.expanduser("~"), ".config", "microsoft-edge")
|
||||
else:
|
||||
root = os.path.join(os.path.expanduser("~"), ".config", base_dir)
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from __future__ import annotations
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import inspect
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
@@ -148,8 +149,12 @@ def _get_client_for_output(config=None, quiet=False):
|
||||
# type: (Optional[Dict[str, Any]], bool) -> TwitterClient
|
||||
"""Call _get_client while staying compatible with monkeypatched legacy signatures."""
|
||||
try:
|
||||
signature = inspect.signature(_get_client)
|
||||
except (TypeError, ValueError):
|
||||
signature = None
|
||||
|
||||
if signature and "quiet" in signature.parameters:
|
||||
return _get_client(config, quiet=quiet)
|
||||
except TypeError:
|
||||
return _get_client(config)
|
||||
|
||||
|
||||
|
||||
@@ -18,16 +18,16 @@ from x_client_transaction.utils import generate_headers as _gen_ct_headers, get_
|
||||
|
||||
from .constants import (
|
||||
BEARER_TOKEN,
|
||||
SEC_CH_UA_ARCH,
|
||||
SEC_CH_UA_BITNESS,
|
||||
SEC_CH_UA_MOBILE,
|
||||
SEC_CH_UA_MODEL,
|
||||
SEC_CH_UA_PLATFORM_VERSION,
|
||||
get_accept_language,
|
||||
get_sec_ch_ua_arch,
|
||||
get_sec_ch_ua,
|
||||
get_sec_ch_ua_full_version,
|
||||
get_sec_ch_ua_full_version_list,
|
||||
get_sec_ch_ua_platform,
|
||||
get_sec_ch_ua_platform_version,
|
||||
get_twitter_client_language,
|
||||
get_user_agent,
|
||||
sync_chrome_version,
|
||||
@@ -645,10 +645,16 @@ class TwitterClient:
|
||||
seen_ids.add(user.id)
|
||||
users.append(user)
|
||||
|
||||
if not next_cursor or not new_users:
|
||||
if not next_cursor:
|
||||
break
|
||||
if next_cursor == cursor:
|
||||
logger.debug("User list pagination stopped because cursor did not advance: %s", next_cursor)
|
||||
break
|
||||
cursor = next_cursor
|
||||
|
||||
if not new_users:
|
||||
logger.debug("User list page returned no users but exposed next cursor; continuing pagination")
|
||||
|
||||
if len(users) < count and self._request_delay > 0:
|
||||
time.sleep(self._request_delay * random.uniform(0.7, 1.5))
|
||||
|
||||
@@ -903,12 +909,12 @@ class TwitterClient:
|
||||
"sec-ch-ua": get_sec_ch_ua(),
|
||||
"sec-ch-ua-mobile": SEC_CH_UA_MOBILE,
|
||||
"sec-ch-ua-platform": get_sec_ch_ua_platform(),
|
||||
"sec-ch-ua-arch": SEC_CH_UA_ARCH,
|
||||
"sec-ch-ua-arch": get_sec_ch_ua_arch(),
|
||||
"sec-ch-ua-bitness": SEC_CH_UA_BITNESS,
|
||||
"sec-ch-ua-full-version": get_sec_ch_ua_full_version(),
|
||||
"sec-ch-ua-full-version-list": get_sec_ch_ua_full_version_list(),
|
||||
"sec-ch-ua-model": SEC_CH_UA_MODEL,
|
||||
"sec-ch-ua-platform-version": SEC_CH_UA_PLATFORM_VERSION,
|
||||
"sec-ch-ua-platform-version": get_sec_ch_ua_platform_version(),
|
||||
"Sec-Fetch-Dest": "empty",
|
||||
"Sec-Fetch-Mode": "cors",
|
||||
"Sec-Fetch-Site": "same-origin",
|
||||
|
||||
@@ -95,13 +95,32 @@ def get_sec_ch_ua_platform():
|
||||
return '"Linux"'
|
||||
|
||||
|
||||
def get_sec_ch_ua_arch():
|
||||
# type: () -> str
|
||||
machine = (os.uname().machine if hasattr(os, "uname") else "").lower()
|
||||
if "arm" in machine or "aarch" in machine:
|
||||
return '"arm"'
|
||||
if "86" in machine or "amd64" in machine or "x64" in machine:
|
||||
return '"x86"'
|
||||
return '""'
|
||||
|
||||
|
||||
def get_sec_ch_ua_platform_version():
|
||||
# type: () -> str
|
||||
if sys.platform == "darwin":
|
||||
return '"15.0.0"'
|
||||
if sys.platform.startswith("win"):
|
||||
return '"10.0.0"'
|
||||
return '""'
|
||||
|
||||
|
||||
# Static Client Hints
|
||||
SEC_CH_UA_MOBILE = "?0"
|
||||
SEC_CH_UA_PLATFORM = get_sec_ch_ua_platform()
|
||||
SEC_CH_UA_ARCH = '"arm"' if sys.platform == "darwin" else '"x86"'
|
||||
SEC_CH_UA_ARCH = get_sec_ch_ua_arch()
|
||||
SEC_CH_UA_BITNESS = '"64"'
|
||||
SEC_CH_UA_MODEL = '""'
|
||||
SEC_CH_UA_PLATFORM_VERSION = '"15.0.0"' if sys.platform == "darwin" else '"10.0.0"'
|
||||
SEC_CH_UA_PLATFORM_VERSION = get_sec_ch_ua_platform_version()
|
||||
|
||||
# Legacy aliases — modules that import these get the default value.
|
||||
# _build_headers() should use get_user_agent() / get_sec_ch_ua() instead.
|
||||
|
||||
Reference in New Issue
Block a user