feat: add image upload support for post/reply/quote commands

- Add upload_media() method to TwitterClient (INIT/APPEND/FINALIZE flow
  via upload.twitter.com, supports JPEG/PNG/GIF/WebP up to 5MB)
- Extend create_tweet() and quote_tweet() with optional media_ids param
- Add --image/-i option to post, reply, and quote CLI commands (max 4)
- Add MediaUploadError exception for upload-specific error handling
- Add 6 unit tests covering upload flow, validation, and media_ids

Co-Authored-By: Catafal <67582323+Catafal@users.noreply.github.com>
This commit is contained in:
jackwener
2026-03-13 01:57:27 +08:00
parent 7d1b519c85
commit 69cb85a1c2
5 changed files with 307 additions and 22 deletions

View File

@@ -321,7 +321,7 @@ def test_cli_reply_command(monkeypatch) -> None:
calls = []
class FakeClient:
def create_tweet(self, text: str, reply_to_id=None) -> str:
def create_tweet(self, text: str, reply_to_id=None, media_ids=None) -> str:
calls.append({"text": text, "reply_to_id": reply_to_id})
return "999"
@@ -338,7 +338,7 @@ def test_cli_quote_command(monkeypatch) -> None:
calls = []
class FakeClient:
def quote_tweet(self, tweet_id: str, text: str) -> str:
def quote_tweet(self, tweet_id: str, text: str, media_ids=None) -> str:
calls.append({"tweet_id": tweet_id, "text": text})
return "888"
@@ -353,7 +353,7 @@ def test_cli_quote_command(monkeypatch) -> None:
def test_cli_post_json_output(monkeypatch) -> None:
class FakeClient:
def create_tweet(self, text: str, reply_to_id=None) -> str:
def create_tweet(self, text: str, reply_to_id=None, media_ids=None) -> str:
assert text == "hello"
assert reply_to_id is None
return "999"

View File

@@ -9,6 +9,8 @@ from __future__ import annotations
import copy
from unittest.mock import MagicMock, patch
import pytest
from twitter_cli.client import (
FEATURES,
@@ -540,3 +542,144 @@ class TestParseUserResult:
assert user.following_count == 56
assert user.tweets_count == 78
assert user.likes_count == 0
# ── upload_media ─────────────────────────────────────────────────────────
class TestUploadMedia:
"""Tests for TwitterClient.upload_media()."""
def _make_client(self):
client = TwitterClient.__new__(TwitterClient)
client._auth_token = "tok"
client._ct0 = "ct0"
client._cookie_string = None
client._request_delay = 0
client._max_retries = 3
client._retry_base_delay = 5.0
client._max_count = 200
client._client_transaction = None
client._ct_init_attempted = True
return client
@patch("twitter_cli.client._get_cffi_session")
def test_upload_media_init_append_finalize(self, mock_session, tmp_path):
"""Happy path: INIT → APPEND → FINALIZE returns media_id."""
img = tmp_path / "photo.jpg"
img.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * 100) # fake JPEG
mock_resp_init = MagicMock()
mock_resp_init.status_code = 200
mock_resp_init.text = '{"media_id_string": "12345"}'
mock_resp_append = MagicMock()
mock_resp_append.status_code = 200
mock_resp_append.text = ""
mock_resp_finalize = MagicMock()
mock_resp_finalize.status_code = 200
mock_resp_finalize.text = '{"media_id_string": "12345"}'
sess = MagicMock()
sess.post = MagicMock(side_effect=[mock_resp_init, mock_resp_append, mock_resp_finalize])
mock_session.return_value = sess
client = self._make_client()
media_id = client.upload_media(str(img))
assert media_id == "12345"
assert sess.post.call_count == 3
def test_upload_media_file_not_found(self):
from twitter_cli.exceptions import MediaUploadError
client = self._make_client()
with pytest.raises(MediaUploadError, match="File not found"):
client.upload_media("/nonexistent/file.jpg")
def test_upload_media_too_large(self, tmp_path):
from twitter_cli.exceptions import MediaUploadError
img = tmp_path / "big.jpg"
img.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * (6 * 1024 * 1024)) # 6 MB
client = self._make_client()
with pytest.raises(MediaUploadError, match="File too large"):
client.upload_media(str(img))
def test_upload_media_unsupported_format(self, tmp_path):
from twitter_cli.exceptions import MediaUploadError
txt = tmp_path / "notes.txt"
txt.write_text("hello")
client = self._make_client()
with pytest.raises(MediaUploadError, match="Unsupported image format"):
client.upload_media(str(txt))
# ── create_tweet with media_ids ──────────────────────────────────────────
class TestCreateTweetWithMedia:
"""Tests that media_ids are correctly passed into CreateTweet variables."""
@patch("twitter_cli.client._get_cffi_session")
def test_create_tweet_with_media_ids(self, mock_session):
sess = MagicMock()
mock_session.return_value = sess
client = TwitterClient.__new__(TwitterClient)
client._auth_token = "tok"
client._ct0 = "ct0"
client._cookie_string = None
client._request_delay = 0
client._max_retries = 0
client._retry_base_delay = 0
client._max_count = 200
client._client_transaction = None
client._ct_init_attempted = True
captured_body = {}
def mock_graphql_post(operation_name, variables, features=None):
captured_body.update(variables)
return {"data": {"create_tweet": {"tweet_results": {"result": {"rest_id": "99"}}}}}
client._graphql_post = mock_graphql_post
result = client.create_tweet("test", media_ids=["111", "222"])
assert result == "99"
entities = captured_body["media"]["media_entities"]
assert len(entities) == 2
assert entities[0]["media_id"] == "111"
assert entities[1]["media_id"] == "222"
@patch("twitter_cli.client._get_cffi_session")
def test_create_tweet_without_media_ids(self, mock_session):
sess = MagicMock()
mock_session.return_value = sess
client = TwitterClient.__new__(TwitterClient)
client._auth_token = "tok"
client._ct0 = "ct0"
client._cookie_string = None
client._request_delay = 0
client._max_retries = 0
client._retry_base_delay = 0
client._max_count = 200
client._client_transaction = None
client._ct_init_attempted = True
captured_body = {}
def mock_graphql_post(operation_name, variables, features=None):
captured_body.update(variables)
return {"data": {"create_tweet": {"tweet_results": {"result": {"rest_id": "88"}}}}}
client._graphql_post = mock_graphql_post
result = client.create_tweet("no media")
assert result == "88"
assert captured_body["media"]["media_entities"] == []

View File

@@ -18,6 +18,7 @@ Read commands:
Write commands:
twitter post "text" # post a tweet
twitter post "text" -i photo.jpg # post with image(s)
twitter reply <id> "text" # reply to a tweet
twitter quote <id> "text" # quote-tweet
twitter delete <id> # delete a tweet
@@ -849,6 +850,24 @@ def following(screen_name, max_count, as_json, as_yaml):
# ── Write commands ──────────────────────────────────────────────────────
_MAX_IMAGES = 4 # Twitter allows up to 4 images per tweet
def _upload_images(client, image_paths, rich_output=True):
# type: (TwitterClient, tuple, bool) -> list
"""Upload images and return list of media_id strings."""
if not image_paths:
return []
if len(image_paths) > _MAX_IMAGES:
raise click.UsageError("Too many images: max %d, got %d" % (_MAX_IMAGES, len(image_paths)))
media_ids = []
for i, path in enumerate(image_paths, 1):
if rich_output:
console.print("📤 Uploading image %d/%d: %s" % (i, len(image_paths), path))
media_ids.append(client.upload_media(path))
return media_ids
def _write_action(emoji, action_desc, client_method, tweet_id, as_json=False, as_yaml=False):
# type: (str, str, str, str, bool, bool) -> None
"""Generic write action helper to reduce CLI command boilerplate.
@@ -874,14 +893,24 @@ def _write_action(emoji, action_desc, client_method, tweet_id, as_json=False, as
@cli.command()
@click.argument("text")
@click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.")
@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.")
@structured_output_options
def post(text, reply_to, as_json, as_yaml):
# type: (str, Optional[str], bool, bool) -> None
"""Post a new tweet. TEXT is the tweet content."""
def post(text, reply_to, images, as_json, as_yaml):
# type: (str, Optional[str], tuple, bool, bool) -> None
"""Post a new tweet. TEXT is the tweet content.
Attach images with --image / -i (up to 4):
\b
twitter post "Hello!" --image photo.jpg
twitter post "Gallery" -i a.png -i b.png -i c.jpg
"""
action = "Replying to %s" % reply_to if reply_to else "Posting tweet"
rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml)
def operation(client: TwitterClient) -> WritePayload:
tweet_id = client.create_tweet(text, reply_to_id=reply_to)
media_ids = _upload_images(client, images, rich_output=rich_output)
tweet_id = client.create_tweet(text, reply_to_id=reply_to, media_ids=media_ids or None)
return {"success": True, "action": "post", "id": tweet_id, "url": "https://x.com/i/status/%s" % tweet_id}
payload = _run_write_command(
@@ -899,13 +928,16 @@ def post(text, reply_to, as_json, as_yaml):
@cli.command(name="reply")
@click.argument("tweet_id")
@click.argument("text")
@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.")
@structured_output_options
def reply_tweet(tweet_id, text, as_json, as_yaml):
# type: (str, str, bool, bool) -> None
def reply_tweet(tweet_id, text, images, as_json, as_yaml):
# type: (str, str, tuple, bool, bool) -> None
"""Reply to a tweet. TWEET_ID is the tweet to reply to, TEXT is the reply content."""
tweet_id = _normalize_tweet_id(tweet_id)
rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml)
def operation(client: TwitterClient) -> WritePayload:
new_id = client.create_tweet(text, reply_to_id=tweet_id)
media_ids = _upload_images(client, images, rich_output=rich_output)
new_id = client.create_tweet(text, reply_to_id=tweet_id, media_ids=media_ids or None)
return {
"success": True,
"action": "reply",
@@ -929,13 +961,16 @@ def reply_tweet(tweet_id, text, as_json, as_yaml):
@cli.command(name="quote")
@click.argument("tweet_id")
@click.argument("text")
@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.")
@structured_output_options
def quote_tweet(tweet_id, text, as_json, as_yaml):
# type: (str, str, bool, bool) -> None
def quote_tweet(tweet_id, text, images, as_json, as_yaml):
# type: (str, str, tuple, bool, bool) -> None
"""Quote-tweet a tweet. TWEET_ID is the tweet to quote, TEXT is the commentary."""
tweet_id = _normalize_tweet_id(tweet_id)
rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml)
def operation(client: TwitterClient) -> WritePayload:
new_id = client.quote_tweet(tweet_id, text)
media_ids = _upload_images(client, images, rich_output=rich_output)
new_id = client.quote_tweet(tweet_id, text, media_ids=media_ids or None)
return {
"success": True,
"action": "quote",
@@ -953,7 +988,7 @@ def quote_tweet(tweet_id, text, as_json, as_yaml):
error_details={"action": "quote", "quotedId": tweet_id},
)
if payload and not _structured_mode(as_json=as_json, as_yaml=as_yaml):
console.print("🔗 %s" % payload["url"])
console.print("🔗 %s" % payload["url"])
@cli.command(name="status")

View File

@@ -2,9 +2,11 @@
from __future__ import annotations
import base64
import json
import logging
import math
import mimetypes
import os
import random
import time
@@ -33,6 +35,7 @@ from .constants import (
sync_chrome_version,
)
from .exceptions import (
MediaUploadError,
NotFoundError,
TwitterAPIError,
)
@@ -394,6 +397,10 @@ class TwitterClient:
# ── Write operations ─────────────────────────────────────────────
# Supported image MIME types and max file size (5 MB)
_SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
_MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB
def _write_delay(self):
# type: () -> None
"""Sleep a random interval after write operations to avoid rate limits."""
@@ -401,12 +408,99 @@ class TwitterClient:
logger.debug("Write operation delay: %.1fs", delay)
time.sleep(delay)
def create_tweet(self, text, reply_to_id=None):
# type: (str, Optional[str]) -> str
"""Post a new tweet. Returns the new tweet ID."""
def upload_media(self, file_path):
# type: (str) -> str
"""Upload an image file to Twitter. Returns the media_id string.
Uses Twitter's chunked upload API (INIT → APPEND → FINALIZE).
Supports JPEG, PNG, GIF, and WebP images up to 5 MB.
"""
if not os.path.isfile(file_path):
raise MediaUploadError("File not found: %s" % file_path)
file_size = os.path.getsize(file_path)
if file_size > self._MAX_IMAGE_SIZE:
raise MediaUploadError(
"File too large: %.1f MB (max %.0f MB)"
% (file_size / (1024 * 1024), self._MAX_IMAGE_SIZE / (1024 * 1024))
)
media_type = mimetypes.guess_type(file_path)[0] or ""
if media_type not in self._SUPPORTED_IMAGE_TYPES:
raise MediaUploadError(
"Unsupported image format: %s (supported: jpeg, png, gif, webp)" % media_type
)
upload_url = "https://upload.twitter.com/i/media/upload.json"
session = _get_cffi_session()
# ── INIT ─────────────────────────────────────────────────────
headers = self._build_headers(url=upload_url, method="POST")
headers["Content-Type"] = "application/x-www-form-urlencoded"
init_data = {
"command": "INIT",
"total_bytes": str(file_size),
"media_type": media_type,
}
resp = session.post(upload_url, headers=headers, data=init_data, timeout=30)
if resp.status_code >= 400:
raise MediaUploadError("INIT failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
try:
init_result = json.loads(resp.text)
except (json.JSONDecodeError, ValueError):
raise MediaUploadError("INIT returned invalid JSON")
media_id = init_result.get("media_id_string", "")
if not media_id:
raise MediaUploadError("INIT did not return media_id")
logger.info("Media INIT: media_id=%s", media_id)
# ── APPEND ───────────────────────────────────────────────────
with open(file_path, "rb") as f:
media_data = base64.b64encode(f.read()).decode("ascii")
headers = self._build_headers(url=upload_url, method="POST")
# Remove JSON content-type — curl_cffi handles multipart encoding
headers.pop("Content-Type", None)
append_data = {
"command": "APPEND",
"media_id": media_id,
"segment_index": "0",
"media_data": media_data,
}
resp = session.post(upload_url, headers=headers, data=append_data, timeout=60)
if resp.status_code >= 400:
raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
logger.info("Media APPEND: segment 0 uploaded")
# ── FINALIZE ─────────────────────────────────────────────────
headers = self._build_headers(url=upload_url, method="POST")
headers["Content-Type"] = "application/x-www-form-urlencoded"
finalize_data = {
"command": "FINALIZE",
"media_id": media_id,
}
resp = session.post(upload_url, headers=headers, data=finalize_data, timeout=30)
if resp.status_code >= 400:
raise MediaUploadError("FINALIZE failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
logger.info("Media FINALIZE: media_id=%s ready", media_id)
return media_id
def create_tweet(self, text, reply_to_id=None, media_ids=None):
# type: (str, Optional[str], Optional[List[str]]) -> str
"""Post a new tweet. Returns the new tweet ID.
Args:
text: Tweet text content.
reply_to_id: Optional tweet ID to reply to.
media_ids: Optional list of media IDs (from upload_media) to attach.
"""
media_entities = []
if media_ids:
media_entities = [{"media_id": mid, "tagged_users": []} for mid in media_ids]
variables = {
"tweet_text": text,
"media": {"media_entities": [], "possibly_sensitive": False},
"media": {"media_entities": media_entities, "possibly_sensitive": False},
"semantic_annotation_ids": [],
"dark_request": False,
} # type: Dict[str, Any]
@@ -526,13 +620,22 @@ class TwitterClient:
raise TwitterAPIError(0, "Failed to fetch current user info")
def quote_tweet(self, tweet_id, text):
# type: (str, str) -> str
"""Quote-tweet a tweet. Returns the new tweet ID."""
def quote_tweet(self, tweet_id, text, media_ids=None):
# type: (str, str, Optional[List[str]]) -> str
"""Quote-tweet a tweet. Returns the new tweet ID.
Args:
tweet_id: The tweet ID to quote.
text: Commentary text.
media_ids: Optional list of media IDs (from upload_media) to attach.
"""
media_entities = []
if media_ids:
media_entities = [{"media_id": mid, "tagged_users": []} for mid in media_ids]
variables = {
"tweet_text": text,
"attachment_url": "https://x.com/i/status/%s" % tweet_id,
"media": {"media_entities": [], "possibly_sensitive": False},
"media": {"media_entities": media_entities, "possibly_sensitive": False},
"semantic_annotation_ids": [],
"dark_request": False,
}

View File

@@ -36,6 +36,10 @@ class QueryIdError(TwitterError):
"""Raised when a GraphQL queryId cannot be resolved."""
class MediaUploadError(TwitterError):
"""Raised when media upload fails (file not found, too large, unsupported format, API error)."""
class TwitterAPIError(TwitterError):
"""Raised on non-OK Twitter API responses with HTTP status + message."""