diff --git a/tests/test_cli.py b/tests/test_cli.py index ef5d962..491419f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -321,7 +321,7 @@ def test_cli_reply_command(monkeypatch) -> None: calls = [] class FakeClient: - def create_tweet(self, text: str, reply_to_id=None) -> str: + def create_tweet(self, text: str, reply_to_id=None, media_ids=None) -> str: calls.append({"text": text, "reply_to_id": reply_to_id}) return "999" @@ -338,7 +338,7 @@ def test_cli_quote_command(monkeypatch) -> None: calls = [] class FakeClient: - def quote_tweet(self, tweet_id: str, text: str) -> str: + def quote_tweet(self, tweet_id: str, text: str, media_ids=None) -> str: calls.append({"tweet_id": tweet_id, "text": text}) return "888" @@ -353,7 +353,7 @@ def test_cli_quote_command(monkeypatch) -> None: def test_cli_post_json_output(monkeypatch) -> None: class FakeClient: - def create_tweet(self, text: str, reply_to_id=None) -> str: + def create_tweet(self, text: str, reply_to_id=None, media_ids=None) -> str: assert text == "hello" assert reply_to_id is None return "999" diff --git a/tests/test_client.py b/tests/test_client.py index 9193f89..9ef907d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -9,6 +9,8 @@ from __future__ import annotations import copy from unittest.mock import MagicMock, patch +import pytest + from twitter_cli.client import ( FEATURES, @@ -540,3 +542,144 @@ class TestParseUserResult: assert user.following_count == 56 assert user.tweets_count == 78 assert user.likes_count == 0 + + +# ── upload_media ───────────────────────────────────────────────────────── + +class TestUploadMedia: + """Tests for TwitterClient.upload_media().""" + + def _make_client(self): + client = TwitterClient.__new__(TwitterClient) + client._auth_token = "tok" + client._ct0 = "ct0" + client._cookie_string = None + client._request_delay = 0 + client._max_retries = 3 + client._retry_base_delay = 5.0 + client._max_count = 200 + client._client_transaction = None + client._ct_init_attempted = True + return client + + @patch("twitter_cli.client._get_cffi_session") + def test_upload_media_init_append_finalize(self, mock_session, tmp_path): + """Happy path: INIT → APPEND → FINALIZE returns media_id.""" + img = tmp_path / "photo.jpg" + img.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * 100) # fake JPEG + + mock_resp_init = MagicMock() + mock_resp_init.status_code = 200 + mock_resp_init.text = '{"media_id_string": "12345"}' + + mock_resp_append = MagicMock() + mock_resp_append.status_code = 200 + mock_resp_append.text = "" + + mock_resp_finalize = MagicMock() + mock_resp_finalize.status_code = 200 + mock_resp_finalize.text = '{"media_id_string": "12345"}' + + sess = MagicMock() + sess.post = MagicMock(side_effect=[mock_resp_init, mock_resp_append, mock_resp_finalize]) + mock_session.return_value = sess + + client = self._make_client() + media_id = client.upload_media(str(img)) + + assert media_id == "12345" + assert sess.post.call_count == 3 + + def test_upload_media_file_not_found(self): + from twitter_cli.exceptions import MediaUploadError + + client = self._make_client() + with pytest.raises(MediaUploadError, match="File not found"): + client.upload_media("/nonexistent/file.jpg") + + def test_upload_media_too_large(self, tmp_path): + from twitter_cli.exceptions import MediaUploadError + + img = tmp_path / "big.jpg" + img.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * (6 * 1024 * 1024)) # 6 MB + + client = self._make_client() + with pytest.raises(MediaUploadError, match="File too large"): + client.upload_media(str(img)) + + def test_upload_media_unsupported_format(self, tmp_path): + from twitter_cli.exceptions import MediaUploadError + + txt = tmp_path / "notes.txt" + txt.write_text("hello") + + client = self._make_client() + with pytest.raises(MediaUploadError, match="Unsupported image format"): + client.upload_media(str(txt)) + + +# ── create_tweet with media_ids ────────────────────────────────────────── + +class TestCreateTweetWithMedia: + """Tests that media_ids are correctly passed into CreateTweet variables.""" + + @patch("twitter_cli.client._get_cffi_session") + def test_create_tweet_with_media_ids(self, mock_session): + sess = MagicMock() + mock_session.return_value = sess + + client = TwitterClient.__new__(TwitterClient) + client._auth_token = "tok" + client._ct0 = "ct0" + client._cookie_string = None + client._request_delay = 0 + client._max_retries = 0 + client._retry_base_delay = 0 + client._max_count = 200 + client._client_transaction = None + client._ct_init_attempted = True + + captured_body = {} + + def mock_graphql_post(operation_name, variables, features=None): + captured_body.update(variables) + return {"data": {"create_tweet": {"tweet_results": {"result": {"rest_id": "99"}}}}} + + client._graphql_post = mock_graphql_post + + result = client.create_tweet("test", media_ids=["111", "222"]) + assert result == "99" + + entities = captured_body["media"]["media_entities"] + assert len(entities) == 2 + assert entities[0]["media_id"] == "111" + assert entities[1]["media_id"] == "222" + + @patch("twitter_cli.client._get_cffi_session") + def test_create_tweet_without_media_ids(self, mock_session): + sess = MagicMock() + mock_session.return_value = sess + + client = TwitterClient.__new__(TwitterClient) + client._auth_token = "tok" + client._ct0 = "ct0" + client._cookie_string = None + client._request_delay = 0 + client._max_retries = 0 + client._retry_base_delay = 0 + client._max_count = 200 + client._client_transaction = None + client._ct_init_attempted = True + + captured_body = {} + + def mock_graphql_post(operation_name, variables, features=None): + captured_body.update(variables) + return {"data": {"create_tweet": {"tweet_results": {"result": {"rest_id": "88"}}}}} + + client._graphql_post = mock_graphql_post + + result = client.create_tweet("no media") + assert result == "88" + assert captured_body["media"]["media_entities"] == [] + diff --git a/twitter_cli/cli.py b/twitter_cli/cli.py index fde8c37..e5d08ba 100644 --- a/twitter_cli/cli.py +++ b/twitter_cli/cli.py @@ -18,6 +18,7 @@ Read commands: Write commands: twitter post "text" # post a tweet + twitter post "text" -i photo.jpg # post with image(s) twitter reply "text" # reply to a tweet twitter quote "text" # quote-tweet twitter delete # delete a tweet @@ -849,6 +850,24 @@ def following(screen_name, max_count, as_json, as_yaml): # ── Write commands ────────────────────────────────────────────────────── +_MAX_IMAGES = 4 # Twitter allows up to 4 images per tweet + + +def _upload_images(client, image_paths, rich_output=True): + # type: (TwitterClient, tuple, bool) -> list + """Upload images and return list of media_id strings.""" + if not image_paths: + return [] + if len(image_paths) > _MAX_IMAGES: + raise click.UsageError("Too many images: max %d, got %d" % (_MAX_IMAGES, len(image_paths))) + media_ids = [] + for i, path in enumerate(image_paths, 1): + if rich_output: + console.print("📤 Uploading image %d/%d: %s" % (i, len(image_paths), path)) + media_ids.append(client.upload_media(path)) + return media_ids + + def _write_action(emoji, action_desc, client_method, tweet_id, as_json=False, as_yaml=False): # type: (str, str, str, str, bool, bool) -> None """Generic write action helper to reduce CLI command boilerplate. @@ -874,14 +893,24 @@ def _write_action(emoji, action_desc, client_method, tweet_id, as_json=False, as @cli.command() @click.argument("text") @click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.") +@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") @structured_output_options -def post(text, reply_to, as_json, as_yaml): - # type: (str, Optional[str], bool, bool) -> None - """Post a new tweet. TEXT is the tweet content.""" +def post(text, reply_to, images, as_json, as_yaml): + # type: (str, Optional[str], tuple, bool, bool) -> None + """Post a new tweet. TEXT is the tweet content. + + Attach images with --image / -i (up to 4): + + \b + twitter post "Hello!" --image photo.jpg + twitter post "Gallery" -i a.png -i b.png -i c.jpg + """ action = "Replying to %s" % reply_to if reply_to else "Posting tweet" + rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - tweet_id = client.create_tweet(text, reply_to_id=reply_to) + media_ids = _upload_images(client, images, rich_output=rich_output) + tweet_id = client.create_tweet(text, reply_to_id=reply_to, media_ids=media_ids or None) return {"success": True, "action": "post", "id": tweet_id, "url": "https://x.com/i/status/%s" % tweet_id} payload = _run_write_command( @@ -899,13 +928,16 @@ def post(text, reply_to, as_json, as_yaml): @cli.command(name="reply") @click.argument("tweet_id") @click.argument("text") +@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") @structured_output_options -def reply_tweet(tweet_id, text, as_json, as_yaml): - # type: (str, str, bool, bool) -> None +def reply_tweet(tweet_id, text, images, as_json, as_yaml): + # type: (str, str, tuple, bool, bool) -> None """Reply to a tweet. TWEET_ID is the tweet to reply to, TEXT is the reply content.""" tweet_id = _normalize_tweet_id(tweet_id) + rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - new_id = client.create_tweet(text, reply_to_id=tweet_id) + media_ids = _upload_images(client, images, rich_output=rich_output) + new_id = client.create_tweet(text, reply_to_id=tweet_id, media_ids=media_ids or None) return { "success": True, "action": "reply", @@ -929,13 +961,16 @@ def reply_tweet(tweet_id, text, as_json, as_yaml): @cli.command(name="quote") @click.argument("tweet_id") @click.argument("text") +@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") @structured_output_options -def quote_tweet(tweet_id, text, as_json, as_yaml): - # type: (str, str, bool, bool) -> None +def quote_tweet(tweet_id, text, images, as_json, as_yaml): + # type: (str, str, tuple, bool, bool) -> None """Quote-tweet a tweet. TWEET_ID is the tweet to quote, TEXT is the commentary.""" tweet_id = _normalize_tweet_id(tweet_id) + rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - new_id = client.quote_tweet(tweet_id, text) + media_ids = _upload_images(client, images, rich_output=rich_output) + new_id = client.quote_tweet(tweet_id, text, media_ids=media_ids or None) return { "success": True, "action": "quote", @@ -953,7 +988,7 @@ def quote_tweet(tweet_id, text, as_json, as_yaml): error_details={"action": "quote", "quotedId": tweet_id}, ) if payload and not _structured_mode(as_json=as_json, as_yaml=as_yaml): - console.print("🔗 %s" % payload["url"]) + console.print("🔗 %s" % payload["url"]) @cli.command(name="status") diff --git a/twitter_cli/client.py b/twitter_cli/client.py index b30c56f..9413b57 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -2,9 +2,11 @@ from __future__ import annotations +import base64 import json import logging import math +import mimetypes import os import random import time @@ -33,6 +35,7 @@ from .constants import ( sync_chrome_version, ) from .exceptions import ( + MediaUploadError, NotFoundError, TwitterAPIError, ) @@ -394,6 +397,10 @@ class TwitterClient: # ── Write operations ───────────────────────────────────────────── + # Supported image MIME types and max file size (5 MB) + _SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} + _MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB + def _write_delay(self): # type: () -> None """Sleep a random interval after write operations to avoid rate limits.""" @@ -401,12 +408,99 @@ class TwitterClient: logger.debug("Write operation delay: %.1fs", delay) time.sleep(delay) - def create_tweet(self, text, reply_to_id=None): - # type: (str, Optional[str]) -> str - """Post a new tweet. Returns the new tweet ID.""" + def upload_media(self, file_path): + # type: (str) -> str + """Upload an image file to Twitter. Returns the media_id string. + + Uses Twitter's chunked upload API (INIT → APPEND → FINALIZE). + Supports JPEG, PNG, GIF, and WebP images up to 5 MB. + """ + if not os.path.isfile(file_path): + raise MediaUploadError("File not found: %s" % file_path) + + file_size = os.path.getsize(file_path) + if file_size > self._MAX_IMAGE_SIZE: + raise MediaUploadError( + "File too large: %.1f MB (max %.0f MB)" + % (file_size / (1024 * 1024), self._MAX_IMAGE_SIZE / (1024 * 1024)) + ) + + media_type = mimetypes.guess_type(file_path)[0] or "" + if media_type not in self._SUPPORTED_IMAGE_TYPES: + raise MediaUploadError( + "Unsupported image format: %s (supported: jpeg, png, gif, webp)" % media_type + ) + + upload_url = "https://upload.twitter.com/i/media/upload.json" + session = _get_cffi_session() + + # ── INIT ───────────────────────────────────────────────────── + headers = self._build_headers(url=upload_url, method="POST") + headers["Content-Type"] = "application/x-www-form-urlencoded" + init_data = { + "command": "INIT", + "total_bytes": str(file_size), + "media_type": media_type, + } + resp = session.post(upload_url, headers=headers, data=init_data, timeout=30) + if resp.status_code >= 400: + raise MediaUploadError("INIT failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) + try: + init_result = json.loads(resp.text) + except (json.JSONDecodeError, ValueError): + raise MediaUploadError("INIT returned invalid JSON") + media_id = init_result.get("media_id_string", "") + if not media_id: + raise MediaUploadError("INIT did not return media_id") + logger.info("Media INIT: media_id=%s", media_id) + + # ── APPEND ─────────────────────────────────────────────────── + with open(file_path, "rb") as f: + media_data = base64.b64encode(f.read()).decode("ascii") + + headers = self._build_headers(url=upload_url, method="POST") + # Remove JSON content-type — curl_cffi handles multipart encoding + headers.pop("Content-Type", None) + append_data = { + "command": "APPEND", + "media_id": media_id, + "segment_index": "0", + "media_data": media_data, + } + resp = session.post(upload_url, headers=headers, data=append_data, timeout=60) + if resp.status_code >= 400: + raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) + logger.info("Media APPEND: segment 0 uploaded") + + # ── FINALIZE ───────────────────────────────────────────────── + headers = self._build_headers(url=upload_url, method="POST") + headers["Content-Type"] = "application/x-www-form-urlencoded" + finalize_data = { + "command": "FINALIZE", + "media_id": media_id, + } + resp = session.post(upload_url, headers=headers, data=finalize_data, timeout=30) + if resp.status_code >= 400: + raise MediaUploadError("FINALIZE failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) + logger.info("Media FINALIZE: media_id=%s ready", media_id) + + return media_id + + def create_tweet(self, text, reply_to_id=None, media_ids=None): + # type: (str, Optional[str], Optional[List[str]]) -> str + """Post a new tweet. Returns the new tweet ID. + + Args: + text: Tweet text content. + reply_to_id: Optional tweet ID to reply to. + media_ids: Optional list of media IDs (from upload_media) to attach. + """ + media_entities = [] + if media_ids: + media_entities = [{"media_id": mid, "tagged_users": []} for mid in media_ids] variables = { "tweet_text": text, - "media": {"media_entities": [], "possibly_sensitive": False}, + "media": {"media_entities": media_entities, "possibly_sensitive": False}, "semantic_annotation_ids": [], "dark_request": False, } # type: Dict[str, Any] @@ -526,13 +620,22 @@ class TwitterClient: raise TwitterAPIError(0, "Failed to fetch current user info") - def quote_tweet(self, tweet_id, text): - # type: (str, str) -> str - """Quote-tweet a tweet. Returns the new tweet ID.""" + def quote_tweet(self, tweet_id, text, media_ids=None): + # type: (str, str, Optional[List[str]]) -> str + """Quote-tweet a tweet. Returns the new tweet ID. + + Args: + tweet_id: The tweet ID to quote. + text: Commentary text. + media_ids: Optional list of media IDs (from upload_media) to attach. + """ + media_entities = [] + if media_ids: + media_entities = [{"media_id": mid, "tagged_users": []} for mid in media_ids] variables = { "tweet_text": text, "attachment_url": "https://x.com/i/status/%s" % tweet_id, - "media": {"media_entities": [], "possibly_sensitive": False}, + "media": {"media_entities": media_entities, "possibly_sensitive": False}, "semantic_annotation_ids": [], "dark_request": False, } diff --git a/twitter_cli/exceptions.py b/twitter_cli/exceptions.py index 2d9a84c..d8fd25a 100644 --- a/twitter_cli/exceptions.py +++ b/twitter_cli/exceptions.py @@ -36,6 +36,10 @@ class QueryIdError(TwitterError): """Raised when a GraphQL queryId cannot be resolved.""" +class MediaUploadError(TwitterError): + """Raised when media upload fails (file not found, too large, unsupported format, API error).""" + + class TwitterAPIError(TwitterError): """Raised on non-OK Twitter API responses with HTTP status + message."""