Move code directly into backend and cli src
This commit is contained in:
101
cli/src/main.py
Normal file
101
cli/src/main.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
SUPPORTED_FORMATS = ("txt", "vtt", "srt", "tsv", "json")
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Send transcription jobs to a remote Whisper backend.")
|
||||
parser.add_argument("file", type=Path, help="Path to the local media file to upload.")
|
||||
parser.add_argument("--model", required=True, help="Whisper model name to use on the backend.")
|
||||
parser.add_argument("--language", help="Optional language code to pass through to Whisper.")
|
||||
parser.add_argument(
|
||||
"--output-format",
|
||||
default="txt",
|
||||
choices=SUPPORTED_FORMATS,
|
||||
help="Transcript artifact format returned by the backend.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--server",
|
||||
help="Override the backend base URL. Defaults to the WHISPER_REMOTE environment variable.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--to-file",
|
||||
type=Path,
|
||||
help="Optional local file path or directory to save the returned transcript artifact.",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def resolve_server(args: argparse.Namespace) -> str:
|
||||
server = args.server or os.environ.get("WHISPER_REMOTE")
|
||||
if not server:
|
||||
raise SystemExit("WHISPER_REMOTE is not set and --server was not provided.")
|
||||
return server.rstrip("/")
|
||||
|
||||
|
||||
def infer_output_path(target: Path, input_file: Path, output_format: str) -> Path:
|
||||
if target.exists() and target.is_dir():
|
||||
return target / f"{input_file.stem}.{output_format}"
|
||||
if target.suffix:
|
||||
return target
|
||||
return target / f"{input_file.stem}.{output_format}"
|
||||
|
||||
|
||||
def print_response(response: httpx.Response) -> None:
|
||||
sys.stdout.write(response.text)
|
||||
if response.text and not response.text.endswith("\n"):
|
||||
sys.stdout.write("\n")
|
||||
|
||||
|
||||
def save_response(response: httpx.Response, destination: Path) -> None:
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
destination.write_bytes(response.content)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = build_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
input_file = args.file.expanduser().resolve()
|
||||
if not input_file.is_file():
|
||||
parser.error(f"Input file does not exist: {input_file}")
|
||||
|
||||
server = resolve_server(args)
|
||||
endpoint = f"{server}/transcriptions"
|
||||
|
||||
with input_file.open("rb") as handle, httpx.Client(timeout=300.0) as client:
|
||||
response = client.post(
|
||||
endpoint,
|
||||
data={
|
||||
"model": args.model,
|
||||
"language": args.language or "",
|
||||
"output_format": args.output_format,
|
||||
},
|
||||
files={"file": (input_file.name, handle, "application/octet-stream")},
|
||||
)
|
||||
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except httpx.HTTPStatusError as exc:
|
||||
message = exc.response.text.strip() or str(exc)
|
||||
parser.exit(1, f"{message}\n")
|
||||
|
||||
if args.to_file:
|
||||
destination = infer_output_path(args.to_file.expanduser(), input_file, args.output_format)
|
||||
save_response(response, destination)
|
||||
sys.stdout.write(f"{destination}\n")
|
||||
else:
|
||||
print_response(response)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user