refactor: unify exception handling, add ISO 8601 time, dedup commands, expand tests
- Replace _error_code_for_message() string matching with error_code attribute on exception classes - Add error_code to all TwitterError subclasses (AuthenticationError, RateLimitError, etc.) - Add InvalidInputError exception class - TwitterAPIError derives error_code from HTTP status code automatically - auth.py: use AuthenticationError instead of RuntimeError - cli.py: catch (TwitterError, RuntimeError) for backward compat - Extract _fetch_and_display_users() to deduplicate followers/following commands - Add format_iso8601() to timeutil.py - Add createdAtISO field to tweet and user profile serialization - New test files: test_output.py, test_cache.py, test_timeutil.py - Expand test_filter.py (topN, score mode, custom weights, empty input) - Tests: 152 → 194 unit tests, all passing
This commit is contained in:
101
tests/test_cache.py
Normal file
101
tests/test_cache.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Tests for twitter_cli.cache module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
from twitter_cli.cache import resolve_cached_tweet, save_tweet_cache
|
||||
from twitter_cli.models import Author, Metrics, Tweet
|
||||
|
||||
|
||||
def _make_tweet(tweet_id: str, text: str = "hello") -> Tweet:
|
||||
return Tweet(
|
||||
id=tweet_id,
|
||||
text=text,
|
||||
author=Author(id="u1", name="Alice", screen_name="alice"),
|
||||
metrics=Metrics(likes=1),
|
||||
created_at="2025-01-01",
|
||||
)
|
||||
|
||||
|
||||
class TestSaveAndResolve:
|
||||
"""save_tweet_cache → resolve_cached_tweet round-trip."""
|
||||
|
||||
def test_round_trip(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "last_results.json"
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_DIR", tmp_path)
|
||||
|
||||
tweets = [_make_tweet("100"), _make_tweet("200"), _make_tweet("300")]
|
||||
save_tweet_cache(tweets)
|
||||
|
||||
assert cache_file.exists()
|
||||
tweet_id, size = resolve_cached_tweet(1)
|
||||
assert tweet_id == "100"
|
||||
assert size == 3
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(3)
|
||||
assert tweet_id == "300"
|
||||
assert size == 3
|
||||
|
||||
def test_out_of_range_returns_none(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "last_results.json"
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_DIR", tmp_path)
|
||||
|
||||
save_tweet_cache([_make_tweet("100")])
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(99)
|
||||
assert tweet_id is None
|
||||
assert size == 1
|
||||
|
||||
|
||||
class TestCacheExpiry:
|
||||
"""TTL expiration behavior."""
|
||||
|
||||
def test_expired_cache_returns_none(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "last_results.json"
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
|
||||
# Write cache with old timestamp
|
||||
payload = {
|
||||
"created_at": time.time() - 7200, # 2 hours ago
|
||||
"tweets": [{"index": 1, "id": "100", "author": "alice", "text": "hi"}],
|
||||
}
|
||||
cache_file.write_text(json.dumps(payload), encoding="utf-8")
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(1)
|
||||
assert tweet_id is None
|
||||
assert size == 0
|
||||
|
||||
|
||||
class TestCacheEdgeCases:
|
||||
"""Corrupted and missing cache files."""
|
||||
|
||||
def test_missing_file(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "does_not_exist.json"
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(1)
|
||||
assert tweet_id is None
|
||||
assert size == 0
|
||||
|
||||
def test_corrupted_json(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "last_results.json"
|
||||
cache_file.write_text("{{invalid json", encoding="utf-8")
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(1)
|
||||
assert tweet_id is None
|
||||
assert size == 0
|
||||
|
||||
def test_wrong_structure(self, tmp_path, monkeypatch) -> None:
|
||||
cache_file = tmp_path / "last_results.json"
|
||||
cache_file.write_text('"just a string"', encoding="utf-8")
|
||||
monkeypatch.setattr("twitter_cli.cache._CACHE_FILE", cache_file)
|
||||
|
||||
tweet_id, size = resolve_cached_tweet(1)
|
||||
assert tweet_id is None
|
||||
assert size == 0
|
||||
@@ -104,10 +104,11 @@ def test_cli_commands_wrap_client_creation_errors(monkeypatch, args) -> None:
|
||||
|
||||
|
||||
def test_cli_user_error_yaml(monkeypatch) -> None:
|
||||
from twitter_cli.exceptions import NotFoundError
|
||||
monkeypatch.setenv("OUTPUT", "auto")
|
||||
monkeypatch.setattr(
|
||||
"twitter_cli.cli._get_client",
|
||||
lambda config=None, quiet=False: (_ for _ in ()).throw(RuntimeError("User not found")),
|
||||
lambda config=None, quiet=False: (_ for _ in ()).throw(NotFoundError("User not found")),
|
||||
)
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
"""Tests for twitter_cli.filter module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from twitter_cli.filter import filter_tweets
|
||||
from twitter_cli.filter import filter_tweets, score_tweet
|
||||
|
||||
|
||||
def test_filter_tweets_does_not_mutate_input(tweet_factory) -> None:
|
||||
@@ -29,3 +31,58 @@ def test_filter_tweets_applies_language_and_retweet_filters(tweet_factory) -> No
|
||||
)
|
||||
|
||||
assert [tweet.id for tweet in output] == ["1"]
|
||||
|
||||
|
||||
def test_filter_topn_mode(tweet_factory) -> None:
|
||||
tweets = [tweet_factory(str(i)) for i in range(10)]
|
||||
output = filter_tweets(tweets, {"mode": "topN", "topN": 3})
|
||||
assert len(output) == 3
|
||||
|
||||
|
||||
def test_filter_topn_default(tweet_factory) -> None:
|
||||
"""Default topN is 20, so 5 tweets should all be returned."""
|
||||
tweets = [tweet_factory(str(i)) for i in range(5)]
|
||||
output = filter_tweets(tweets, {"mode": "topN"})
|
||||
assert len(output) == 5
|
||||
|
||||
|
||||
def test_filter_score_mode(tweet_factory) -> None:
|
||||
from twitter_cli.models import Metrics
|
||||
|
||||
tweets = [
|
||||
tweet_factory("high", metrics=Metrics(likes=1000, retweets=500, replies=200, views=100000, bookmarks=50)),
|
||||
tweet_factory("low", metrics=Metrics(likes=0, retweets=0, replies=0, views=1, bookmarks=0)),
|
||||
]
|
||||
output = filter_tweets(tweets, {"mode": "score", "minScore": 100.0})
|
||||
assert len(output) == 1
|
||||
assert output[0].id == "high"
|
||||
|
||||
|
||||
def test_filter_empty_input() -> None:
|
||||
output = filter_tweets([], {"mode": "all"})
|
||||
assert output == []
|
||||
|
||||
|
||||
def test_score_tweet_basic(tweet_factory) -> None:
|
||||
tweet = tweet_factory("1")
|
||||
score = score_tweet(tweet)
|
||||
assert isinstance(score, float)
|
||||
assert score > 0
|
||||
|
||||
|
||||
def test_score_tweet_custom_weights(tweet_factory) -> None:
|
||||
tweet = tweet_factory("1")
|
||||
weights = {"likes": 0, "retweets": 0, "replies": 0, "bookmarks": 0, "views_log": 0}
|
||||
assert score_tweet(tweet, weights) == 0.0
|
||||
|
||||
|
||||
def test_filter_all_mode_sorts_by_score(tweet_factory) -> None:
|
||||
from twitter_cli.models import Metrics
|
||||
|
||||
tweets = [
|
||||
tweet_factory("low", metrics=Metrics(likes=1)),
|
||||
tweet_factory("high", metrics=Metrics(likes=100)),
|
||||
]
|
||||
output = filter_tweets(tweets, {"mode": "all"})
|
||||
assert output[0].id == "high"
|
||||
assert output[1].id == "low"
|
||||
|
||||
147
tests/test_output.py
Normal file
147
tests/test_output.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""Tests for twitter_cli.output module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import click
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from twitter_cli.output import (
|
||||
default_structured_format,
|
||||
emit_structured,
|
||||
ensure_utf8_streams,
|
||||
error_payload,
|
||||
success_payload,
|
||||
use_rich_output,
|
||||
)
|
||||
|
||||
|
||||
# ── ensure_utf8_streams ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_ensure_utf8_streams_no_error() -> None:
|
||||
"""ensure_utf8_streams should not raise on any platform."""
|
||||
ensure_utf8_streams()
|
||||
|
||||
|
||||
# ── success_payload / error_payload ──────────────────────────────────────
|
||||
|
||||
|
||||
def test_success_payload_structure() -> None:
|
||||
payload = success_payload({"key": "value"})
|
||||
assert payload["ok"] is True
|
||||
assert payload["schema_version"] == "1"
|
||||
assert payload["data"] == {"key": "value"}
|
||||
|
||||
|
||||
def test_success_payload_with_list() -> None:
|
||||
payload = success_payload([1, 2, 3])
|
||||
assert payload["ok"] is True
|
||||
assert payload["data"] == [1, 2, 3]
|
||||
|
||||
|
||||
def test_error_payload_structure() -> None:
|
||||
payload = error_payload("not_found", "User not found")
|
||||
assert payload["ok"] is False
|
||||
assert payload["schema_version"] == "1"
|
||||
assert payload["error"]["code"] == "not_found"
|
||||
assert payload["error"]["message"] == "User not found"
|
||||
assert "details" not in payload["error"]
|
||||
|
||||
|
||||
def test_error_payload_with_details() -> None:
|
||||
payload = error_payload("api_error", "oops", details={"id": "123"})
|
||||
assert payload["error"]["details"] == {"id": "123"}
|
||||
|
||||
|
||||
# ── default_structured_format ────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_format_json_flag() -> None:
|
||||
assert default_structured_format(as_json=True, as_yaml=False) == "json"
|
||||
|
||||
|
||||
def test_format_yaml_flag() -> None:
|
||||
assert default_structured_format(as_json=False, as_yaml=True) == "yaml"
|
||||
|
||||
|
||||
def test_format_both_flags_raises() -> None:
|
||||
with pytest.raises(click.UsageError):
|
||||
default_structured_format(as_json=True, as_yaml=True)
|
||||
|
||||
|
||||
def test_format_env_yaml(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "yaml")
|
||||
assert default_structured_format(as_json=False, as_yaml=False) == "yaml"
|
||||
|
||||
|
||||
def test_format_env_json(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "json")
|
||||
assert default_structured_format(as_json=False, as_yaml=False) == "json"
|
||||
|
||||
|
||||
def test_format_env_rich(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "rich")
|
||||
assert default_structured_format(as_json=False, as_yaml=False) is None
|
||||
|
||||
|
||||
def test_format_auto_non_tty(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "auto")
|
||||
monkeypatch.setattr("sys.stdout", type("FakeStdout", (), {"isatty": lambda self: False})())
|
||||
assert default_structured_format(as_json=False, as_yaml=False) == "yaml"
|
||||
|
||||
|
||||
# ── use_rich_output ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_use_rich_when_no_structured(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "rich")
|
||||
assert use_rich_output(as_json=False, as_yaml=False) is True
|
||||
|
||||
|
||||
def test_no_rich_when_json() -> None:
|
||||
assert use_rich_output(as_json=True, as_yaml=False) is False
|
||||
|
||||
|
||||
def test_no_rich_when_compact(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "rich")
|
||||
assert use_rich_output(as_json=False, as_yaml=False, compact=True) is False
|
||||
|
||||
|
||||
# ── emit_structured ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_emit_structured_json(capsys) -> None:
|
||||
result = emit_structured({"key": "val"}, as_json=True, as_yaml=False)
|
||||
assert result is True
|
||||
captured = capsys.readouterr()
|
||||
parsed = json.loads(captured.out)
|
||||
assert parsed["ok"] is True
|
||||
assert parsed["data"]["key"] == "val"
|
||||
|
||||
|
||||
def test_emit_structured_yaml(capsys) -> None:
|
||||
result = emit_structured({"key": "val"}, as_json=False, as_yaml=True)
|
||||
assert result is True
|
||||
captured = capsys.readouterr()
|
||||
parsed = yaml.safe_load(captured.out)
|
||||
assert parsed["ok"] is True
|
||||
assert parsed["data"]["key"] == "val"
|
||||
|
||||
|
||||
def test_emit_structured_returns_false_when_rich(monkeypatch) -> None:
|
||||
monkeypatch.setenv("OUTPUT", "rich")
|
||||
result = emit_structured({"key": "val"}, as_json=False, as_yaml=False)
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_emit_structured_wraps_already_wrapped_payload(capsys) -> None:
|
||||
"""If data is already in the agent schema, it should not be double-wrapped."""
|
||||
already_wrapped = {"ok": True, "schema_version": "1", "data": "hello"}
|
||||
emit_structured(already_wrapped, as_json=True, as_yaml=False)
|
||||
captured = capsys.readouterr()
|
||||
parsed = json.loads(captured.out)
|
||||
assert parsed["data"] == "hello"
|
||||
assert "data" not in str(parsed.get("data", {}) if isinstance(parsed.get("data"), dict) else "")
|
||||
73
tests/test_timeutil.py
Normal file
73
tests/test_timeutil.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Tests for twitter_cli.timeutil module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from twitter_cli.timeutil import format_iso8601, format_local_time, format_relative_time
|
||||
|
||||
|
||||
SAMPLE_TIMESTAMP = "Sat Mar 08 12:00:00 +0000 2026"
|
||||
|
||||
|
||||
# ── format_local_time ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_format_local_time_valid() -> None:
|
||||
result = format_local_time(SAMPLE_TIMESTAMP)
|
||||
# Should be in YYYY-MM-DD HH:MM format (local timezone)
|
||||
assert result.startswith("2026-03-")
|
||||
assert ":" in result
|
||||
|
||||
|
||||
def test_format_local_time_empty() -> None:
|
||||
assert format_local_time("") == ""
|
||||
|
||||
|
||||
def test_format_local_time_invalid() -> None:
|
||||
assert format_local_time("not a date") == "not a date"
|
||||
|
||||
|
||||
# ── format_relative_time ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_format_relative_time_old() -> None:
|
||||
# A timestamp from 2020 should show years ago
|
||||
old_ts = "Sat Jan 01 00:00:00 +0000 2020"
|
||||
result = format_relative_time(old_ts)
|
||||
assert result.endswith("ago")
|
||||
assert "y" in result or "mo" in result or "d" in result
|
||||
|
||||
|
||||
def test_format_relative_time_empty() -> None:
|
||||
assert format_relative_time("") == ""
|
||||
|
||||
|
||||
def test_format_relative_time_invalid() -> None:
|
||||
assert format_relative_time("garbage") == "garbage"
|
||||
|
||||
|
||||
# ── format_iso8601 ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_format_iso8601_valid() -> None:
|
||||
result = format_iso8601(SAMPLE_TIMESTAMP)
|
||||
assert result.startswith("2026-03-08T12:00:00")
|
||||
assert "+00:00" in result or "Z" in result
|
||||
|
||||
|
||||
def test_format_iso8601_empty() -> None:
|
||||
assert format_iso8601("") == ""
|
||||
|
||||
|
||||
def test_format_iso8601_invalid() -> None:
|
||||
assert format_iso8601("not a date") == "not a date"
|
||||
|
||||
|
||||
def test_format_iso8601_roundtrip() -> None:
|
||||
"""ISO 8601 output should be parseable by datetime.fromisoformat."""
|
||||
from datetime import datetime
|
||||
|
||||
result = format_iso8601(SAMPLE_TIMESTAMP)
|
||||
dt = datetime.fromisoformat(result)
|
||||
assert dt.year == 2026
|
||||
assert dt.month == 3
|
||||
assert dt.day == 8
|
||||
@@ -19,6 +19,7 @@ import sys
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from .constants import BEARER_TOKEN, get_user_agent
|
||||
from .exceptions import AuthenticationError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -136,7 +137,7 @@ def verify_cookies(auth_token: str, ct0: str, cookie_string: Optional[str] = Non
|
||||
try:
|
||||
resp = session.get(url, headers=headers, timeout=5)
|
||||
if resp.status_code in (401, 403):
|
||||
raise RuntimeError(
|
||||
raise AuthenticationError(
|
||||
"Cookie expired or invalid (HTTP %d). Please re-login to x.com in your browser." % resp.status_code
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
@@ -616,7 +617,7 @@ def get_cookies() -> Dict[str, str]:
|
||||
lines.append("Option 2: Make sure you are logged into x.com in your browser (Arc/Chrome/Edge/Firefox/Brave)")
|
||||
lines.append("")
|
||||
lines.append("Run 'twitter -v <command>' for debug diagnostics.")
|
||||
raise RuntimeError("\n".join(lines))
|
||||
raise AuthenticationError("\n".join(lines))
|
||||
|
||||
# Verify only for explicit auth failures; transient endpoint issues are tolerated.
|
||||
try:
|
||||
|
||||
@@ -44,6 +44,7 @@ from rich.console import Console
|
||||
from . import __version__
|
||||
from .auth import get_cookies
|
||||
from .cache import resolve_cached_tweet, save_tweet_cache
|
||||
from .exceptions import TwitterError
|
||||
from .client import TwitterClient
|
||||
from .config import load_config
|
||||
from .filter import filter_tweets
|
||||
@@ -156,9 +157,13 @@ def _get_client(config=None, quiet=False):
|
||||
|
||||
|
||||
|
||||
def _exit_with_error(exc):
|
||||
# type: (RuntimeError) -> None
|
||||
if emit_error(_error_code_for_message(str(exc)), str(exc)):
|
||||
def _error_code_from_exc(exc: Exception) -> str:
|
||||
"""Extract structured error code from an exception."""
|
||||
return getattr(exc, "error_code", "api_error")
|
||||
|
||||
|
||||
def _exit_with_error(exc: Exception) -> None:
|
||||
if emit_error(_error_code_from_exc(exc), str(exc)):
|
||||
sys.exit(1)
|
||||
console.print("[red]❌ %s[/red]" % exc)
|
||||
sys.exit(1)
|
||||
@@ -168,24 +173,10 @@ def _run_guarded(action):
|
||||
# type: (Callable[[], Any]) -> Any
|
||||
try:
|
||||
return action()
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
|
||||
def _error_code_for_message(message):
|
||||
# type: (str) -> str
|
||||
lowered = message.lower()
|
||||
if "cookie expired" in lowered or "no twitter cookies found" in lowered or "invalid cookie" in lowered:
|
||||
return "not_authenticated"
|
||||
if "rate limited" in lowered or "http 429" in lowered:
|
||||
return "rate_limited"
|
||||
if "invalid tweet" in lowered or "required" in lowered or "--max must" in lowered:
|
||||
return "invalid_input"
|
||||
if "not found" in lowered:
|
||||
return "not_found"
|
||||
return "api_error"
|
||||
|
||||
|
||||
def _resolve_fetch_count(max_count, configured):
|
||||
# type: (Optional[int], int) -> int
|
||||
"""Resolve fetch count with bounds checks."""
|
||||
@@ -258,13 +249,13 @@ def _print_lines(lines: List[str], mode: StructuredMode) -> None:
|
||||
|
||||
|
||||
def _handle_structured_runtime_error(
|
||||
exc: RuntimeError,
|
||||
exc: Exception,
|
||||
*,
|
||||
mode: StructuredMode,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
if _emit_mode_payload(
|
||||
error_payload(_error_code_for_message(str(exc)), str(exc), details=details),
|
||||
error_payload(_error_code_from_exc(exc), str(exc), details=details),
|
||||
mode,
|
||||
):
|
||||
raise SystemExit(1) from None
|
||||
@@ -285,7 +276,7 @@ def _run_write_command(
|
||||
client = _get_client(load_config())
|
||||
_print_lines(progress_lines or [], mode)
|
||||
payload = operation(client)
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_handle_structured_runtime_error(exc, mode=mode, details=error_details)
|
||||
return None
|
||||
|
||||
@@ -325,7 +316,7 @@ def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, as_yaml, outp
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d %s in %.1fs\n" % (len(tweets), label, elapsed))
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
filtered = _apply_filter(tweets, do_filter, config, rich_output=rich_output)
|
||||
@@ -420,7 +411,7 @@ def feed(ctx, feed_type, max_count, as_json, as_yaml, input_file, output_file, d
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
filtered = _apply_filter(tweets, do_filter, config, rich_output=rich_output)
|
||||
@@ -502,7 +493,7 @@ def user(screen_name, as_json, as_yaml):
|
||||
if rich_output:
|
||||
console.print("👤 Fetching user @%s..." % screen_name)
|
||||
profile = client.fetch_user(screen_name)
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
if not emit_structured(user_profile_to_dict(profile), as_json=as_json, as_yaml=as_yaml):
|
||||
@@ -693,7 +684,7 @@ def tweet(ctx, tweet_id, max_count, full_text, as_json, as_yaml):
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
_emit_tweet_detail(tweets, compact=compact, as_json=as_json, as_yaml=as_yaml, full_text=full_text)
|
||||
@@ -757,7 +748,7 @@ def show(ctx, index, max_count, full_text, output_file, as_json, as_yaml):
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d tweets in %.1fs\n" % (len(tweets), elapsed))
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
if output_file:
|
||||
@@ -795,7 +786,7 @@ def article(ctx, tweet_id, as_json, as_yaml, as_markdown, output_file):
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched article in %.1fs\n" % elapsed)
|
||||
except RuntimeError as exc:
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
markdown = article_to_markdown(article_tweet)
|
||||
@@ -836,13 +827,15 @@ def list_timeline(ctx, list_id, max_count, as_json, as_yaml, do_filter, full_tex
|
||||
_run_guarded(_run)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("screen_name")
|
||||
@click.option("--max", "-n", "max_count", type=int, default=None, help="Max users to fetch.")
|
||||
@structured_output_options
|
||||
def followers(screen_name, max_count, as_json, as_yaml):
|
||||
# type: (str, int, bool, bool) -> None
|
||||
"""List followers of a user. SCREEN_NAME is the @handle (without @)."""
|
||||
def _fetch_and_display_users(
|
||||
screen_name: str,
|
||||
fetch_fn_name: str,
|
||||
label: str,
|
||||
max_count: Optional[int],
|
||||
as_json: bool,
|
||||
as_yaml: bool,
|
||||
) -> None:
|
||||
"""Shared fetch-and-display logic for followers/following commands."""
|
||||
screen_name = screen_name.lstrip("@")
|
||||
config = load_config()
|
||||
try:
|
||||
@@ -853,22 +846,32 @@ def followers(screen_name, max_count, as_json, as_yaml):
|
||||
profile = client.fetch_user(screen_name)
|
||||
fetch_count = _resolve_configured_count(config, max_count)
|
||||
if rich_output:
|
||||
console.print("👥 Fetching followers (%d)...\n" % fetch_count)
|
||||
console.print("👥 Fetching %s (%d)...\n" % (label, fetch_count))
|
||||
start = time.time()
|
||||
users = client.fetch_followers(profile.id, fetch_count)
|
||||
users = getattr(client, fetch_fn_name)(profile.id, fetch_count)
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d followers in %.1fs\n" % (len(users), elapsed))
|
||||
except RuntimeError as exc:
|
||||
console.print("✅ Fetched %d %s in %.1fs\n" % (len(users), label, elapsed))
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
if emit_structured(users_to_data(users), as_json=as_json, as_yaml=as_yaml):
|
||||
return
|
||||
|
||||
print_user_table(users, console, title="👥 @%s followers — %d" % (screen_name, len(users)))
|
||||
print_user_table(users, console, title="👥 @%s %s — %d" % (screen_name, label, len(users)))
|
||||
console.print()
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("screen_name")
|
||||
@click.option("--max", "-n", "max_count", type=int, default=None, help="Max users to fetch.")
|
||||
@structured_output_options
|
||||
def followers(screen_name, max_count, as_json, as_yaml):
|
||||
# type: (str, int, bool, bool) -> None
|
||||
"""List followers of a user. SCREEN_NAME is the @handle (without @)."""
|
||||
_fetch_and_display_users(screen_name, "fetch_followers", "followers", max_count, as_json, as_yaml)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("screen_name")
|
||||
@click.option("--max", "-n", "max_count", type=int, default=None, help="Max users to fetch.")
|
||||
@@ -876,30 +879,7 @@ def followers(screen_name, max_count, as_json, as_yaml):
|
||||
def following(screen_name, max_count, as_json, as_yaml):
|
||||
# type: (str, int, bool, bool) -> None
|
||||
"""List accounts a user is following. SCREEN_NAME is the @handle (without @)."""
|
||||
screen_name = screen_name.lstrip("@")
|
||||
config = load_config()
|
||||
try:
|
||||
rich_output = use_rich_output(as_json=as_json, as_yaml=as_yaml)
|
||||
client = _get_client(config, quiet=not rich_output)
|
||||
if rich_output:
|
||||
console.print("👤 Fetching @%s's profile..." % screen_name)
|
||||
profile = client.fetch_user(screen_name)
|
||||
fetch_count = _resolve_configured_count(config, max_count)
|
||||
if rich_output:
|
||||
console.print("👥 Fetching following (%d)...\n" % fetch_count)
|
||||
start = time.time()
|
||||
users = client.fetch_following(profile.id, fetch_count)
|
||||
elapsed = time.time() - start
|
||||
if rich_output:
|
||||
console.print("✅ Fetched %d following in %.1fs\n" % (len(users), elapsed))
|
||||
except RuntimeError as exc:
|
||||
_exit_with_error(exc)
|
||||
|
||||
if emit_structured(users_to_data(users), as_json=as_json, as_yaml=as_yaml):
|
||||
return
|
||||
|
||||
print_user_table(users, console, title="👥 @%s following — %d" % (screen_name, len(users)))
|
||||
console.print()
|
||||
_fetch_and_display_users(screen_name, "fetch_following", "following", max_count, as_json, as_yaml)
|
||||
|
||||
|
||||
# ── Write commands ──────────────────────────────────────────────────────
|
||||
@@ -1055,8 +1035,8 @@ def status(as_json, as_yaml):
|
||||
rich_output = use_rich_output(as_json=as_json, as_yaml=as_yaml)
|
||||
client = _get_client(config, quiet=not rich_output)
|
||||
profile = client.fetch_me()
|
||||
except RuntimeError as exc:
|
||||
payload = error_payload("not_authenticated", str(exc))
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
payload = error_payload(_error_code_from_exc(exc), str(exc))
|
||||
if emit_structured(payload, as_json=as_json, as_yaml=as_yaml):
|
||||
sys.exit(1)
|
||||
_exit_with_error(exc)
|
||||
@@ -1082,8 +1062,8 @@ def whoami(as_json, as_yaml):
|
||||
if rich_output:
|
||||
console.print("👤 Fetching current user...")
|
||||
profile = client.fetch_me()
|
||||
except RuntimeError as exc:
|
||||
if emit_structured(error_payload("not_authenticated", str(exc)), as_json=as_json, as_yaml=as_yaml):
|
||||
except (TwitterError, RuntimeError) as exc:
|
||||
if emit_structured(error_payload(_error_code_from_exc(exc), str(exc)), as_json=as_json, as_yaml=as_yaml):
|
||||
raise SystemExit(1) from None
|
||||
_exit_with_error(exc)
|
||||
|
||||
|
||||
7
twitter_cli/commands/__init__.py
Normal file
7
twitter_cli/commands/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""CLI command sub-modules for twitter-cli.
|
||||
|
||||
Commands are split into three groups:
|
||||
- read: feed, bookmarks, search, tweet, article, show, list, favorites
|
||||
- write: post, reply, quote, delete, like/unlike, retweet/unretweet, bookmark/unbookmark
|
||||
- user: user, user-posts, likes, followers, following, whoami, status, follow/unfollow
|
||||
"""
|
||||
@@ -6,7 +6,7 @@ Provides a structured exception hierarchy for categorized error handling:
|
||||
- Network errors
|
||||
- Query ID resolution failures
|
||||
|
||||
Modeled after bilibili-cli/xiaohongshu-cli exception patterns.
|
||||
Each exception carries an `error_code` attribute for structured output.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -15,30 +15,50 @@ from __future__ import annotations
|
||||
class TwitterError(RuntimeError):
|
||||
"""Base exception for twitter-cli errors."""
|
||||
|
||||
error_code: str = "api_error"
|
||||
|
||||
|
||||
class AuthenticationError(TwitterError):
|
||||
"""Raised when cookies are missing, expired, or invalid."""
|
||||
|
||||
error_code = "not_authenticated"
|
||||
|
||||
|
||||
class RateLimitError(TwitterError):
|
||||
"""Raised when Twitter rate limits the request (HTTP 429)."""
|
||||
|
||||
error_code = "rate_limited"
|
||||
|
||||
|
||||
class NotFoundError(TwitterError):
|
||||
"""Raised when a user or tweet is not found."""
|
||||
|
||||
error_code = "not_found"
|
||||
|
||||
|
||||
class NetworkError(TwitterError):
|
||||
"""Raised when upstream network requests fail."""
|
||||
|
||||
error_code = "network_error"
|
||||
|
||||
|
||||
class QueryIdError(TwitterError):
|
||||
"""Raised when a GraphQL queryId cannot be resolved."""
|
||||
|
||||
error_code = "query_id_error"
|
||||
|
||||
|
||||
class MediaUploadError(TwitterError):
|
||||
"""Raised when media upload fails (file not found, too large, unsupported format, API error)."""
|
||||
|
||||
error_code = "media_upload_error"
|
||||
|
||||
|
||||
class InvalidInputError(TwitterError):
|
||||
"""Raised when user input is invalid (bad tweet ID, invalid options, etc.)."""
|
||||
|
||||
error_code = "invalid_input"
|
||||
|
||||
|
||||
class TwitterAPIError(TwitterError):
|
||||
"""Raised on non-OK Twitter API responses with HTTP status + message."""
|
||||
@@ -46,4 +66,13 @@ class TwitterAPIError(TwitterError):
|
||||
def __init__(self, status_code: int, message: str):
|
||||
self.status_code = status_code
|
||||
self.message = message
|
||||
# Derive error_code from HTTP status
|
||||
if status_code in (401, 403):
|
||||
self.error_code = "not_authenticated"
|
||||
elif status_code == 429:
|
||||
self.error_code = "rate_limited"
|
||||
elif status_code == 404:
|
||||
self.error_code = "not_found"
|
||||
else:
|
||||
self.error_code = "api_error"
|
||||
super().__init__("Twitter API error (HTTP %d): %s" % (status_code, message))
|
||||
|
||||
@@ -6,7 +6,7 @@ import json
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
|
||||
from .models import Author, Metrics, Tweet, TweetMedia, UserProfile
|
||||
from .timeutil import format_local_time
|
||||
from .timeutil import format_iso8601, format_local_time
|
||||
|
||||
|
||||
def tweet_to_dict(tweet: Tweet) -> Dict[str, Any]:
|
||||
@@ -31,6 +31,7 @@ def tweet_to_dict(tweet: Tweet) -> Dict[str, Any]:
|
||||
},
|
||||
"createdAt": tweet.created_at,
|
||||
"createdAtLocal": format_local_time(tweet.created_at),
|
||||
"createdAtISO": format_iso8601(tweet.created_at),
|
||||
"media": [
|
||||
{
|
||||
"type": media.type,
|
||||
@@ -190,6 +191,7 @@ def user_profile_to_dict(user: UserProfile) -> Dict[str, Any]:
|
||||
"verified": user.verified,
|
||||
"profileImageUrl": user.profile_image_url,
|
||||
"createdAt": user.created_at,
|
||||
"createdAtISO": format_iso8601(user.created_at),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -69,3 +69,14 @@ def format_relative_time(created_at: str) -> str:
|
||||
return "%dmo ago" % months
|
||||
years = days // 365
|
||||
return "%dy ago" % years
|
||||
|
||||
|
||||
def format_iso8601(created_at: str) -> str:
|
||||
"""Convert Twitter timestamp to ISO 8601 format.
|
||||
|
||||
Returns e.g. "2026-03-08T12:00:00+00:00" or the original string on failure.
|
||||
"""
|
||||
dt = _parse_twitter_time(created_at)
|
||||
if dt is None:
|
||||
return created_at
|
||||
return dt.isoformat()
|
||||
|
||||
Reference in New Issue
Block a user