118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
SUPPORTED_FORMATS = ("txt", "vtt", "srt", "tsv", "json")
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(description="Send transcription jobs to a remote Whisper backend.")
|
|
parser.add_argument("file", type=Path, help="Path to the local media file to upload.")
|
|
parser.add_argument("--model", required=True, help="Whisper model name to use on the backend.")
|
|
parser.add_argument("--language", help="Optional language code to pass through to Whisper.")
|
|
parser.add_argument(
|
|
"--output-format",
|
|
default="txt",
|
|
choices=SUPPORTED_FORMATS,
|
|
help="Transcript artifact format returned by the backend.",
|
|
)
|
|
parser.add_argument(
|
|
"--server",
|
|
help="Override the backend base URL. Defaults to the WHISPER_REMOTE environment variable.",
|
|
)
|
|
parser.add_argument(
|
|
"--to-file",
|
|
type=Path,
|
|
help="Optional local file path or directory to save the returned transcript artifact.",
|
|
)
|
|
return parser
|
|
|
|
|
|
def resolve_server(args: argparse.Namespace) -> str:
|
|
server = args.server or os.environ.get("WHISPER_REMOTE")
|
|
if not server:
|
|
raise SystemExit("WHISPER_REMOTE is not set and --server was not provided.")
|
|
return server.rstrip("/")
|
|
|
|
|
|
def infer_output_path(target: Path, input_file: Path, output_format: str) -> Path:
|
|
if target.exists() and target.is_dir():
|
|
return target / f"{input_file.stem}.{output_format}"
|
|
if target.suffix:
|
|
return target
|
|
return target / f"{input_file.stem}.{output_format}"
|
|
|
|
|
|
def print_response(response: httpx.Response) -> None:
|
|
sys.stdout.write(response.text)
|
|
if response.text and not response.text.endswith("\n"):
|
|
sys.stdout.write("\n")
|
|
|
|
|
|
def save_response(response: httpx.Response, destination: Path) -> None:
|
|
destination.parent.mkdir(parents=True, exist_ok=True)
|
|
destination.write_bytes(response.content)
|
|
|
|
|
|
def format_http_error(response: httpx.Response, endpoint: str) -> str:
|
|
body = response.text.strip() or "<empty response body>"
|
|
return f"HTTP {response.status_code} from {endpoint}: {body}"
|
|
|
|
|
|
def format_request_error(exc: httpx.RequestError, endpoint: str) -> str:
|
|
if isinstance(exc, httpx.TimeoutException):
|
|
return f"Request to {endpoint} timed out."
|
|
|
|
reason = str(exc).strip() or exc.__class__.__name__
|
|
return f"Request to {endpoint} failed: {reason}"
|
|
|
|
|
|
def main() -> int:
|
|
parser = build_parser()
|
|
args = parser.parse_args()
|
|
|
|
input_file = args.file.expanduser().resolve()
|
|
if not input_file.is_file():
|
|
parser.error(f"Input file does not exist: {input_file}")
|
|
|
|
server = resolve_server(args)
|
|
endpoint = f"{server}/transcriptions"
|
|
|
|
try:
|
|
with input_file.open("rb") as handle, httpx.Client(timeout=300.0) as client:
|
|
response = client.post(
|
|
endpoint,
|
|
data={
|
|
"model": args.model,
|
|
"language": args.language or "",
|
|
"output_format": args.output_format,
|
|
},
|
|
files={"file": (input_file.name, handle, "application/octet-stream")},
|
|
)
|
|
except httpx.RequestError as exc:
|
|
parser.exit(1, f"{format_request_error(exc, endpoint)}\n")
|
|
|
|
try:
|
|
response.raise_for_status()
|
|
except httpx.HTTPStatusError as exc:
|
|
message = format_http_error(exc.response, endpoint)
|
|
parser.exit(1, f"{message}\n")
|
|
|
|
if args.to_file:
|
|
destination = infer_output_path(args.to_file.expanduser(), input_file, args.output_format)
|
|
save_response(response, destination)
|
|
sys.stdout.write(f"{destination}\n")
|
|
else:
|
|
print_response(response)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|