Rewrite big file CLI tools

2026-04-08 00:27:11 +02:00
parent ca1f80d470
commit 31b46d0dd5
4 changed files with 381 additions and 573 deletions

README.md (227 changed lines)

@@ -1,184 +1,69 @@
# big-file-gen

Small Python CLI tools for creating and reading large files.
Useful for storage testing, transfer checks, and dumb-fun disk abuse.

## What it does

- create large binary files filled with zeros
- optionally create sparse files instead
- read files back and measure throughput
- optionally compute SHA256 while reading
- no third-party dependencies

## Usage
### Create a file

```bash
python make_big_file.py <output> <size> [--chunk-size SIZE] [--sparse] [--quiet]
```
Examples:

```bash
python make_big_file.py test.bin 15GB
python make_big_file.py dump.dat 1.5TiB --chunk-size 128MB
python make_big_file.py tiny.bin 500MB --quiet
python make_big_file.py sparse.img 20GB --sparse
```
### Read a file

```bash
python read_big_file.py <input> [--chunk-size SIZE] [--hash] [--quiet]
```
Examples:

```bash
python read_big_file.py test.bin
python read_big_file.py dump.dat --chunk-size 128MB --hash
python read_big_file.py tiny.bin --quiet
```
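Conceptually, `--hash` just streams the file through SHA256 chunk by chunk, so memory use stays flat regardless of file size. A standalone sketch (`sha256_of_file` is an illustrative helper, not part of the tools):

```python
import hashlib

def sha256_of_file(path: str, chunk_size: int = 64 * 1024 * 1024) -> str:
    """Hash a file in chunks; memory use is bounded by chunk_size."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
```

The result matches hashing the whole file at once, which is what makes it useful for verifying transfers.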
## Size formats

Both decimal-style and binary-style suffixes are accepted, and all of them are binary (powers of 1024), so `15GB` and `15GiB` mean the same size:

- `B`
- `KB`, `MB`, `GB`, `TB`, `PB`
- `KiB`, `MiB`, `GiB`, `TiB`, `PiB`
Plain numbers are treated as bytes.
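The mapping can be sketched as a tiny parser. This is an illustrative reimplementation of the rules above, not the tool's actual code:

```python
import re

# Every unit is a power of 1024; "KB" is an alias for "KiB", and so on.
_UNITS = {"": 1, "B": 1}
for _i, _p in enumerate("KMGTP", start=1):
    _UNITS[_p] = _UNITS[_p + "B"] = _UNITS[_p + "IB"] = 1024 ** _i

def parse_size(text: str) -> int:
    m = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", text)
    if not m or m.group(2).upper() not in _UNITS:
        raise ValueError(f"invalid size: {text!r}")
    return int(float(m.group(1)) * _UNITS[m.group(2).upper()])
```

So `parse_size("15GB")` and `parse_size("15GiB")` return the same number of bytes, and `parse_size("4096")` is just 4096.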
## Exit codes

- `0` success
- `1` failure
- `130` interrupted
## Requirements

- Python 3.8+
- enough disk space for real writes

## Notes

- `--sparse` is handy when you want a huge file without actually burning the disk.
- `--hash` is SHA256, because anything weaker would be cosplay.
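A sparse file reports its full apparent size but allocates almost nothing on disk. One way to check that, sketched here with an illustrative POSIX-only helper (`st_blocks` is in 512-byte units and does not exist on Windows):

```python
import os

def is_probably_sparse(path: str) -> bool:
    """Heuristic: allocated bytes far below the apparent size means holes."""
    st = os.stat(path)
    return st.st_blocks * 512 < st.st_size

# Make a hole-only file the same way a sparse mode typically does:
# open for writing and truncate to the target size without writing data.
with open("sparse_demo.img", "wb") as fh:
    fh.truncate(64 * 1024 * 1024)  # 64 MiB apparent size

print(is_probably_sparse("sparse_demo.img"))  # usually True on filesystems with sparse support
os.remove("sparse_demo.img")
```

On filesystems without sparse-file support the truncated file may be fully allocated, in which case the check returns False.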

big_file_gen.py (new executable file, 319 lines)

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""Shared logic for generating and reading large files."""
from __future__ import annotations

import argparse
import hashlib
import re
import shutil
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional

VERSION = "2.0.0"
MAX_CHUNK_SIZE = 1024 ** 3
_PROGRESS_STEP = 5

_SIZE_RE = re.compile(r"^\s*(?P<value>\d+(?:\.\d+)?)\s*(?P<unit>[A-Za-z]*)\s*$")
_BINARY_UNITS = {
    "": 1,
    "B": 1,
    "K": 1024,
    "KB": 1024,
    "KIB": 1024,
    "M": 1024 ** 2,
    "MB": 1024 ** 2,
    "MIB": 1024 ** 2,
    "G": 1024 ** 3,
    "GB": 1024 ** 3,
    "GIB": 1024 ** 3,
    "T": 1024 ** 4,
    "TB": 1024 ** 4,
    "TIB": 1024 ** 4,
    "P": 1024 ** 5,
    "PB": 1024 ** 5,
    "PIB": 1024 ** 5,
}


@dataclass(frozen=True)
class ProgressState:
    total_bytes: int
    last_reported_percent: int = -1
    last_log_time: float = 0.0
    last_log_bytes: int = 0


def parse_size(value: str) -> int:
    match = _SIZE_RE.match(value)
    if not match:
        raise ValueError(f"invalid size: {value!r}")
    amount = float(match.group("value"))
    unit = match.group("unit").upper()
    if unit not in _BINARY_UNITS:
        raise ValueError(f"unknown size unit: {unit!r}")
    result = int(amount * _BINARY_UNITS[unit])
    if result <= 0:
        raise ValueError("size must be greater than zero")
    return result


def format_bytes(value: int) -> str:
    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    size = float(value)
    for unit in units:
        if size < 1024.0 or unit == units[-1]:
            return f"{size:.2f} {unit}"
        size /= 1024.0


def _disk_free_bytes(path: Path) -> int:
    return shutil.disk_usage(path).free


def _ensure_parent_dir(path: Path) -> None:
    if not path.parent.exists():
        raise FileNotFoundError(f"directory does not exist: {path.parent}")


def _write_progress(prefix: str, current: int, total: int) -> None:
    percent = min(100, int(current * 100 / total)) if total else 100
    print(f"{prefix}: {percent}% ({format_bytes(current)} of {format_bytes(total)})")


def _maybe_log_throughput(prefix: str, bytes_done: int, state: ProgressState) -> ProgressState:
    now = time.time()
    if state.last_log_time == 0.0:
        return ProgressState(state.total_bytes, state.last_reported_percent, now, bytes_done)
    elapsed = now - state.last_log_time
    if elapsed < 1.0:
        return state
    speed = (bytes_done - state.last_log_bytes) / elapsed
    print(f"{prefix}: {format_bytes(bytes_done)} at {format_bytes(int(speed))}/s")
    return ProgressState(state.total_bytes, state.last_reported_percent, now, bytes_done)


def create_file(output: str | Path, total_bytes: int, chunk_size: int, quiet: bool = False, sparse: bool = False) -> int:
    output_path = Path(output)
    if output_path.exists():
        print(f"Error: file already exists: {output_path}", file=sys.stderr)
        return 1
    try:
        _ensure_parent_dir(output_path)
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    if chunk_size <= 0 or chunk_size > MAX_CHUNK_SIZE:
        print("Error: chunk size must be between 1 byte and 1 GiB", file=sys.stderr)
        return 1
    free_bytes = _disk_free_bytes(output_path.parent)
    if not sparse and free_bytes < total_bytes:
        print(
            f"Error: not enough disk space, need {format_bytes(total_bytes)}, have {format_bytes(free_bytes)}",
            file=sys.stderr,
        )
        return 1
    if not quiet:
        print(f"Creating: {output_path}")
        print(f"Size: {format_bytes(total_bytes)}")
        print(f"Chunk: {format_bytes(chunk_size)}")
        if sparse:
            print("Mode: sparse")
        print()
    started_at = time.time()
    progress = ProgressState(total_bytes=total_bytes, last_log_time=started_at, last_log_bytes=0)
    buffer = b"\x00" * chunk_size
    try:
        with output_path.open("wb") as handle:
            if sparse:
                handle.truncate(total_bytes)
                written = total_bytes
            else:
                written = 0
                while written < total_bytes:
                    step = min(chunk_size, total_bytes - written)
                    handle.write(buffer[:step])
                    written += step
                    if quiet:
                        continue
                    percent = int(written * 100 / total_bytes)
                    if percent >= progress.last_reported_percent + _PROGRESS_STEP:
                        _write_progress("Progress", written, total_bytes)
                        progress = ProgressState(total_bytes, percent, progress.last_log_time, progress.last_log_bytes)
                    progress = _maybe_log_throughput("Speed", written, progress)
        elapsed = time.time() - started_at
        print(f"Done, wrote {format_bytes(written)} to {output_path}")
        print(f"Elapsed: {elapsed:.2f}s")
        if elapsed > 0:
            print(f"Average: {format_bytes(int(written / elapsed))}/s")
        return 0
    except KeyboardInterrupt:
        print("Interrupted, cleaning up partial file", file=sys.stderr)
        try:
            if output_path.exists():
                output_path.unlink()
        except OSError:
            pass
        return 130
    except OSError as exc:
        print(f"Error writing file: {exc}", file=sys.stderr)
        try:
            if output_path.exists():
                output_path.unlink()
        except OSError:
            pass
        return 1


def read_file(input_path: str | Path, chunk_size: int, compute_hash: bool = False, quiet: bool = False) -> int:
    path = Path(input_path)
    if not path.exists():
        print(f"Error: file not found: {path}", file=sys.stderr)
        return 1
    if not path.is_file():
        print(f"Error: not a file: {path}", file=sys.stderr)
        return 1
    if chunk_size <= 0 or chunk_size > MAX_CHUNK_SIZE:
        print("Error: chunk size must be between 1 byte and 1 GiB", file=sys.stderr)
        return 1
    try:
        total_bytes = path.stat().st_size
    except OSError as exc:
        print(f"Error reading file metadata: {exc}", file=sys.stderr)
        return 1
    if not quiet:
        print(f"Reading: {path}")
        print(f"Size: {format_bytes(total_bytes)}")
        print(f"Chunk: {format_bytes(chunk_size)}")
        if compute_hash:
            print("Hash: sha256")
        print()
    started_at = time.time()
    progress = ProgressState(total_bytes=total_bytes, last_log_time=started_at, last_log_bytes=0)
    hasher = hashlib.sha256() if compute_hash else None
    bytes_read = 0
    try:
        with path.open("rb") as handle:
            while True:
                chunk = handle.read(chunk_size)
                if not chunk:
                    break
                bytes_read += len(chunk)
                if hasher is not None:
                    hasher.update(chunk)
                if quiet:
                    continue
                percent = int(bytes_read * 100 / total_bytes) if total_bytes else 100
                if percent >= progress.last_reported_percent + _PROGRESS_STEP:
                    _write_progress("Progress", bytes_read, total_bytes)
                    progress = ProgressState(total_bytes, percent, progress.last_log_time, progress.last_log_bytes)
                progress = _maybe_log_throughput("Speed", bytes_read, progress)
        elapsed = time.time() - started_at
        print(f"Done, read {format_bytes(bytes_read)} from {path}")
        print(f"Elapsed: {elapsed:.2f}s")
        if elapsed > 0:
            print(f"Average: {format_bytes(int(bytes_read / elapsed))}/s")
        if hasher is not None:
            print(f"SHA256: {hasher.hexdigest()}")
        return 0
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        return 130
    except OSError as exc:
        print(f"Error reading file: {exc}", file=sys.stderr)
        return 1


def build_create_parser(prog: str) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog=prog,
        description="Create large binary files for storage and transfer testing.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            f"  {prog} output.bin 15GB\n"
            f"  {prog} dump.dat 1.5TB --chunk-size 128MB\n"
            f"  {prog} test.bin 500MB --quiet"
        ),
    )
    parser.add_argument("output", help="Path to the file to create")
    parser.add_argument("size", help="Target size, for example 15GB or 1.5TiB")
    parser.add_argument("--chunk-size", default="64MB", help="Write chunk size (default: 64MB)")
    parser.add_argument("--sparse", action="store_true", help="Create a sparse file instead of writing zeros")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress progress output")
    parser.add_argument("--version", action="version", version=f"{prog} {VERSION}")
    return parser


def build_read_parser(prog: str) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog=prog,
        description="Read large files and benchmark I/O throughput.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            f"  {prog} largefile.bin\n"
            f"  {prog} test.dat --chunk-size 128MB --hash\n"
            f"  {prog} data.bin --quiet"
        ),
    )
    parser.add_argument("input", help="Path to the file to read")
    parser.add_argument("--chunk-size", default="64MB", help="Read chunk size (default: 64MB)")
    parser.add_argument("--hash", action="store_true", help="Compute SHA256 while reading")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress progress output")
    parser.add_argument("--version", action="version", version=f"{prog} {VERSION}")
    return parser


def create_main(argv: Optional[Iterable[str]] = None) -> int:
    parser = build_create_parser("make_big_file.py")
    args = parser.parse_args(list(argv) if argv is not None else None)
    try:
        total_bytes = parse_size(args.size)
        chunk_size = parse_size(args.chunk_size)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return create_file(args.output, total_bytes, chunk_size, args.quiet, args.sparse)


def read_main(argv: Optional[Iterable[str]] = None) -> int:
    parser = build_read_parser("read_big_file.py")
    args = parser.parse_args(list(argv) if argv is not None else None)
    try:
        chunk_size = parse_size(args.chunk_size)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return read_file(args.input, chunk_size, args.hash, args.quiet)


if __name__ == "__main__":
    raise SystemExit(create_main())

make_big_file.py (normal file → executable file, 216 changed lines)

@@ -1,218 +1,8 @@
#!/usr/bin/env python3
"""CLI wrapper for big file creation."""
from big_file_gen import create_main

if __name__ == "__main__":
    raise SystemExit(create_main())

read_big_file.py (normal file → executable file, 192 changed lines)

@@ -1,194 +1,8 @@
#!/usr/bin/env python3
"""CLI wrapper for big file reading and benchmarking."""
from big_file_gen import read_main

if __name__ == "__main__":
    raise SystemExit(read_main())