feat: add rate limiting, retry with backoff, and max count cap
- Add configurable request delay between paginated API calls (default 1.5s) - Add retry with exponential backoff on HTTP 429 and Twitter error code 88 - Add hard max count cap (default 200, absolute ceiling 500) - Add rateLimit config section with requestDelay, maxRetries, retryBaseDelay, maxCount - Add normalization tests for rateLimit config
This commit is contained in:
@@ -6,6 +6,7 @@ import json
|
||||
import logging
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
import ssl
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
@@ -201,13 +202,22 @@ def _resolve_query_id(operation_name, prefer_fallback=True):
|
||||
raise RuntimeError('Cannot resolve queryId for "%s"' % operation_name)
|
||||
|
||||
|
||||
# Hard ceiling to prevent accidental massive fetches
|
||||
_ABSOLUTE_MAX_COUNT = 500
|
||||
|
||||
|
||||
class TwitterClient:
|
||||
"""Twitter GraphQL API client using cookie authentication."""
|
||||
|
||||
def __init__(self, auth_token, ct0):
|
||||
# type: (str, str) -> None
|
||||
def __init__(self, auth_token, ct0, rate_limit_config=None):
|
||||
# type: (str, str, Optional[Dict[str, Any]]) -> None
|
||||
self._auth_token = auth_token
|
||||
self._ct0 = ct0
|
||||
rl = rate_limit_config or {}
|
||||
self._request_delay = float(rl.get("requestDelay", 1.5))
|
||||
self._max_retries = int(rl.get("maxRetries", 3))
|
||||
self._retry_base_delay = float(rl.get("retryBaseDelay", 5.0))
|
||||
self._max_count = min(int(rl.get("maxCount", 200)), _ABSOLUTE_MAX_COUNT)
|
||||
|
||||
def fetch_home_timeline(self, count=20):
|
||||
# type: (int) -> List[Tweet]
|
||||
@@ -308,6 +318,9 @@ class TwitterClient:
|
||||
if count <= 0:
|
||||
return []
|
||||
|
||||
# Enforce max count cap
|
||||
count = min(count, self._max_count)
|
||||
|
||||
tweets = [] # type: List[Tweet]
|
||||
seen_ids = set() # type: Set[str]
|
||||
cursor = None # type: Optional[str]
|
||||
@@ -339,6 +352,11 @@ class TwitterClient:
|
||||
break
|
||||
cursor = next_cursor
|
||||
|
||||
# Rate-limit: sleep between paginated requests
|
||||
if len(tweets) < count and self._request_delay > 0:
|
||||
logger.debug("Sleeping %.1fs between requests", self._request_delay)
|
||||
time.sleep(self._request_delay)
|
||||
|
||||
return tweets[:count]
|
||||
|
||||
def _graphql_get(self, operation_name, variables, features):
|
||||
@@ -379,31 +397,54 @@ class TwitterClient:
|
||||
|
||||
def _api_get(self, url):
|
||||
# type: (str) -> Dict[str, Any]
|
||||
"""Make authenticated GET request to Twitter API."""
|
||||
"""Make authenticated GET request to Twitter API with retry on 429."""
|
||||
headers = self._build_headers()
|
||||
request = urllib.request.Request(url)
|
||||
for key, value in headers.items():
|
||||
request.add_header(key, value)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(request, context=_create_ssl_context(), timeout=30) as response:
|
||||
payload = response.read().decode("utf-8")
|
||||
except urllib.error.HTTPError as exc:
|
||||
body = exc.read().decode("utf-8", errors="replace")
|
||||
message = "Twitter API error %d: %s" % (exc.code, body[:500])
|
||||
raise TwitterAPIError(exc.code, message)
|
||||
except urllib.error.URLError as exc:
|
||||
raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason)
|
||||
for attempt in range(self._max_retries + 1):
|
||||
request = urllib.request.Request(url)
|
||||
for key, value in headers.items():
|
||||
request.add_header(key, value)
|
||||
|
||||
try:
|
||||
parsed = json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
|
||||
try:
|
||||
with urllib.request.urlopen(request, context=_create_ssl_context(), timeout=30) as response:
|
||||
payload = response.read().decode("utf-8")
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code == 429 and attempt < self._max_retries:
|
||||
wait = self._retry_base_delay * (2 ** attempt)
|
||||
logger.warning(
|
||||
"Rate limited (429), retrying in %.1fs (attempt %d/%d)",
|
||||
wait, attempt + 1, self._max_retries,
|
||||
)
|
||||
time.sleep(wait)
|
||||
continue
|
||||
body = exc.read().decode("utf-8", errors="replace")
|
||||
message = "Twitter API error %d: %s" % (exc.code, body[:500])
|
||||
raise TwitterAPIError(exc.code, message)
|
||||
except urllib.error.URLError as exc:
|
||||
raise TwitterAPIError(0, "Twitter API network error: %s" % exc.reason)
|
||||
|
||||
if isinstance(parsed, dict) and parsed.get("errors"):
|
||||
message = parsed["errors"][0].get("message", "Unknown error")
|
||||
raise TwitterAPIError(0, "Twitter API returned errors: %s" % message)
|
||||
return parsed
|
||||
try:
|
||||
parsed = json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
raise TwitterAPIError(0, "Twitter API returned invalid JSON")
|
||||
|
||||
if isinstance(parsed, dict) and parsed.get("errors"):
|
||||
err_msg = parsed["errors"][0].get("message", "Unknown error")
|
||||
# Rate limit can also surface as a JSON error (code 88)
|
||||
err_code = parsed["errors"][0].get("code", 0)
|
||||
if err_code == 88 and attempt < self._max_retries:
|
||||
wait = self._retry_base_delay * (2 ** attempt)
|
||||
logger.warning(
|
||||
"Rate limited (code 88), retrying in %.1fs (attempt %d/%d)",
|
||||
wait, attempt + 1, self._max_retries,
|
||||
)
|
||||
time.sleep(wait)
|
||||
continue
|
||||
raise TwitterAPIError(0, "Twitter API returned errors: %s" % err_msg)
|
||||
return parsed
|
||||
|
||||
# Should not be reached, but just in case
|
||||
raise TwitterAPIError(429, "Rate limited after %d retries" % self._max_retries)
|
||||
|
||||
def _parse_timeline_response(self, data, get_instructions):
|
||||
# type: (Any, Callable[[Any], Any]) -> Tuple[List[Tweet], Optional[str]]
|
||||
|
||||
Reference in New Issue
Block a user