219 lines
6.8 KiB
Python
219 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Generate large binary files for testing purposes."""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
|
|
def parse_size(size_str):
|
|
"""Parse size string like '15GB', '1.5TB', '500MB' to bytes."""
|
|
size_str = size_str.upper().strip()
|
|
units = {
|
|
'B': 1,
|
|
'KB': 1024,
|
|
'MB': 1024**2,
|
|
'GB': 1024**3,
|
|
'TB': 1024**4,
|
|
'KIB': 1024,
|
|
'MIB': 1024**2,
|
|
'GIB': 1024**3,
|
|
'TIB': 1024**4,
|
|
}
|
|
|
|
for unit, multiplier in units.items():
|
|
if size_str.endswith(unit):
|
|
try:
|
|
value = float(size_str[:-len(unit)])
|
|
return int(value * multiplier)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid size format: {size_str}")
|
|
|
|
# Try parsing as raw bytes
|
|
try:
|
|
return int(size_str)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid size format: {size_str}. Use format like '15GB', '1.5TB', '500MB'")
|
|
|
|
|
|
def format_bytes(bytes_val):
|
|
"""Format bytes to human-readable string."""
|
|
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB']:
|
|
if bytes_val < 1024.0:
|
|
return f"{bytes_val:.2f} {unit}"
|
|
bytes_val /= 1024.0
|
|
return f"{bytes_val:.2f} PiB"
|
|
|
|
|
|
def create_file(output_path, total_bytes, chunk_size, quiet=False):
|
|
"""Create a file filled with zeros."""
|
|
# Check if file already exists
|
|
if os.path.exists(output_path):
|
|
print(f"Error: File '{output_path}' already exists", file=sys.stderr)
|
|
return 1
|
|
|
|
# Check if directory exists
|
|
output_dir = os.path.dirname(output_path)
|
|
if output_dir and not os.path.exists(output_dir):
|
|
print(f"Error: Directory '{output_dir}' does not exist", file=sys.stderr)
|
|
return 1
|
|
|
|
# Check available disk space
|
|
try:
|
|
if hasattr(os, 'statvfs'): # Unix
|
|
stat = os.statvfs(output_dir or '.')
|
|
free_space = stat.f_bavail * stat.f_frsize
|
|
else: # Windows
|
|
import ctypes
|
|
free_bytes = ctypes.c_ulonglong(0)
|
|
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
|
|
ctypes.c_wchar_p(output_dir or '.'),
|
|
None, None,
|
|
ctypes.pointer(free_bytes)
|
|
)
|
|
free_space = free_bytes.value
|
|
|
|
if free_space < total_bytes:
|
|
print(f"Error: Insufficient disk space. Required: {format_bytes(total_bytes)}, Available: {format_bytes(free_space)}", file=sys.stderr)
|
|
return 1
|
|
except Exception as e:
|
|
print(f"Warning: Could not check disk space: {e}", file=sys.stderr)
|
|
|
|
chunk = b"\x00" * chunk_size
|
|
|
|
if not quiet:
|
|
print(f"Creating file: {output_path}")
|
|
print(f"Target size: {format_bytes(total_bytes)}")
|
|
print(f"Chunk size: {format_bytes(chunk_size)}")
|
|
print()
|
|
|
|
start_time = time.time()
|
|
last_gb_log_time = start_time
|
|
last_gb_written = 0
|
|
|
|
try:
|
|
with open(output_path, "wb") as f:
|
|
written = 0
|
|
last_reported_percent = -1
|
|
|
|
while written + chunk_size <= total_bytes:
|
|
f.write(chunk)
|
|
written += chunk_size
|
|
|
|
if not quiet:
|
|
percent = int((written / total_bytes) * 100)
|
|
if percent != last_reported_percent and percent % 5 == 0:
|
|
print(f"Progress: {percent}% ({format_bytes(written)} written)")
|
|
last_reported_percent = percent
|
|
|
|
# Per second GB log
|
|
now = time.time()
|
|
if now - last_gb_log_time >= 1.0:
|
|
gb_written = written / (1024**3)
|
|
gb_per_sec = (written - last_gb_written) / (1024**3) / (now - last_gb_log_time)
|
|
print(f"Written: {gb_written:.2f} GiB, Speed: {gb_per_sec:.2f} GiB/s")
|
|
last_gb_log_time = now
|
|
last_gb_written = written
|
|
|
|
# Write leftover
|
|
leftover = total_bytes - written
|
|
if leftover:
|
|
f.write(b"\x00" * leftover)
|
|
written += leftover
|
|
|
|
end_time = time.time()
|
|
elapsed = end_time - start_time
|
|
|
|
if not quiet:
|
|
print()
|
|
print(f"✓ Successfully created {output_path} ({format_bytes(written)})")
|
|
if elapsed > 0:
|
|
print(f"Time taken: {elapsed:.2f} seconds")
|
|
print(f"Average speed: {format_bytes(written / elapsed)}/s")
|
|
|
|
return 0
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\nInterrupted by user", file=sys.stderr)
|
|
# Clean up partial file
|
|
if os.path.exists(output_path):
|
|
print(f"Cleaning up partial file: {output_path}", file=sys.stderr)
|
|
try:
|
|
os.remove(output_path)
|
|
except Exception as e:
|
|
print(f"Warning: Could not remove partial file: {e}", file=sys.stderr)
|
|
return 130
|
|
|
|
except IOError as e:
|
|
print(f"Error writing file: {e}", file=sys.stderr)
|
|
# Clean up partial file
|
|
if os.path.exists(output_path):
|
|
try:
|
|
os.remove(output_path)
|
|
except Exception:
|
|
pass
|
|
return 1
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description='Generate large binary files filled with zeros for testing purposes.',
|
|
epilog='Examples:\n'
|
|
' %(prog)s output.bin 15GB\n'
|
|
' %(prog)s test.dat 1.5TB --chunk-size 128MB\n'
|
|
' %(prog)s small.bin 500MB --quiet',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter
|
|
)
|
|
|
|
parser.add_argument(
|
|
'output',
|
|
help='Output file path'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'size',
|
|
help='File size (e.g., 15GB, 1.5TB, 500MB, 1073741824)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--chunk-size',
|
|
default='64MB',
|
|
help='Chunk size for writing (default: 64MB)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--quiet', '-q',
|
|
action='store_true',
|
|
help='Suppress progress output'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--version',
|
|
action='version',
|
|
version='%(prog)s 1.0.0'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
total_bytes = parse_size(args.size)
|
|
chunk_size = parse_size(args.chunk_size)
|
|
except ValueError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
return 1
|
|
|
|
if total_bytes <= 0:
|
|
print("Error: Size must be greater than 0", file=sys.stderr)
|
|
return 1
|
|
|
|
if chunk_size <= 0 or chunk_size > 1024**3: # Max 1GB chunk
|
|
print("Error: Chunk size must be between 1 byte and 1GB", file=sys.stderr)
|
|
return 1
|
|
|
|
return create_file(args.output, total_bytes, chunk_size, args.quiet)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|