#!/usr/bin/env python3
"""Shared logic for generating and reading large files."""

from __future__ import annotations

import argparse
import hashlib
import os
import re
import shutil
import sys
import time
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Iterable, Optional

VERSION = "2.0.0"

# Upper bound accepted for --chunk-size (1 GiB).
MAX_CHUNK_SIZE = 1024 ** 3

# Minimum percentage-point increase between two "Progress" lines.
_PROGRESS_STEP = 5

# "<number>[.<fraction>] [<unit>]" with optional surrounding whitespace.
# The named groups are read by parse_size().
_SIZE_RE = re.compile(r"^\s*(?P<value>\d+(?:\.\d+)?)\s*(?P<unit>[A-Za-z]*)\s*$")

# Every suffix is interpreted as a power of 1024, including the "KB"-style ones.
_BINARY_UNITS = {
    "": 1,
    "B": 1,
    "K": 1024,
    "KB": 1024,
    "KIB": 1024,
    "M": 1024 ** 2,
    "MB": 1024 ** 2,
    "MIB": 1024 ** 2,
    "G": 1024 ** 3,
    "GB": 1024 ** 3,
    "GIB": 1024 ** 3,
    "T": 1024 ** 4,
    "TB": 1024 ** 4,
    "TIB": 1024 ** 4,
    "P": 1024 ** 5,
    "PB": 1024 ** 5,
    "PIB": 1024 ** 5,
}


@dataclass(frozen=True)
class ProgressState:
    """Immutable snapshot of what has already been reported to the user."""

    # Size of the whole transfer, used as the percentage denominator.
    total_bytes: int
    # Percentage shown on the most recent "Progress" line (-1: none yet).
    last_reported_percent: int = -1
    # time.time() of the most recent throughput sample (0.0: none yet).
    last_log_time: float = 0.0
    # Byte counter at the most recent throughput sample.
    last_log_bytes: int = 0


def parse_size(value: str) -> int:
    """Convert a human-readable size such as ``"15GB"`` into a byte count.

    Units are case-insensitive and always binary (``1KB == 1KiB == 1024``).
    A missing unit means bytes.

    Raises:
        ValueError: if the text is not a size, the unit is unknown, or the
            resulting byte count is not greater than zero.
    """
    match = _SIZE_RE.match(value)
    if not match:
        raise ValueError(f"invalid size: {value!r}")

    number = match.group("value")
    unit = match.group("unit").upper()
    if unit not in _BINARY_UNITS:
        raise ValueError(f"unknown size unit: {unit or 'bytes'}")
    multiplier = _BINARY_UNITS[unit]

    if "." in number:
        result = int(float(number) * multiplier)
    else:
        # Stay in integer arithmetic so values above 2**53 are not rounded
        # by a round-trip through float.
        result = int(number) * multiplier

    if result <= 0:
        raise ValueError("size must be greater than zero")
    return result


def format_bytes(value: int) -> str:
    """Render a byte count with two decimals and a binary unit (B .. PiB)."""
    size = float(value)
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if size < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    # PiB is the largest unit offered, so anything bigger is still shown in PiB.
    return f"{size:.2f} PiB"


def _disk_free_bytes(path: Path) -> int:
    """Return the free space, in bytes, reported by ``shutil.disk_usage``."""
    return shutil.disk_usage(path).free


def _ensure_parent_dir(path: Path) -> None:
    """Raise ``FileNotFoundError`` when the directory holding *path* is missing."""
    parent = path.parent
    if not parent.exists():
        raise FileNotFoundError(f"directory does not exist: {parent}")


def _write_progress(prefix: str, current: int, total: int) -> None:
    """Print one ``<prefix>: NN% (x of y)`` line; percent is capped at 100."""
    percent = min(100, int(current * 100 / total)) if total else 100
    print(f"{prefix}: {percent}% ({format_bytes(current)} of {format_bytes(total)})")


def _maybe_log_throughput(
    prefix: str,
    bytes_done: int,
    started_at: float,
    state: ProgressState,
) -> ProgressState:
    """Print a throughput line at most once per second.

    *started_at* is not used; it is kept so the signature stays unchanged.
    Returns the state to carry into the next call.
    """
    now = time.time()
    if state.last_log_time == 0.0:
        # No sample yet: record a baseline and wait for the next call.
        return replace(state, last_log_time=now, last_log_bytes=bytes_done)

    elapsed = now - state.last_log_time
    if elapsed < 1.0:
        return state

    speed = (bytes_done - state.last_log_bytes) / elapsed
    print(f"{prefix}: {format_bytes(bytes_done)} at {format_bytes(int(speed))}/s")
    return replace(state, last_log_time=now, last_log_bytes=bytes_done)


def _advance_progress(bytes_done: int, started_at: float, state: ProgressState) -> ProgressState:
    """Emit percentage and throughput lines as needed; return the new state."""
    total = state.total_bytes
    percent = int(bytes_done * 100 / total) if total else 100
    if percent >= state.last_reported_percent + _PROGRESS_STEP:
        _write_progress("Progress", bytes_done, total)
        state = replace(state, last_reported_percent=percent)
    return _maybe_log_throughput("Speed", bytes_done, started_at, state)


def _print_summary(headline: str, byte_count: int, started_at: float) -> None:
    """Print the closing headline plus elapsed time and average throughput."""
    elapsed = time.time() - started_at
    print(headline)
    print(f"Elapsed: {elapsed:.2f}s")
    if elapsed > 0:
        print(f"Average: {format_bytes(int(byte_count / elapsed))}/s")


def _discard_partial_file(path: Path, existed_before: bool) -> None:
    """Best-effort removal of a file this run created; never raises OSError."""
    if existed_before:
        # The file pre-dates this run, so it is left in place.
        return
    try:
        if path.exists():
            path.unlink()
    except OSError:
        # Cleanup is best-effort; the caller already reports the real failure.
        pass


def create_file(
    output: str | Path,
    total_bytes: int,
    chunk_size: int,
    quiet: bool = False,
    sparse: bool = False,
    force: bool = False,
) -> int:
    """Create *output* with *total_bytes* zero bytes.

    Args:
        output: Destination path.
        total_bytes: Final file size in bytes.
        chunk_size: Bytes written per call, between 1 and ``MAX_CHUNK_SIZE``.
        quiet: Suppress the header and the progress/throughput lines.
        sparse: Set the size with ``truncate`` instead of writing zeros.
        force: Allow overwriting an existing file.

    Returns:
        Process exit code: 0 on success, 1 on any reported error,
        130 when interrupted with Ctrl-C.
    """
    output_path = Path(output)
    existed_before = output_path.exists()
    if existed_before and not force:
        print(f"Error: file already exists: {output_path}", file=sys.stderr)
        print("Use --force to overwrite.", file=sys.stderr)
        return 1

    try:
        _ensure_parent_dir(output_path)
    except FileNotFoundError as exc:
        # Report like every other failure instead of leaking a traceback.
        print(f"Error: {exc}", file=sys.stderr)
        return 1

    if chunk_size <= 0 or chunk_size > MAX_CHUNK_SIZE:
        print("Error: chunk size must be between 1 byte and 1 GiB", file=sys.stderr)
        return 1

    try:
        free_bytes = _disk_free_bytes(output_path.parent)
    except OSError as exc:
        print(f"Error: cannot determine free disk space: {exc}", file=sys.stderr)
        return 1
    if not sparse and free_bytes < total_bytes:
        print(
            f"Error: not enough disk space, need {format_bytes(total_bytes)}, have {format_bytes(free_bytes)}",
            file=sys.stderr,
        )
        return 1

    if not quiet:
        print(f"Creating: {output_path}")
        print(f"Size: {format_bytes(total_bytes)}")
        print(f"Chunk: {format_bytes(chunk_size)}")
        if sparse:
            print("Mode: sparse")
        print()

    started_at = time.time()
    progress = ProgressState(total_bytes=total_bytes, last_log_time=started_at, last_log_bytes=0)

    try:
        with output_path.open("wb") as handle:
            if sparse:
                handle.truncate(total_bytes)
                written = total_bytes
            else:
                # One zero-filled buffer, no larger than needed; the
                # memoryview lets the final short chunk be sliced without
                # copying.
                zeros = memoryview(bytes(max(0, min(chunk_size, total_bytes))))
                written = 0
                while written < total_bytes:
                    step = min(chunk_size, total_bytes - written)
                    handle.write(zeros[:step])
                    written += step
                    if not quiet:
                        progress = _advance_progress(written, started_at, progress)

        _print_summary(
            f"Done, wrote {format_bytes(written)} to {output_path}",
            written,
            started_at,
        )
        return 0
    except KeyboardInterrupt:
        print("Interrupted, cleaning up partial file", file=sys.stderr)
        _discard_partial_file(output_path, existed_before)
        return 130
    except OSError as exc:
        print(f"Error writing file: {exc}", file=sys.stderr)
        _discard_partial_file(output_path, existed_before)
        return 1


def read_file(
    input_path: str | Path,
    chunk_size: int,
    compute_hash: bool = False,
    quiet: bool = False,
) -> int:
    """Read *input_path* sequentially and report throughput.

    Args:
        input_path: File to read.
        chunk_size: Bytes requested per read, between 1 and ``MAX_CHUNK_SIZE``.
        compute_hash: Also compute and print the SHA-256 of the content.
        quiet: Suppress the header and the progress/throughput lines.

    Returns:
        Process exit code: 0 on success, 1 on any reported error,
        130 when interrupted with Ctrl-C.
    """
    path = Path(input_path)
    if not path.exists():
        print(f"Error: file not found: {path}", file=sys.stderr)
        return 1
    if not path.is_file():
        print(f"Error: not a file: {path}", file=sys.stderr)
        return 1
    if chunk_size <= 0 or chunk_size > MAX_CHUNK_SIZE:
        print("Error: chunk size must be between 1 byte and 1 GiB", file=sys.stderr)
        return 1

    try:
        total_bytes = path.stat().st_size
    except OSError as exc:
        print(f"Error reading file metadata: {exc}", file=sys.stderr)
        return 1

    if not quiet:
        print(f"Reading: {path}")
        print(f"Size: {format_bytes(total_bytes)}")
        print(f"Chunk: {format_bytes(chunk_size)}")
        if compute_hash:
            print("Hash: sha256")
        print()

    started_at = time.time()
    progress = ProgressState(total_bytes=total_bytes, last_log_time=started_at, last_log_bytes=0)
    hasher = hashlib.sha256() if compute_hash else None
    bytes_read = 0

    try:
        with path.open("rb") as handle:
            # iter() with a sentinel stops at the first empty read (EOF).
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                bytes_read += len(chunk)
                if hasher is not None:
                    hasher.update(chunk)
                if not quiet:
                    progress = _advance_progress(bytes_read, started_at, progress)

        _print_summary(
            f"Done, read {format_bytes(bytes_read)} from {path}",
            bytes_read,
            started_at,
        )
        if hasher is not None:
            print(f"SHA256: {hasher.hexdigest()}")
        return 0
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        return 130
    except OSError as exc:
        print(f"Error reading file: {exc}", file=sys.stderr)
        return 1


def build_create_parser(prog: str) -> argparse.ArgumentParser:
    """Build the argument parser for the file-creation command."""
    parser = argparse.ArgumentParser(
        prog=prog,
        description="Create large binary files for storage and transfer testing.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            f" {prog} output.bin 15GB\n"
            f" {prog} dump.dat 1.5TB --chunk-size 128MB\n"
            f" {prog} test.bin 500MB --quiet"
        ),
    )
    parser.add_argument("output", help="Path to the file to create")
    parser.add_argument("size", help="Target size, for example 15GB or 1.5TiB")
    parser.add_argument("--chunk-size", default="64MB", help="Write chunk size (default: 64MB)")
    parser.add_argument("--sparse", action="store_true", help="Create a sparse file instead of writing zeros")
    parser.add_argument("--force", "-f", action="store_true", help="Overwrite output file if it already exists")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress progress output")
    parser.add_argument("--version", action="version", version=f"{prog} {VERSION}")
    return parser


def build_read_parser(prog: str) -> argparse.ArgumentParser:
    """Build the argument parser for the file-reading command."""
    parser = argparse.ArgumentParser(
        prog=prog,
        description="Read large files and benchmark I/O throughput.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            f" {prog} largefile.bin\n"
            f" {prog} test.dat --chunk-size 128MB --hash\n"
            f" {prog} data.bin --quiet"
        ),
    )
    parser.add_argument("input", help="Path to the file to read")
    parser.add_argument("--chunk-size", default="64MB", help="Read chunk size (default: 64MB)")
    parser.add_argument("--hash", action="store_true", help="Compute SHA256 while reading")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress progress output")
    parser.add_argument("--version", action="version", version=f"{prog} {VERSION}")
    return parser


def create_main(argv: Optional[Iterable[str]] = None) -> int:
    """Entry point for the creation command; returns the process exit code.

    *argv* defaults to ``sys.argv[1:]`` when ``None``.
    """
    parser = build_create_parser("make_big_file.py")
    args = parser.parse_args(list(argv) if argv is not None else None)
    try:
        total_bytes = parse_size(args.size)
        chunk_size = parse_size(args.chunk_size)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return create_file(args.output, total_bytes, chunk_size, args.quiet, args.sparse, args.force)


def read_main(argv: Optional[Iterable[str]] = None) -> int:
    """Entry point for the read command; returns the process exit code.

    *argv* defaults to ``sys.argv[1:]`` when ``None``.
    """
    parser = build_read_parser("read_big_file.py")
    args = parser.parse_args(list(argv) if argv is not None else None)
    try:
        chunk_size = parse_size(args.chunk_size)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return read_file(args.input, chunk_size, args.hash, args.quiet)


if __name__ == "__main__":
    raise SystemExit(create_main())