diff --git a/tests/test_cli.py b/tests/test_cli.py index 2eb0fd4..aa06654 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -88,3 +88,109 @@ def test_cli_bookmark_alias_works(monkeypatch) -> None: assert result.exit_code == 0 assert calls == ["123"] + + +def test_cli_whoami_command(monkeypatch) -> None: + from twitter_cli.models import UserProfile + + class FakeClient: + def fetch_me(self) -> UserProfile: + return UserProfile(id="42", name="Test User", screen_name="testuser") + + monkeypatch.setattr("twitter_cli.cli._get_client", lambda config=None: FakeClient()) + runner = CliRunner() + + result = runner.invoke(cli, ["whoami"]) + assert result.exit_code == 0 + + result_json = runner.invoke(cli, ["whoami", "--json"]) + assert result_json.exit_code == 0 + assert '"screenName": "testuser"' in result_json.output + + +def test_cli_reply_command(monkeypatch) -> None: + calls = [] + + class FakeClient: + def create_tweet(self, text: str, reply_to_id=None) -> str: + calls.append({"text": text, "reply_to_id": reply_to_id}) + return "999" + + monkeypatch.setattr("twitter_cli.cli._get_client", lambda config=None: FakeClient()) + runner = CliRunner() + + result = runner.invoke(cli, ["reply", "12345", "Nice tweet!"]) + assert result.exit_code == 0 + assert calls[0]["reply_to_id"] == "12345" + assert calls[0]["text"] == "Nice tweet!" + + +def test_cli_quote_command(monkeypatch) -> None: + calls = [] + + class FakeClient: + def quote_tweet(self, tweet_id: str, text: str) -> str: + calls.append({"tweet_id": tweet_id, "text": text}) + return "888" + + monkeypatch.setattr("twitter_cli.cli._get_client", lambda config=None: FakeClient()) + runner = CliRunner() + + result = runner.invoke(cli, ["quote", "12345", "Interesting!"]) + assert result.exit_code == 0 + assert calls[0]["tweet_id"] == "12345" + assert calls[0]["text"] == "Interesting!" + + +def test_cli_follow_command(monkeypatch) -> None: + from twitter_cli.models import UserProfile + actions = [] + + class FakeClient: + def fetch_user(self, screen_name: str) -> UserProfile: + return UserProfile(id="42", name="Alice", screen_name=screen_name) + + def follow_user(self, user_id: str) -> bool: + actions.append(("follow", user_id)) + return True + + monkeypatch.setattr("twitter_cli.cli._get_client", lambda config=None: FakeClient()) + runner = CliRunner() + + result = runner.invoke(cli, ["follow", "alice"]) + assert result.exit_code == 0 + assert actions == [("follow", "42")] + + +def test_cli_unfollow_command(monkeypatch) -> None: + from twitter_cli.models import UserProfile + actions = [] + + class FakeClient: + def fetch_user(self, screen_name: str) -> UserProfile: + return UserProfile(id="42", name="Alice", screen_name=screen_name) + + def unfollow_user(self, user_id: str) -> bool: + actions.append(("unfollow", user_id)) + return True + + monkeypatch.setattr("twitter_cli.cli._get_client", lambda config=None: FakeClient()) + runner = CliRunner() + + result = runner.invoke(cli, ["unfollow", "alice"]) + assert result.exit_code == 0 + assert actions == [("unfollow", "42")] + + +def test_cli_compact_mode(tmp_path, tweet_factory) -> None: + json_path = tmp_path / "tweets.json" + json_path.write_text(tweets_to_json([tweet_factory("1")]), encoding="utf-8") + + runner = CliRunner() + result = runner.invoke(cli, ["-c", "feed", "--input", str(json_path)]) + assert result.exit_code == 0 + # Compact output should have "author" field with @ prefix + assert '"@alice"' in result.output + # Compact output should NOT have full metrics keys + assert '"metrics"' not in result.output + diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 0df50e7..bb44fa6 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -20,3 +20,32 @@ def test_tweets_json_roundtrip(tweet_factory) -> None: assert [tweet.id for tweet in restored] == ["1", "2"] assert restored[1].lang == "zh" + + +def test_compact_serialization(tweet_factory) -> None: + from twitter_cli.serialization import tweet_to_compact_dict, tweets_to_compact_json + import json + + tweet = tweet_factory( + "42", + created_at="Sat Mar 07 05:51:02 +0000 2026", + text="A" * 200, + ) + compact = tweet_to_compact_dict(tweet) + + assert compact["id"] == "42" + assert compact["author"] == "@alice" + assert compact["time"] == "Mar 07 05:51" + assert len(compact["text"]) <= 140 + assert compact["text"].endswith("...") + assert compact["likes"] == 10 + assert compact["rts"] == 2 + # Should only have 6 keys + assert set(compact.keys()) == {"id", "author", "text", "likes", "rts", "time"} + + # Test batch serialization + raw = tweets_to_compact_json([tweet]) + parsed = json.loads(raw) + assert len(parsed) == 1 + assert parsed[0]["author"] == "@alice" + diff --git a/twitter_cli/cli.py b/twitter_cli/cli.py index 8412553..4757cbf 100644 --- a/twitter_cli/cli.py +++ b/twitter_cli/cli.py @@ -12,13 +12,17 @@ Read commands: twitter list # list timeline twitter followers # followers list twitter following # following list + twitter whoami # current user profile Write commands: twitter post "text" # post a tweet + twitter reply "text" # reply to a tweet + twitter quote "text" # quote-tweet twitter delete # delete a tweet twitter like/unlike # like/unlike twitter bookmark/unbookmark # bookmark/unbookmark twitter retweet/unretweet # retweet/unretweet + twitter follow/unfollow # follow/unfollow """ from __future__ import annotations @@ -46,7 +50,13 @@ from .formatter import ( print_user_profile, print_user_table, ) -from .serialization import tweets_from_json, tweets_to_json, user_profile_to_dict, users_to_json +from .serialization import ( + tweets_from_json, + tweets_to_compact_json, + tweets_to_json, + user_profile_to_dict, + users_to_json, +) console = Console(stderr=True) @@ -160,15 +170,19 @@ def _apply_filter(tweets, do_filter, config): @click.group() @click.option("--verbose", "-v", is_flag=True, help="Enable debug logging.") +@click.option("--compact", "-c", is_flag=True, help="Compact output (minimal fields, LLM-friendly).") @click.version_option(version=__version__) -def cli(verbose): - # type: (bool) -> None +@click.pass_context +def cli(ctx, verbose, compact): + # type: (Any, bool, bool) -> None """twitter — Twitter/X CLI tool 🐦""" _setup_logging(verbose) + ctx.ensure_object(dict) + ctx.obj["compact"] = compact -def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, do_filter, config=None): - # type: (Any, str, str, Optional[int], bool, Optional[str], bool, Optional[dict]) -> None +def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, do_filter, config=None, compact=False): + # type: (Any, str, str, Optional[int], bool, Optional[str], bool, Optional[dict], bool) -> None """Common fetch-filter-display logic for timeline-like commands.""" if config is None: config = load_config() @@ -188,6 +202,10 @@ def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8") console.print("💾 Saved to %s\n" % output_file) + if compact: + click.echo(tweets_to_compact_json(filtered)) + return + if as_json: click.echo(tweets_to_json(filtered)) return @@ -196,8 +214,8 @@ def _fetch_and_display(fetch_fn, label, emoji, max_count, as_json, output_file, console.print() -def _run_bookmarks_command(max_count, as_json, output_file, do_filter): - # type: (Optional[int], bool, Optional[str], bool) -> None +def _run_bookmarks_command(max_count, as_json, output_file, do_filter, compact=False): + # type: (Optional[int], bool, Optional[str], bool, bool) -> None config = load_config() def _run(): @@ -211,6 +229,7 @@ def _run_bookmarks_command(max_count, as_json, output_file, do_filter): output_file, do_filter, config, + compact=compact, ) _run_guarded(_run) @@ -230,9 +249,11 @@ def _run_bookmarks_command(max_count, as_json, output_file, do_filter): @click.option("--input", "-i", "input_file", type=str, default=None, help="Load tweets from JSON file.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save filtered tweets to JSON file.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def feed(feed_type, max_count, as_json, input_file, output_file, do_filter): - # type: (str, Optional[int], bool, Optional[str], Optional[str], bool) -> None +@click.pass_context +def feed(ctx, feed_type, max_count, as_json, input_file, output_file, do_filter): + # type: (Any, str, Optional[int], bool, Optional[str], Optional[str], bool) -> None """Fetch home timeline with optional filtering.""" + compact = ctx.obj.get("compact", False) config = load_config() try: if input_file: @@ -260,6 +281,10 @@ def feed(feed_type, max_count, as_json, input_file, output_file, do_filter): Path(output_file).write_text(tweets_to_json(filtered), encoding="utf-8") console.print("💾 Saved filtered tweets to %s\n" % output_file) + if compact: + click.echo(tweets_to_compact_json(filtered)) + return + if as_json: click.echo(tweets_to_json(filtered)) return @@ -275,10 +300,11 @@ def feed(feed_type, max_count, as_json, input_file, output_file, do_filter): @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def favorites(max_count, as_json, output_file, do_filter): - # type: (Optional[int], bool, Optional[str], bool) -> None +@click.pass_context +def favorites(ctx, max_count, as_json, output_file, do_filter): + # type: (Any, Optional[int], bool, Optional[str], bool) -> None """Fetch bookmarked (favorite) tweets.""" - _run_bookmarks_command(max_count, as_json, output_file, do_filter) + _run_bookmarks_command(max_count, as_json, output_file, do_filter, compact=ctx.obj.get("compact", False)) @cli.command(name="bookmarks") @@ -286,10 +312,11 @@ def favorites(max_count, as_json, output_file, do_filter): @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def bookmarks(max_count, as_json, output_file, do_filter): - # type: (Optional[int], bool, Optional[str], bool) -> None +@click.pass_context +def bookmarks(ctx, max_count, as_json, output_file, do_filter): + # type: (Any, Optional[int], bool, Optional[str], bool) -> None """Fetch bookmarked tweets.""" - _run_bookmarks_command(max_count, as_json, output_file, do_filter) + _run_bookmarks_command(max_count, as_json, output_file, do_filter, compact=ctx.obj.get("compact", False)) @cli.command() @@ -319,10 +346,12 @@ def user(screen_name, as_json): @click.option("--max", "-n", "max_count", type=int, default=None, help="Max number of tweets to fetch.") @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.") -def user_posts(screen_name, max_count, as_json, output_file): - # type: (str, int, bool, Optional[str]) -> None +@click.pass_context +def user_posts(ctx, screen_name, max_count, as_json, output_file): + # type: (Any, str, int, bool, Optional[str]) -> None """List a user's tweets. SCREEN_NAME is the @handle (without @).""" screen_name = screen_name.lstrip("@") + compact = ctx.obj.get("compact", False) config = load_config() def _run(): client = _get_client(config) @@ -331,6 +360,7 @@ def user_posts(screen_name, max_count, as_json, output_file): _fetch_and_display( lambda count: client.fetch_user_tweets(profile.id, count), "@%s tweets" % screen_name, "📝", max_count, as_json, output_file, False, config, + compact=compact, ) _run_guarded(_run) @@ -349,15 +379,18 @@ def user_posts(screen_name, max_count, as_json, output_file): @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def search(query, product, max_count, as_json, output_file, do_filter): - # type: (str, str, int, bool, Optional[str], bool) -> None +@click.pass_context +def search(ctx, query, product, max_count, as_json, output_file, do_filter): + # type: (Any, str, str, int, bool, Optional[str], bool) -> None """Search tweets by QUERY string.""" + compact = ctx.obj.get("compact", False) config = load_config() def _run(): client = _get_client(config) _fetch_and_display( lambda count: client.fetch_search(query, count, product), "'%s' (%s)" % (query, product), "🔍", max_count, as_json, output_file, do_filter, config, + compact=compact, ) _run_guarded(_run) @@ -368,10 +401,12 @@ def search(query, product, max_count, as_json, output_file, do_filter): @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--output", "-o", "output_file", type=str, default=None, help="Save tweets to JSON file.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def likes(screen_name, max_count, as_json, output_file, do_filter): - # type: (str, int, bool, Optional[str], bool) -> None +@click.pass_context +def likes(ctx, screen_name, max_count, as_json, output_file, do_filter): + # type: (Any, str, int, bool, Optional[str], bool) -> None """Show tweets liked by a user. SCREEN_NAME is the @handle (without @).""" screen_name = screen_name.lstrip("@") + compact = ctx.obj.get("compact", False) config = load_config() def _run(): client = _get_client(config) @@ -380,6 +415,7 @@ def likes(screen_name, max_count, as_json, output_file, do_filter): _fetch_and_display( lambda count: client.fetch_user_likes(profile.id, count), "@%s likes" % screen_name, "❤️", max_count, as_json, output_file, do_filter, config, + compact=compact, ) _run_guarded(_run) @@ -388,9 +424,11 @@ def likes(screen_name, max_count, as_json, output_file, do_filter): @click.argument("tweet_id") @click.option("--max", "-n", "max_count", type=int, default=None, help="Max replies to fetch.") @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") -def tweet(tweet_id, max_count, as_json): - # type: (str, int, bool) -> None +@click.pass_context +def tweet(ctx, tweet_id, max_count, as_json): + # type: (Any, str, int, bool) -> None """View a tweet and its replies. TWEET_ID is the numeric tweet ID or full URL.""" + compact = ctx.obj.get("compact", False) tweet_id = _normalize_tweet_id(tweet_id) config = load_config() try: @@ -403,6 +441,10 @@ def tweet(tweet_id, max_count, as_json): except RuntimeError as exc: _exit_with_error(exc) + if compact: + click.echo(tweets_to_compact_json(tweets)) + return + if as_json: click.echo(tweets_to_json(tweets)) return @@ -420,15 +462,18 @@ def tweet(tweet_id, max_count, as_json): @click.option("--max", "-n", "max_count", type=int, default=None, help="Max tweets to fetch.") @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") @click.option("--filter", "do_filter", is_flag=True, help="Enable score-based filtering.") -def list_timeline(list_id, max_count, as_json, do_filter): - # type: (str, int, bool, bool) -> None +@click.pass_context +def list_timeline(ctx, list_id, max_count, as_json, do_filter): + # type: (Any, str, int, bool, bool) -> None """Fetch tweets from a Twitter List. LIST_ID is the numeric list ID.""" + compact = ctx.obj.get("compact", False) config = load_config() def _run(): client = _get_client(config) _fetch_and_display( lambda count: client.fetch_list_timeline(list_id, count), "list %s" % list_id, "📋", max_count, as_json, None, do_filter, config, + compact=compact, ) _run_guarded(_run) @@ -526,6 +571,98 @@ def post(text, reply_to): _exit_with_error(exc) +@cli.command(name="reply") +@click.argument("tweet_id") +@click.argument("text") +def reply_tweet(tweet_id, text): + # type: (str, str) -> None + """Reply to a tweet. TWEET_ID is the tweet to reply to, TEXT is the reply content.""" + tweet_id = _normalize_tweet_id(tweet_id) + config = load_config() + try: + client = _get_client(config) + console.print("💬 Replying to %s..." % tweet_id) + new_id = client.create_tweet(text, reply_to_id=tweet_id) + console.print("[green]✅ Reply posted![/green]") + console.print("🔗 https://x.com/i/status/%s" % new_id) + except RuntimeError as exc: + _exit_with_error(exc) + + +@cli.command(name="quote") +@click.argument("tweet_id") +@click.argument("text") +def quote_tweet(tweet_id, text): + # type: (str, str) -> None + """Quote-tweet a tweet. TWEET_ID is the tweet to quote, TEXT is the commentary.""" + tweet_id = _normalize_tweet_id(tweet_id) + config = load_config() + try: + client = _get_client(config) + console.print("🔄 Quoting tweet %s..." % tweet_id) + new_id = client.quote_tweet(tweet_id, text) + console.print("[green]✅ Quote tweet posted![/green]") + console.print("🔗 https://x.com/i/status/%s" % new_id) + except RuntimeError as exc: + _exit_with_error(exc) + + +@cli.command(name="whoami") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON.") +def whoami(as_json): + # type: (bool,) -> None + """Show the currently authenticated user's profile.""" + config = load_config() + try: + client = _get_client(config) + console.print("👤 Fetching current user...") + profile = client.fetch_me() + except RuntimeError as exc: + _exit_with_error(exc) + + if as_json: + click.echo(json.dumps(user_profile_to_dict(profile), ensure_ascii=False, indent=2)) + else: + console.print() + print_user_profile(profile, console) + + +@cli.command(name="follow") +@click.argument("screen_name") +def follow_user(screen_name): + # type: (str,) -> None + """Follow a user. SCREEN_NAME is the @handle (without @).""" + screen_name = screen_name.lstrip("@") + config = load_config() + try: + client = _get_client(config) + console.print("👤 Looking up @%s..." % screen_name) + profile = client.fetch_user(screen_name) + console.print("➕ Following @%s..." % screen_name) + client.follow_user(profile.id) + console.print("[green]✅ Now following @%s[/green]" % screen_name) + except RuntimeError as exc: + _exit_with_error(exc) + + +@cli.command(name="unfollow") +@click.argument("screen_name") +def unfollow_user(screen_name): + # type: (str,) -> None + """Unfollow a user. SCREEN_NAME is the @handle (without @).""" + screen_name = screen_name.lstrip("@") + config = load_config() + try: + client = _get_client(config) + console.print("👤 Looking up @%s..." % screen_name) + profile = client.fetch_user(screen_name) + console.print("➖ Unfollowing @%s..." % screen_name) + client.unfollow_user(profile.id) + console.print("[green]✅ Unfollowed @%s[/green]" % screen_name) + except RuntimeError as exc: + _exit_with_error(exc) + + @cli.command(name="delete") @click.argument("tweet_id") @click.confirmation_option(prompt="Are you sure you want to delete this tweet?") diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 16aa195..9c4d7c6 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -575,6 +575,76 @@ class TwitterClient: self._write_delay() return True + def fetch_me(self): + # type: () -> UserProfile + """Fetch the currently authenticated user's profile.""" + url = "https://x.com/i/api/1.1/account/multi/list.json" + data = self._api_get(url) + if isinstance(data, list) and data: + user_data = data[0].get("user", {}) + if user_data: + return UserProfile( + id=str(user_data.get("id_str", "")), + name=user_data.get("name", ""), + screen_name=user_data.get("screen_name", ""), + bio=user_data.get("description", ""), + location=user_data.get("location", ""), + url=_deep_get(user_data, "entities", "url", "urls", 0, "expanded_url") or "", + followers_count=_parse_int(user_data.get("followers_count"), 0), + following_count=_parse_int(user_data.get("friends_count"), 0), + tweets_count=_parse_int(user_data.get("statuses_count"), 0), + likes_count=_parse_int(user_data.get("favourites_count"), 0), + verified=bool(user_data.get("verified", False)), + profile_image_url=user_data.get("profile_image_url_https", ""), + created_at=user_data.get("created_at", ""), + ) + raise RuntimeError("Failed to fetch current user info") + + def quote_tweet(self, tweet_id, text): + # type: (str, str) -> str + """Quote-tweet a tweet. Returns the new tweet ID.""" + variables = { + "tweet_text": text, + "attachment_url": "https://x.com/i/status/%s" % tweet_id, + "media": {"media_entities": [], "possibly_sensitive": False}, + "semantic_annotation_ids": [], + "dark_request": False, + } + data = self._graphql_post("CreateTweet", variables, FEATURES) + self._write_delay() + result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") + if result: + return result.get("rest_id", "") + raise RuntimeError("Failed to create quote tweet") + + def follow_user(self, user_id): + # type: (str) -> bool + """Follow a user by user ID. Returns True on success.""" + url = "https://x.com/i/api/1.1/friendships/create.json" + body = {"user_id": user_id} + headers = self._build_headers(url=url, method="POST") + headers["Content-Type"] = "application/x-www-form-urlencoded" + session = _get_cffi_session() + response = session.post(url, headers=headers, data=body, timeout=30) + if response.status_code >= 400: + raise RuntimeError("Failed to follow user: HTTP %d" % response.status_code) + self._write_delay() + return True + + def unfollow_user(self, user_id): + # type: (str) -> bool + """Unfollow a user by user ID. Returns True on success.""" + url = "https://x.com/i/api/1.1/friendships/destroy.json" + body = {"user_id": user_id} + headers = self._build_headers(url=url, method="POST") + headers["Content-Type"] = "application/x-www-form-urlencoded" + session = _get_cffi_session() + response = session.post(url, headers=headers, data=body, timeout=30) + if response.status_code >= 400: + raise RuntimeError("Failed to unfollow user: HTTP %d" % response.status_code) + self._write_delay() + return True + def _fetch_timeline(self, operation_name, count, get_instructions, extra_variables=None, override_base_variables=False, field_toggles=None): # type: (str, int, Callable[[Any], Any], Optional[Dict[str, Any]], bool, Optional[Dict[str, Any]]) -> List[Tweet] """Generic timeline fetcher with pagination and deduplication. diff --git a/twitter_cli/serialization.py b/twitter_cli/serialization.py index ece85cd..429a668 100644 --- a/twitter_cli/serialization.py +++ b/twitter_cli/serialization.py @@ -129,6 +129,36 @@ def tweets_to_json(tweets: Iterable[Tweet]) -> str: return json.dumps([tweet_to_dict(tweet) for tweet in tweets], ensure_ascii=False, indent=2) +def tweet_to_compact_dict(tweet: Tweet) -> Dict[str, Any]: + """Convert a Tweet into a compact dict with minimal fields for LLM consumption.""" + text = tweet.text.replace("\n", " ").strip() + if len(text) > 140: + text = text[:137] + "..." + # Short time: "Mar 07 05:51" from "Sat Mar 07 05:51:02 +0000 2026" + parts = tweet.created_at.split() + if len(parts) >= 4: + time_str = "%s %s %s" % (parts[1], parts[2], parts[3][:5]) + else: + time_str = tweet.created_at + return { + "id": tweet.id, + "author": "@%s" % tweet.author.screen_name, + "text": text, + "likes": tweet.metrics.likes, + "rts": tweet.metrics.retweets, + "time": time_str, + } + + +def tweets_to_compact_json(tweets: Iterable[Tweet]) -> str: + """Serialize Tweet objects to compact JSON (minimal fields for LLM/pipe usage).""" + return json.dumps( + [tweet_to_compact_dict(tweet) for tweet in tweets], + ensure_ascii=False, + indent=2, + ) + + def user_profile_to_dict(user: UserProfile) -> Dict[str, Any]: """Convert a UserProfile dataclass into a JSON-safe dict.""" return {