fix: harden auth flow and sync browser support docs

This commit is contained in:
jackwener
2026-03-10 11:02:32 +08:00
parent db7d7e8874
commit 19ab11d6a4
8 changed files with 572 additions and 170 deletions

View File

@@ -36,6 +36,12 @@ def load_from_env() -> Optional[Dict[str, str]]:
ct0 = os.environ.get("TWITTER_CT0", "")
if auth_token and ct0:
return {"auth_token": auth_token, "ct0": ct0}
if auth_token or ct0:
logger.debug(
"Environment cookies incomplete: auth_token=%s ct0=%s",
bool(auth_token),
bool(ct0),
)
return None
@@ -68,8 +74,15 @@ def verify_cookies(auth_token, ct0, cookie_string=None):
# Reuse the shared curl_cffi session for consistent TLS fingerprint
session = _get_cffi_session()
attempts = []
logger.debug(
"Verifying Twitter cookies with %s cookie header",
"full forwarded" if cookie_string else "minimal",
)
for url in urls:
endpoint = url.split("/")[-1]
try:
resp = session.get(url, headers=headers, timeout=5)
if resp.status_code in (401, 403):
@@ -78,28 +91,37 @@ def verify_cookies(auth_token, ct0, cookie_string=None):
)
if resp.status_code == 200:
data = resp.json()
attempts.append("%s=200" % endpoint)
logger.debug("Cookie verification succeeded via %s", endpoint)
return {"screen_name": data.get("screen_name", "")}
attempts.append("%s=%d" % (endpoint, resp.status_code))
logger.debug("Verification endpoint %s returned HTTP %d, trying next...", url, resp.status_code)
continue
except RuntimeError:
raise
except Exception as e:
attempts.append("%s=%s" % (endpoint, type(e).__name__))
logger.debug("Verification endpoint %s failed: %s", url, e)
continue
# All endpoints failed with non-auth errors — proceed without verification
logger.info("Cookie verification skipped (no working endpoint), will verify on first API call")
logger.info(
"Cookie verification skipped (attempts: %s), will verify on first API call",
", ".join(attempts) if attempts else "none",
)
return {}
def _extract_cookies_from_jar(jar):
# type: (Any) -> Optional[Dict[str, str]]
def _extract_cookies_from_jar(jar, source="unknown"):
# type: (Any, str) -> Optional[Dict[str, str]]
"""Extract Twitter cookies from a cookie jar."""
result = {} # type: Dict[str, str]
all_cookies = {} # type: Dict[str, str]
twitter_cookie_count = 0
for cookie in jar:
domain = cookie.domain or ""
if _is_twitter_domain(domain):
twitter_cookie_count += 1
if cookie.name == "auth_token":
result["auth_token"] = cookie.value
elif cookie.name == "ct0":
@@ -112,6 +134,13 @@ def _extract_cookies_from_jar(jar):
cookies["cookie_string"] = "; ".join("%s=%s" % (k, v) for k, v in all_cookies.items())
logger.info("Extracted %d total cookies for full browser fingerprint", len(all_cookies))
return cookies
logger.debug(
"Cookie jar %s did not contain usable Twitter auth cookies (twitter_cookies=%d, auth_token=%s, ct0=%s)",
source,
twitter_cookie_count,
"auth_token" in result,
"ct0" in result,
)
return None
@@ -136,17 +165,22 @@ def _extract_in_process():
("firefox", browser_cookie3.firefox),
("brave", browser_cookie3.brave),
]
attempts = []
for name, fn in browsers:
try:
jar = fn()
except Exception as e:
logger.debug("%s in-process extraction failed: %s", name, e)
attempts.append("%s=%s" % (name, type(e).__name__))
continue
cookies = _extract_cookies_from_jar(jar)
cookies = _extract_cookies_from_jar(jar, source="%s(in-process)" % name)
if cookies:
logger.info("Found cookies in %s (in-process)", name)
return cookies
attempts.append("%s=no-cookies" % name)
if attempts:
logger.debug("In-process extraction attempts: %s", ", ".join(attempts))
return None
@@ -168,11 +202,13 @@ browsers = [
("firefox", browser_cookie3.firefox),
("brave", browser_cookie3.brave),
]
attempts = []
for name, fn in browsers:
try:
jar = fn()
except Exception:
except Exception as exc:
attempts.append(f"{name}={type(exc).__name__}")
continue
result = {}
all_cookies = {}
@@ -190,8 +226,14 @@ for name, fn in browsers:
result["all_cookies"] = all_cookies
print(json.dumps(result))
sys.exit(0)
attempts.append(
f"{name}=no-cookies(auth_token={'auth_token' in result},ct0={'ct0' in result})"
)
print(json.dumps({"error": "No Twitter cookies found in any browser. Make sure you are logged into x.com."}))
print(json.dumps({
"error": "No Twitter cookies found in any browser. Make sure you are logged into x.com.",
"attempts": attempts,
}))
sys.exit(1)
'''
@@ -221,6 +263,9 @@ sys.exit(1)
data = json.loads(output)
if "error" in data:
attempts = data.get("attempts") or []
if attempts:
logger.debug("Subprocess extraction attempts: %s", ", ".join(str(item) for item in attempts))
return None
logger.info("Found cookies in %s (subprocess)", data.get("browser", "unknown"))
@@ -232,7 +277,17 @@ sys.exit(1)
cookies["cookie_string"] = cookie_str
logger.info("Extracted %d total cookies for full browser fingerprint", len(all_cookies))
return cookies
except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, FileNotFoundError):
except subprocess.TimeoutExpired:
logger.debug("Cookie extraction subprocess timed out")
return None
except json.JSONDecodeError as exc:
logger.debug("Cookie extraction subprocess returned invalid JSON: %s", exc)
return None
except KeyError as exc:
logger.debug("Cookie extraction subprocess returned incomplete payload: %s", exc)
return None
except FileNotFoundError as exc:
logger.debug("Cookie extraction subprocess launcher missing: %s", exc)
return None
@@ -250,11 +305,14 @@ def extract_from_browser() -> Optional[Dict[str, str]]:
# 2. Subprocess fallback (handles SQLite lock, but fails on macOS Keychain)
logger.debug("In-process extraction failed, trying subprocess fallback")
return _extract_via_subprocess()
cookies = _extract_via_subprocess()
if not cookies:
logger.warning("Twitter cookie extraction failed in both in-process and subprocess modes")
return cookies
def get_cookies() -> Dict[str, str]:
"""Get Twitter cookies. Priority: env vars -> cache file -> browser extraction.
"""Get Twitter cookies. Priority: env vars -> browser extraction.
Raises RuntimeError if no cookies found.
"""
@@ -265,17 +323,10 @@ def get_cookies() -> Dict[str, str]:
if cookies:
logger.info("Loaded cookies from environment variables")
# 2. Try cached cookies (file cache with TTL)
if not cookies:
cookies = _load_cookie_cache()
if cookies:
logger.info("Loaded cookies from cache")
# 3. Try browser extraction (auto-detect)
# 2. Try browser extraction (auto-detect)
if not cookies:
logger.debug("Attempting browser cookie extraction")
cookies = extract_from_browser()
if cookies:
_save_cookie_cache(cookies)
if not cookies:
raise RuntimeError(
@@ -288,66 +339,12 @@ def get_cookies() -> Dict[str, str]:
try:
verify_cookies(cookies["auth_token"], cookies["ct0"], cookies.get("cookie_string"))
except RuntimeError:
# Auth failure — invalidate cache and re-extract from browser
logger.info("Cookie verification failed, invalidating cache and re-extracting")
invalidate_cookie_cache()
# Auth failure — re-extract from browser and retry verification
logger.info("Cookie verification failed, re-extracting from browser")
fresh_cookies = extract_from_browser()
if fresh_cookies:
_save_cookie_cache(fresh_cookies)
# Verify fresh cookies — if this also fails, let it raise
verify_cookies(fresh_cookies["auth_token"], fresh_cookies["ct0"], fresh_cookies.get("cookie_string"))
return fresh_cookies
raise
return cookies
# ── Cookie file cache ───────────────────────────────────────────────────
_CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "twitter-cli")
_CACHE_FILE = os.path.join(_CACHE_DIR, "cookies.json")
_CACHE_TTL_SECONDS = 24 * 3600 # 24 hours
def _load_cookie_cache():
# type: () -> Optional[Dict[str, str]]
"""Load cookies from file cache if within TTL."""
try:
if not os.path.exists(_CACHE_FILE):
return None
import time as _time
mtime = os.path.getmtime(_CACHE_FILE)
if _time.time() - mtime > _CACHE_TTL_SECONDS:
logger.debug("Cookie cache expired (>%ds)", _CACHE_TTL_SECONDS)
return None
with open(_CACHE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict) and "auth_token" in data and "ct0" in data:
return data
except Exception as exc:
logger.debug("Failed to load cookie cache: %s", exc)
return None
def _save_cookie_cache(cookies):
# type: (Dict[str, str]) -> None
"""Save cookies to file cache."""
try:
os.makedirs(_CACHE_DIR, exist_ok=True)
with open(_CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(cookies, f, ensure_ascii=False)
# Restrict permissions — cookies are sensitive
os.chmod(_CACHE_FILE, 0o600)
logger.info("Saved cookies to cache (%s)", _CACHE_FILE)
except Exception as exc:
logger.debug("Failed to save cookie cache: %s", exc)
def invalidate_cookie_cache():
# type: () -> None
"""Delete the cookie cache file."""
try:
if os.path.exists(_CACHE_FILE):
os.remove(_CACHE_FILE)
logger.info("Cookie cache invalidated")
except Exception as exc:
logger.debug("Failed to invalidate cookie cache: %s", exc)