feat: anti-detection hardening with curl_cffi TLS impersonation and request jitter

This commit is contained in:
jackwener
2026-03-09 17:11:59 +08:00
parent 3e13ed3749
commit 27d73efee5
6 changed files with 369 additions and 88 deletions

View File

@@ -5,27 +5,25 @@ from __future__ import annotations
import json
import logging
import math
import random
import re
import time
import ssl
import urllib.error
import urllib.parse
import urllib.request
import bs4
import requests as _requests_lib
from curl_cffi import requests as _cffi_requests
from x_client_transaction import ClientTransaction
from x_client_transaction.utils import generate_headers as _gen_ct_headers, get_ondemand_file_url
from .constants import BEARER_TOKEN, USER_AGENT, SEC_CH_UA, SEC_CH_UA_MOBILE, SEC_CH_UA_PLATFORM
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
try:
import bs4
import requests as _requests_lib
from x_client_transaction import ClientTransaction
from x_client_transaction.utils import generate_headers as _gen_ct_headers, get_ondemand_file_url
_HAS_XCLIENT = True
except ImportError: # pragma: no cover
_HAS_XCLIENT = False
logger = logging.getLogger(__name__)
# Shared curl_cffi session — impersonates Chrome 133 TLS/JA3/HTTP2 fingerprint
_cffi_session = None # type: Optional[Any] # lazy init
FALLBACK_QUERY_IDS = {
# Read operations
@@ -110,25 +108,22 @@ class TwitterAPIError(RuntimeError):
self.status_code = status_code
# Reuse a single SSL context across all requests (avoids re-reading CA certs)
_SSL_CTX = ssl.create_default_context()
def _create_ssl_context():
# type: () -> ssl.SSLContext
"""Return shared SSL context."""
return _SSL_CTX
def _get_cffi_session():
# type: () -> Any
"""Return shared curl_cffi session with Chrome impersonation."""
global _cffi_session
if _cffi_session is None:
_cffi_session = _cffi_requests.Session(impersonate="chrome133")
return _cffi_session
def _url_fetch(url, headers=None):
# type: (str, Optional[Dict[str, str]]) -> str
"""Simple URL fetch for metadata/bootstrap lookups."""
req = urllib.request.Request(url)
if headers:
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req, context=_create_ssl_context(), timeout=30) as response:
return response.read().decode("utf-8")
"""URL fetch using curl_cffi for proper TLS fingerprint."""
session = _get_cffi_session()
resp = session.get(url, headers=headers or {}, timeout=30)
resp.raise_for_status()
return resp.text
def _build_graphql_url(query_id, operation_name, variables, features, field_toggles=None):
@@ -245,12 +240,14 @@ class TwitterClient:
self._auth_token = auth_token
self._ct0 = ct0
rl = rate_limit_config or {}
self._request_delay = float(rl.get("requestDelay", 1.5))
self._request_delay = float(rl.get("requestDelay", 2.5))
self._max_retries = int(rl.get("maxRetries", 3))
self._retry_base_delay = float(rl.get("retryBaseDelay", 5.0))
self._max_count = min(int(rl.get("maxCount", 200)), _ABSOLUTE_MAX_COUNT)
self._client_transaction = None # type: Optional[Any] # lazy init
self._client_transaction = None # type: Optional[Any]
self._ct_init_attempted = False
# Eagerly initialize ClientTransaction on construction
self._ensure_client_transaction()
def fetch_home_timeline(self, count=20):
# type: (int) -> List[Tweet]
@@ -549,10 +546,11 @@ class TwitterClient:
break
cursor = next_cursor
# Rate-limit: sleep between paginated requests
# Rate-limit: sleep between paginated requests with jitter
if len(tweets) < count and self._request_delay > 0:
logger.debug("Sleeping %.1fs between requests", self._request_delay)
time.sleep(self._request_delay)
jitter = self._request_delay * random.uniform(0.7, 1.5)
logger.debug("Sleeping %.1fs between requests", jitter)
time.sleep(jitter)
return tweets[:count]
@@ -577,8 +575,8 @@ class TwitterClient:
def _ensure_client_transaction(self):
# type: () -> None
"""Lazily initialize ClientTransaction for x-client-transaction-id header."""
if self._ct_init_attempted or not _HAS_XCLIENT:
"""Initialize ClientTransaction for x-client-transaction-id header."""
if self._ct_init_attempted:
return
self._ct_init_attempted = True
try:
@@ -607,6 +605,7 @@ class TwitterClient:
"X-Twitter-Auth-Type": "OAuth2Session",
"X-Twitter-Client-Language": "en",
"User-Agent": USER_AGENT,
"Origin": "https://x.com",
"Referer": "https://x.com",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.9",
@@ -664,38 +663,44 @@ class TwitterClient:
# type: (str, str, Optional[Dict[str, Any]]) -> Dict[str, Any]
"""Make authenticated request to Twitter API with retry on rate limits.
Uses curl_cffi for Chrome TLS/JA3/HTTP2 fingerprint impersonation.
Handles both GET and POST. Retries on HTTP 429 and JSON error code 88.
"""
self._ensure_client_transaction()
headers = self._build_headers(url=url, method=method)
encoded_body = json.dumps(body).encode("utf-8") if body else None
session = _get_cffi_session()
json_body = body # curl_cffi handles JSON serialization
for attempt in range(self._max_retries + 1):
request = urllib.request.Request(url, data=encoded_body, method=method)
for key, value in headers.items():
request.add_header(key, value)
try:
with urllib.request.urlopen(request, context=_create_ssl_context(), timeout=30) as response:
payload = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
if exc.code == 429 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt)
if method == "POST":
response = session.post(
url, headers=headers, json=json_body, timeout=30,
)
else:
response = session.get(url, headers=headers, timeout=30)
status_code = response.status_code
if status_code == 429 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (429), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
)
time.sleep(wait)
continue
resp_body = exc.read().decode("utf-8", errors="replace")
message = "Twitter API error %d: %s" % (exc.code, resp_body[:500])
raise TwitterAPIError(exc.code, message)
except urllib.error.URLError as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason)
if status_code >= 400:
message = "Twitter API error %d: %s" % (status_code, response.text[:500])
raise TwitterAPIError(status_code, message)
payload = response.text
except TwitterAPIError:
raise
except Exception as exc:
raise TwitterAPIError(0, "Twitter API network error: %s" % exc)
try:
parsed = json.loads(payload)
except json.JSONDecodeError:
except (json.JSONDecodeError, ValueError):
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
if isinstance(parsed, dict) and parsed.get("errors"):
@@ -703,7 +708,7 @@ class TwitterClient:
# Rate limit can also surface as a JSON error (code 88)
err_code = parsed["errors"][0].get("code", 0)
if err_code == 88 and attempt < self._max_retries:
wait = self._retry_base_delay * (2 ** attempt)
wait = self._retry_base_delay * (2 ** attempt) + random.uniform(0, 2)
logger.warning(
"Rate limited (code 88), retrying in %.1fs (attempt %d/%d)",
wait, attempt + 1, self._max_retries,
@@ -773,7 +778,7 @@ class TwitterClient:
cursor = next_cursor
if len(users) < count and self._request_delay > 0:
time.sleep(self._request_delay)
time.sleep(self._request_delay * random.uniform(0.7, 1.5))
return users[:count]