refactor: fix remaining code review issues on kabi-use

- _get_client: remove useless try/except that re-raised same error
- verify_cookies: increase timeout from 3s to 5s
- fetch_user: use _deep_get for URL extraction (consistent with
  _parse_user_result)
- formatter: remove no-op tweets_to_json wrapper and unused import
- _as_int/_as_float: filter.py now imports from config.py (dedup)
- CLI read commands: extract _fetch_and_display() to dedup
  favorite/search/likes/list_timeline
- _write_action: move load_config inside try block
- auth.py: add PEP 8 blank line after logger
This commit is contained in:
jackwener
2026-03-07 21:49:12 +08:00
parent df39a15d00
commit 625181b76c
5 changed files with 57 additions and 129 deletions

View File

@@ -21,6 +21,7 @@ from .constants import BEARER_TOKEN, USER_AGENT
logger = logging.getLogger(__name__)
def load_from_env() -> Optional[Dict[str, str]]:
"""Load cookies from environment variables."""
auth_token = os.environ.get("TWITTER_AUTH_TOKEN", "")
@@ -59,7 +60,7 @@ def verify_cookies(auth_token, ct0):
ctx = ssl.create_default_context()
try:
with urllib.request.urlopen(req, context=ctx, timeout=3) as resp:
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
data = json.loads(resp.read().decode("utf-8"))
return {"screen_name": data.get("screen_name", "")}
except urllib.error.HTTPError as e:

View File

@@ -78,10 +78,7 @@ def _get_client(config=None):
# type: (Optional[Dict[str, Any]]) -> TwitterClient
"""Create an authenticated API client."""
console.print("\n🔐 Getting Twitter cookies...")
try:
cookies = get_cookies()
except RuntimeError as exc:
raise RuntimeError(str(exc))
cookies = get_cookies()
rate_limit_config = (config or {}).get("rateLimit")
return TwitterClient(cookies["auth_token"], cookies["ct0"], rate_limit_config)
@@ -118,6 +115,35 @@ def cli(verbose):
_setup_logging(verbose)
def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, do_filter):
# type: (Any, str, str, Optional[int], bool, Optional[str], bool) -> None
"""Common fetch-filter-display logic for timeline-like commands."""
config = load_config()
try:
fetch_count = _resolve_fetch_count(max_count, config.get("fetch", {}).get("count", 50))
console.print("%s Fetching %s (%d tweets)...\n" % (emoji, label, fetch_count))
start = time.time()
tweets = fetch_fn(fetch_count)
elapsed = time.time() - start
console.print("✅ Fetched %d %s in %.1fs\n" % (len(tweets), label, elapsed))
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
filtered = _apply_filter(tweets, do_filter, config)
if output_file:
Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8")
console.print("💾 Saved to %s\n" % output_file)
if as_json:
click.echo(tweets_to_json(filtered))
return
print_tweet_table(filtered, console, title="%s %s%d tweets" % (emoji, label, len(filtered)))
console.print()
@cli.command()
@click.option(
"--type",
@@ -182,30 +208,11 @@ def favorite(max_count, as_json, output_file, do_filter):
# type: (Optional[int], bool, Optional[str], bool) -> None
"""Fetch bookmarked (favorite) tweets."""
config = load_config()
try:
fetch_count = _resolve_fetch_count(max_count, config.get("fetch", {}).get("count", 50))
client = _get_client(config)
console.print("🔖 Fetching favorites (%d tweets)...\n" % fetch_count)
start = time.time()
tweets = client.fetch_bookmarks(fetch_count)
elapsed = time.time() - start
console.print("✅ Fetched %d favorites in %.1fs\n" % (len(tweets), elapsed))
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
filtered = _apply_filter(tweets, do_filter, config)
if output_file:
Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8")
console.print("💾 Saved to %s\n" % output_file)
if as_json:
click.echo(tweets_to_json(filtered))
return
print_tweet_table(filtered, console, title="🔖 Favorites — %d tweets" % len(filtered))
console.print()
client = _get_client(config)
_fetch_and_display(
lambda count: client.fetch_bookmarks(count),
"favorites", "🔖", max_count, as_json, output_file, do_filter,
)
@cli.command()
@@ -278,26 +285,11 @@ def search(query, product, max_count, as_json, do_filter):
# type: (str, str, int, bool, bool) -> None
"""Search tweets by QUERY string."""
config = load_config()
try:
fetch_count = _resolve_fetch_count(max_count, 20)
client = _get_client(config)
console.print("🔍 Searching '%s' (%s, %d tweets)...\n" % (query, product, fetch_count))
start = time.time()
tweets = client.fetch_search(query, fetch_count, product)
elapsed = time.time() - start
console.print("✅ Found %d tweets in %.1fs\n" % (len(tweets), elapsed))
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
filtered = _apply_filter(tweets, do_filter, config)
if as_json:
click.echo(tweets_to_json(filtered))
return
print_tweet_table(filtered, console, title="🔍 '%s'%d tweets" % (query, len(filtered)))
console.print()
client = _get_client(config)
_fetch_and_display(
lambda count: client.fetch_search(query, count, product),
"'%s' (%s)" % (query, product), "🔍", max_count, as_json, None, do_filter,
)
@cli.command()
@@ -310,28 +302,13 @@ def likes(screen_name, max_count, as_json, do_filter):
"""Show tweets liked by a user. SCREEN_NAME is the @handle (without @)."""
screen_name = screen_name.lstrip("@")
config = load_config()
try:
fetch_count = _resolve_fetch_count(max_count, 20)
client = _get_client(config)
console.print("👤 Fetching @%s's profile..." % screen_name)
profile = client.fetch_user(screen_name)
console.print("❤️ Fetching likes (%d)...\n" % fetch_count)
start = time.time()
tweets = client.fetch_user_likes(profile.id, fetch_count)
elapsed = time.time() - start
console.print("✅ Fetched %d liked tweets in %.1fs\n" % (len(tweets), elapsed))
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
filtered = _apply_filter(tweets, do_filter, config)
if as_json:
click.echo(tweets_to_json(filtered))
return
print_tweet_table(filtered, console, title="❤️ @%s likes — %d tweets" % (screen_name, len(filtered)))
console.print()
client = _get_client(config)
console.print("👤 Fetching @%s's profile..." % screen_name)
profile = client.fetch_user(screen_name)
_fetch_and_display(
lambda count: client.fetch_user_likes(profile.id, count),
"@%s likes" % screen_name, "❤️", max_count, as_json, None, do_filter,
)
@cli.command()
@@ -376,26 +353,11 @@ def list_timeline(list_id, max_count, as_json, do_filter):
# type: (str, int, bool, bool) -> None
"""Fetch tweets from a Twitter List. LIST_ID is the numeric list ID."""
config = load_config()
try:
fetch_count = _resolve_fetch_count(max_count, 20)
client = _get_client(config)
console.print("📋 Fetching list %s (%d tweets)...\n" % (list_id, fetch_count))
start = time.time()
tweets = client.fetch_list_timeline(list_id, fetch_count)
elapsed = time.time() - start
console.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))
except RuntimeError as exc:
console.print("[red]❌ %s[/red]" % exc)
sys.exit(1)
filtered = _apply_filter(tweets, do_filter, config)
if as_json:
click.echo(tweets_to_json(filtered))
return
print_tweet_table(filtered, console, title="📋 List — %d tweets" % len(filtered))
console.print()
client = _get_client(config)
_fetch_and_display(
lambda count: client.fetch_list_timeline(list_id, count),
"list %s" % list_id, "📋", max_count, as_json, None, do_filter,
)
@cli.command()
@@ -463,8 +425,8 @@ def following(screen_name, max_count, as_json):
def _write_action(emoji, action_desc, client_method, tweet_id):
# type: (str, str, str, str) -> None
"""Generic write action helper to reduce CLI command boilerplate."""
config = load_config()
try:
config = load_config()
client = _get_client(config)
console.print("%s %s %s..." % (emoji, action_desc, tweet_id))
getattr(client, client_method)(tweet_id)

View File

@@ -311,11 +311,7 @@ class TwitterClient:
screen_name=core.get("screen_name") or legacy.get("screen_name", screen_name),
bio=legacy.get("description", ""),
location=legacy.get("location", ""),
url=(
legacy.get("entities", {}).get("url", {}).get("urls", [{}])[0].get("expanded_url", "")
if legacy.get("entities", {}).get("url")
else ""
),
url=_deep_get(legacy, "entities", "url", "urls", 0, "expanded_url") or "",
followers_count=_to_int(legacy.get("followers_count"), 0),
following_count=_to_int(legacy.get("friends_count"), 0),
tweets_count=_to_int(legacy.get("statuses_count"), 0),

View File

@@ -8,12 +8,8 @@ from __future__ import annotations
from dataclasses import replace
import math
from typing import Mapping
# Type alias for filter weights dict
FilterWeights = Mapping[str, float]
from .config import _as_float, _as_int
DEFAULT_WEIGHTS = {
"likes": 1.0,
@@ -95,21 +91,3 @@ def _build_weights(raw_weights):
for key, default_value in DEFAULT_WEIGHTS.items():
merged[key] = _as_float(raw_weights.get(key), default_value)
return merged
def _as_int(value, default):
# type: (Any, int) -> int
"""Best-effort int conversion."""
try:
return int(value)
except (TypeError, ValueError):
return default
def _as_float(value, default):
# type: (Any, float) -> float
"""Best-effort float conversion."""
try:
return float(value)
except (TypeError, ValueError):
return default

View File

@@ -7,8 +7,6 @@ from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from .serialization import tweets_to_json as _tweets_to_json
def format_number(n):
# type: (int) -> str
@@ -161,13 +159,6 @@ def print_filter_stats(original_count, filtered, console=None):
" Score range: %.1f ~ %.1f" % (bottom_score, top_score)
)
def tweets_to_json(tweets):
# type: (List[Tweet]) -> str
"""Export tweets as JSON string."""
return _tweets_to_json(tweets)
def print_user_profile(user, console=None):
# type: (UserProfile, Optional[Console]) -> None
"""Print user profile as a rich panel."""