Files
instant-replay/main.py
2026-01-16 21:55:05 +01:00

814 lines
29 KiB
Python

# Imports
import os
import time
import threading
import shutil
from obswebsocket import obsws
from obswebsocket import requests as rq
import websockets
import asyncio
import json
import ctypes
import subprocess
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
# Optional integration: livedisplay.py is a user-customizable module exposing
# handle_update(). Start from the "feature disabled" state and flip it only
# when the import succeeds.
live_display_handler = None
LIVE_DISPLAY_AVAILABLE = False
try:
    from livedisplay import handle_update as live_display_handler
except ImportError:
    print("Warning: livedisplay.py not found. Live display feature disabled.")
else:
    LIVE_DISPLAY_AVAILABLE = True

# Optional dependency: psutil is imported once, up front, so later checks
# only need to consult PSUTIL_AVAILABLE.
try:
    import psutil
except ImportError:
    psutil = None
PSUTIL_AVAILABLE = psutil is not None
def load_config():
    """Load configuration from config.json with fallback to defaults.

    The file is looked up in the current working directory and read as UTF-8.
    User values are deep-merged over the built-in defaults, so a partial
    config.json only overrides the keys it mentions.

    Returns:
        dict: The merged configuration. The untouched defaults are returned
        when config.json is missing, unreadable, not valid JSON, or does not
        contain a JSON object at the top level.
    """
    default_config = {
        "directories": {
            "watch_dir": r"E:\OBS",
            "moveto_dir": "./replays",
        },
        "server": {
            "port": 8000,
            "websocket_port": 8001,
        },
        "replay": {
            "length_seconds": 20,
            "default_location": "center",
            "default_size": 5,
        },
        "voice_recognition": {
            "enabled": False,
            "sample_rate": 16000,
            "duration": 4,
            "calibration_duration": 2,
        },
        "live_display": {
            "enabled": False,
            "interval": 0.5,
        },
        "commands": {
            "activation": ["instant replay", "save replay", "capture replay", "replay that"],
            "stop": ["stop replay", "stop video", "hide replay"],
            "replay_last": ["replay last", "show last replay", "play last", "replay it again"],
            "clear_replays": ["clear replays", "delete all replays", "nuke replays", "remove all clips"],
        },
        "hotkey": {
            "enabled": True,
            "key": "pagedown",
        },
    }

    def deep_merge(default, user):
        """Return a copy of *default* with *user* values layered on top."""
        result = default.copy()
        for key, value in user.items():
            if isinstance(result.get(key), dict):
                if isinstance(value, dict):
                    result[key] = deep_merge(result[key], value)
                else:
                    # Every dict-valued default is later indexed by key, so a
                    # scalar here would crash the module-level constant setup.
                    print(f"Ignoring config value for '{key}': expected an object. Using defaults.")
            else:
                result[key] = value
        return result

    config_path = "config.json"
    if not os.path.exists(config_path):
        print(f"No {config_path} found. Using default configuration.")
        print("Tip: Copy config.example.json to config.json to customize settings.")
        return default_config

    try:
        # JSON is UTF-8 by specification; do not rely on the platform locale.
        with open(config_path, "r", encoding="utf-8") as f:
            user_config = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error loading config.json: {e}. Using default configuration.")
        return default_config

    if not isinstance(user_config, dict):
        print(
            "Error loading config.json: top-level value must be a JSON object. "
            "Using default configuration."
        )
        return default_config

    config = deep_merge(default_config, user_config)
    print(f"Configuration loaded from {config_path}")
    return config
# Load configuration once, at import time. load_config() always returns a dict
# (falling back to built-in defaults), so `config` is safe to index afterwards.
config = load_config()
def is_obs_running():
    """Check if OBS Studio is running.

    Returns:
        bool: True when a process that looks like OBS is found. Also True when
        the check cannot be performed (psutil missing or process enumeration
        failed), so that an inconclusive check never blocks start-up.
    """
    if not PSUTIL_AVAILABLE:
        return True  # Assume running if cannot check

    # "obs64.exe" is matched as a substring (the original behaviour); the
    # remaining names are matched exactly so unrelated processes that merely
    # contain "obs" are not mistaken for OBS.
    # NOTE(review): "obs" / "obs.exe" / "obs32.exe" are the executable names
    # assumed for non-Windows and 32-bit installs - confirm on target systems.
    exact_names = {"obs", "obs.exe", "obs32.exe"}
    try:
        for proc in psutil.process_iter(["name"]):
            name = (proc.info["name"] or "").lower()
            if "obs64.exe" in name or name in exact_names:
                return True
        return False
    except Exception:
        return True  # Assume running if check fails
def set_process_priority():
    """Raise this process to high priority on Windows; do nothing elsewhere.

    psutil is tried first. If it is missing or fails, the Win32
    SetPriorityClass API is called through ctypes instead. Every failure is
    reported on stdout and never raised.
    """
    if os.name != "nt":
        return

    # Preferred route: psutil.
    try:
        import psutil  # type: ignore

        psutil.Process(os.getpid()).nice(psutil.HIGH_PRIORITY_CLASS)
        print("Process priority elevated to HIGH_PRIORITY_CLASS")
        return
    except ImportError:
        print("psutil not installed; falling back to ctypes priority raise")
    except Exception as exc:
        print(f"Failed to elevate priority with psutil: {exc}; falling back to ctypes")

    # Fallback route: call the Win32 API directly.
    high_priority_class = 0x00000080
    try:
        kernel32 = ctypes.windll.kernel32  # type: ignore[attr-defined]
        current_process = kernel32.GetCurrentProcess()
        succeeded = kernel32.SetPriorityClass(current_process, high_priority_class)
        if not succeeded:
            raise OSError("SetPriorityClass returned 0")
        print("Process priority elevated via ctypes SetPriorityClass")
    except Exception as exc:
        print(f"Unable to adjust process priority: {exc}")
# Extract configuration values into module-level constants. These are read
# once at import time; several are also captured as default argument values
# by functions defined further down.
# Directory polled for new files by monitor_directory().
WATCH_DIR = config["directories"]["watch_dir"]
# Directory replays are moved into; stored as an absolute path.
MOVETO_DIR = os.path.abspath(config["directories"]["moveto_dir"])
# Port used when building the http://localhost:<port>/<filename> video URL.
SERVER_PORT = config["server"]["port"]
# Port the WebSocket server for the Electron client listens on.
WEBSOCKET_PORT = config["server"]["websocket_port"]
# Target clip length for trimming and delay before a replay is auto-stopped.
REPLAY_LENGTH_SECONDS = config["replay"]["length_seconds"]
# Default placement and size sent to the client with each play_video message.
DEFAULT_LOCATION = config["replay"]["default_location"]
DEFAULT_SIZE = config["replay"]["default_size"]
# Voice recognition: feature switch, recording sample rate, length of each
# listening window and of the ambient-noise calibration recording (seconds).
VOICE_ENABLED = config["voice_recognition"]["enabled"]
SAMPLE_RATE = config["voice_recognition"]["sample_rate"]
DURATION = config["voice_recognition"]["duration"]
CALIBRATION_DURATION = config["voice_recognition"]["calibration_duration"]
# Live display: feature switch and seconds between handler updates.
LIVE_DISPLAY_ENABLED = config["live_display"]["enabled"]
LIVE_DISPLAY_INTERVAL = config["live_display"]["interval"]
# Phrase lists; a command matches when any phrase is a substring of the input.
ACTIVATION_COMMANDS = config["commands"]["activation"]
STOP_COMMANDS = config["commands"]["stop"]
REPLAY_LAST_COMMANDS = config["commands"]["replay_last"]
CLEAR_REPLAYS_COMMANDS = config["commands"]["clear_replays"]
# Kept as the whole dict; callers read HOTKEY["enabled"] and HOTKEY["key"].
HOTKEY = config["hotkey"]
# Replay state tracking
# Set while a replay is considered active; cleared on manual or automatic stop.
is_replaying = threading.Event()
# Filename of the most recent replay sent to the client, or None if none yet.
last_replay_filename = None
active_replay_threads = {} # Track active live display threads by filename
# Ensure the move-to directory exists
os.makedirs(MOVETO_DIR, exist_ok=True)
# Check if OBS is running before connecting
if not is_obs_running():
    print("ERROR: OBS Studio is not running. Please start OBS and try again.")
    # Raise SystemExit directly: the exit() helper is injected by the `site`
    # module for interactive use and is not guaranteed to exist (for example
    # under `python -S` or in frozen builds).
    raise SystemExit(1)
# Setup OBS: connect to the obs-websocket server on localhost:4455 with an
# empty password. This runs at import time.
ws = obsws("localhost", 4455, "")
ws.connect()
# Global WebSocket connections: the set of currently connected Electron
# clients, shared by the connection handler and the send_* broadcast helpers.
websocket_clients = set()
async def websocket_handler(websocket, path=None):
    """Handle WebSocket connections from Electron app.

    Registers the connection in ``websocket_clients`` for as long as it stays
    open and unregisters it when it closes.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Request path. Older websockets releases pass it as a second
            positional argument while newer ones call the handler with the
            connection only, so it is optional and unused here.
    """
    websocket_clients.add(websocket)
    print(f"Electron client connected. Total clients: {len(websocket_clients)}")
    print("Electron WebSocket connection established!")
    try:
        await websocket.wait_closed()
    finally:
        # discard(), not remove(): the broadcast helpers prune clients whose
        # send failed, so this connection may already be gone from the set.
        websocket_clients.discard(websocket)
        print(f"Electron client disconnected. Total clients: {len(websocket_clients)}")
async def _broadcast(message, sent_log=None):
    """Send *message* to every connected client and prune closed connections.

    Args:
        message: Already-serialized text to send.
        sent_log: Optional line printed after each successful send.
    """
    if not websocket_clients:
        return
    disconnected = set()
    # Iterate over a snapshot: each await yields to the event loop, where the
    # connection handler may add or remove clients, and mutating a set while
    # iterating it raises RuntimeError.
    for client in list(websocket_clients):
        try:
            await client.send(message)
            if sent_log is not None:
                print(sent_log)
        except websockets.exceptions.ConnectionClosed:
            disconnected.add(client)
    # Clean up disconnected clients
    websocket_clients.difference_update(disconnected)


async def send_video_url(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE, duration=None):
    """Send video URL and display settings to connected Electron clients.

    Args:
        filename: Name of the replay file; appended to the local HTTP URL.
        location: Placement value forwarded to the client unchanged.
        size: Size value forwarded to the client unchanged.
        duration: Clip duration in seconds as computed by the backend, or None.
    """
    video_url = f"http://localhost:{SERVER_PORT}/{filename}"
    message = json.dumps(
        {
            "type": "play_video",
            "url": video_url,
            "filename": filename,
            "location": location,
            "size": size,
            "duration": duration,  # provided by backend
        }
    )
    await _broadcast(
        message,
        sent_log=f"Sent video URL to Electron: {video_url} (duration={duration})",
    )


async def send_stop_command():
    """Send stop command to connected Electron clients.

    All active live display threads are signalled to stop first, whether or
    not any client is connected.
    """
    for stop_event in list(active_replay_threads.values()):
        stop_event.set()
    await _broadcast(
        json.dumps({"type": "stop_video"}),
        sent_log="Sent stop command to Electron",
    )


async def send_status_update(status):
    """Send status update to Electron clients for HUD loading screen.

    Args:
        status: Human-readable status text placed in the message payload.
    """
    await _broadcast(json.dumps({"type": "status_update", "status": status}))
def get_media_duration(filepath):
    """Ask ffprobe for the container duration of *filepath*.

    Returns:
        The duration in seconds rounded to three decimals, or None when
        ffprobe cannot be run, exits with an error, or prints something that
        is not a number.
    """
    probe_command = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filepath,
    ]
    try:
        probe = subprocess.run(
            probe_command,
            capture_output=True,
            text=True,
            check=True,
        )
        seconds = float(probe.stdout.strip())
    except Exception as e:
        print(f"Error getting duration for {filepath}: {e}")
        return None
    return round(seconds, 3)
def live_display_update_thread(filename, duration):
    """Thread that periodically calls the live display handler with current replay state.

    Calls ``live_display_handler("playing", elapsed)`` roughly every
    LIVE_DISPLAY_INTERVAL seconds until the stop event registered for
    *filename* is set or *duration* seconds have elapsed, then sends one final
    ``"stopped"`` update.

    Args:
        filename: Key under which the stop event is stored in
            ``active_replay_threads``.
        duration: Replay length in seconds; a falsy value disables the
            duration-based exit.
    """
    if not LIVE_DISPLAY_ENABLED or not LIVE_DISPLAY_AVAILABLE or live_display_handler is None:
        return
    start_time = time.time()
    stop_event = active_replay_threads.get(filename)
    if stop_event is None:
        # Nothing was registered for this file, so there is no way to be told
        # to stop; do not start an uncontrolled update loop.
        return
    next_update_time = start_time  # Track when next update should happen
    try:
        while not stop_event.is_set():
            current_time = time.time()
            elapsed = current_time - start_time
            # Stop if we've exceeded the duration
            if duration and elapsed >= duration:
                break
            # Only send update if we've reached the next scheduled update time
            if current_time >= next_update_time:
                try:
                    live_display_handler("playing", elapsed)
                except Exception as handler_error:
                    # Don't let handler errors stop the update loop
                    print(f"Error in live_display_handler: {handler_error}")
                # Schedule the next update relative to the previous slot, not
                # to "now", so slow handlers do not accumulate drift.
                next_update_time += LIVE_DISPLAY_INTERVAL
                # If we're falling behind, catch up to current time
                if next_update_time < current_time:
                    next_update_time = current_time + LIVE_DISPLAY_INTERVAL
            # Wait at most 10 ms per iteration so a stop request is noticed
            # quickly; the lower bound keeps the wait strictly positive.
            sleep_time = min(0.01, max(0.001, next_update_time - time.time()))
            stop_event.wait(timeout=sleep_time)
        # Send final "stopped" update
        final_timestamp = time.time() - start_time
        try:
            live_display_handler("stopped", final_timestamp)
        except Exception as handler_error:
            print(f"Error in live_display_handler (stopped): {handler_error}")
    except Exception as e:
        print(f"Error in live display update thread: {e}")
    finally:
        # Remove the tracking entry only if it is still ours. When the same
        # file is replayed again, a newer stop event is registered under the
        # same key; deleting by filename alone would orphan that newer thread
        # and make it impossible to stop.
        if active_replay_threads.get(filename) is stop_event:
            active_replay_threads.pop(filename, None)
# Identity token of the most recently started replay. Each auto-stop timer
# remembers the token it was started with and only acts while that token is
# still the current one.
_current_replay_token = None


def show_replay(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE):
    """Play a replay from MOVETO_DIR on the connected Electron clients.

    Waits for the file to become readable, measures its duration, starts the
    optional live display updates, sends the play command, records the file as
    the last replay and arms the auto-stop timer.

    Args:
        filename: File name relative to MOVETO_DIR.
        location: Placement value forwarded to the client.
        size: Size value forwarded to the client.
    """
    global last_replay_filename, _current_replay_token
    filepath = os.path.join(MOVETO_DIR, filename)
    if not os.path.exists(filepath):
        print(f"Replay file not found: {filepath}")
        return

    # Ensure file is ready before sending to Electron (faster check)
    if not wait_for_file_ready(filepath, timeout=5, poll_interval=0.1):
        print(f"Replay file not ready for playback: {filepath}")
        log_file_locks(filepath)
        return

    # Compute duration once, send to Electron
    duration = get_media_duration(filepath)

    # Prepare live display BEFORE sending the video so both start together.
    if LIVE_DISPLAY_ENABLED and LIVE_DISPLAY_AVAILABLE:
        # Stop any existing live display thread for this filename
        previous_stop = active_replay_threads.get(filename)
        if previous_stop is not None:
            previous_stop.set()
        # Create a new stop event for this replay
        active_replay_threads[filename] = threading.Event()
        # Send initial "playing" state at timestamp 0 immediately
        try:
            live_display_handler("playing", 0.0)
        except Exception as handler_error:
            print(f"Error in initial live_display_handler call: {handler_error}")
        threading.Thread(
            target=live_display_update_thread,
            args=(filename, duration),
            daemon=True,
        ).start()

    # Now send video to Electron (live display already started)
    asyncio.run_coroutine_threadsafe(
        send_video_url(filename, location, size, duration), websocket_loop
    )
    last_replay_filename = filename
    print(f"Sent replay to Electron: {filepath} (location={location}, size={size}, duration={duration})")
    is_replaying.set()

    # Arm the auto-stop timer and tag it with a fresh token so that timers
    # left over from earlier replays become no-ops.
    token = object()
    _current_replay_token = token
    threading.Thread(
        target=auto_stop_replay,
        args=(REPLAY_LENGTH_SECONDS, token),
        daemon=True,
    ).start()


def auto_stop_replay(duration, token=None):
    """Automatically stop replay after duration if not stopped manually.

    Args:
        duration: Seconds to wait before stopping.
        token: Identity token of the replay this timer belongs to. When given
            and a newer replay has started in the meantime, the timer does
            nothing; without this check a timer from an earlier replay would
            cut the newer replay short. ``None`` keeps the unconditional
            behaviour for callers that pass only *duration*.
    """
    time.sleep(duration)
    if token is not None and token is not _current_replay_token:
        return
    # Only auto-stop if replaying is still active (not stopped manually)
    if is_replaying.is_set():
        print(f"Auto-stopping replay after {duration} seconds.")
        # Stop all active live display threads
        for stop_event in list(active_replay_threads.values()):
            stop_event.set()
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        is_replaying.clear()
def delete_old_files(directory, max_age_seconds=3 * 60 * 60):
    """Delete files older than max_age_seconds in the given directory.

    Only regular files directly inside *directory* are considered;
    subdirectories are left alone. Age is measured from the file's
    modification time. Errors are reported on stdout and never raised, so a
    file that disappears mid-scan or an unreadable directory cannot take down
    the calling thread.

    Args:
        directory: Directory to clean.
        max_age_seconds: Files strictly older than this are removed
            (default: three hours).
    """
    now = time.time()
    try:
        filenames = os.listdir(directory)
    except OSError as e:
        print(f"Error listing {directory}: {e}")
        return
    for filename in filenames:
        filepath = os.path.join(directory, filename)
        try:
            if not os.path.isfile(filepath):
                continue
            # getmtime sits inside the try: the file may vanish between the
            # isfile() check and this call.
            if now - os.path.getmtime(filepath) <= max_age_seconds:
                continue
            os.remove(filepath)
            print(f"Deleted old replay: {filepath}")
        except OSError as e:
            print(f"Error deleting {filepath}: {e}")
def clear_all_replays(directory):
    """Remove every regular file directly inside *directory*.

    Subdirectories are skipped. A failure to delete one file is printed and
    the remaining files are still processed.
    """
    candidate_paths = [os.path.join(directory, entry) for entry in os.listdir(directory)]
    for candidate in candidate_paths:
        if not os.path.isfile(candidate):
            continue
        try:
            os.remove(candidate)
            print(f"Deleted replay: {candidate}")
        except Exception as e:
            print(f"Error deleting {candidate}: {e}")
def trim_replay_with_ffmpeg(filepath, length_seconds):
    """Trim the replay file to the specified length using ffmpeg, only if needed.

    Keeps the last *length_seconds* of the clip using stream copy (no
    re-encode) and replaces the original file in place. Nothing is done when
    the duration cannot be determined or the clip is already short enough
    (0.1 s tolerance). Errors are printed, never raised.

    Args:
        filepath: Path of the replay file to trim.
        length_seconds: Maximum length to keep, counted from the end.
    """
    # Reuse the shared ffprobe helper; it prints its own error and returns
    # None on failure.
    duration = get_media_duration(filepath)
    if duration is None:
        return  # Exit early if we can't get duration
    if duration <= length_seconds + 0.1:
        print(
            f"Skipping trim: video duration ({duration:.2f}s) <= target ({length_seconds}s)"
        )
        return

    temp_filepath = filepath + ".trimmed.mp4"
    # Start offset that leaves exactly the last `length_seconds` of the clip.
    start_time = max(0, duration - length_seconds)
    try:
        subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-loglevel", "error",
                "-ss", str(start_time),
                "-i", filepath,
                "-t", str(length_seconds),
                "-c", "copy",
                "-movflags", "+faststart",
                temp_filepath,
            ],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Replace original file with trimmed file
        os.replace(temp_filepath, filepath)
        print(f"Trimmed replay to last {length_seconds}s: {filepath}")
    except Exception as e:
        print(f"Error trimming replay {filepath}: {e}")
        # Do not leave a partial output behind in the replay directory.
        if os.path.exists(temp_filepath):
            try:
                os.remove(temp_filepath)
            except OSError as cleanup_error:
                print(f"Error removing temporary file {temp_filepath}: {cleanup_error}")
def log_file_locks(filepath):
    """Print which processes currently hold *filepath* open.

    Requires psutil; without it a notice is printed and nothing is checked.
    Processes that exit during the scan or deny access are skipped. The
    original docstring describes this as a Windows-only diagnostic.
    """
    if not PSUTIL_AVAILABLE:
        print("psutil not available; cannot check file locks.")
        return
    try:
        wanted = os.path.normcase(filepath)
        holders = []
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                for open_file in proc.open_files():
                    if os.path.normcase(open_file.path) != wanted:
                        continue
                    holders.append(f"PID {proc.pid} ({proc.name()})")
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        if not holders:
            print(f"File '{filepath}' is not locked by any process.")
        else:
            print(f"File '{filepath}' is locked by: {', '.join(holders)}")
    except Exception as e:
        print(f"Error checking file locks: {e}")
def wait_for_file_ready(filepath, timeout=10, poll_interval=0.1):
    """Wait for a file to stop growing and release any locks.

    The file counts as ready once it can be opened for reading and its size
    was identical on three consecutive polls.

    Args:
        filepath: File to watch.
        timeout: Maximum time to wait, in seconds.
        poll_interval: Delay between polls, in seconds.

    Returns:
        bool: True when the file became ready, False on timeout.
    """
    # Monotonic clock: the deadline must not move when the system clock is
    # adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    last_size = -1
    stable_reads = 0
    while time.monotonic() < deadline:
        try:
            current_size = os.path.getsize(filepath)
            # Opening the file checks that it can be read at this moment.
            with open(filepath, "rb"):
                pass
        except OSError:
            # Covers FileNotFoundError and PermissionError (both subclasses).
            time.sleep(poll_interval)
            continue
        if current_size == last_size:
            stable_reads += 1
            if stable_reads >= 2:
                return True
        else:
            last_size = current_size
            stable_reads = 0
        time.sleep(poll_interval)
    print(f"Timed out waiting for file to be ready: {filepath}")
    return False
def _notify_status(status):
    """Queue a HUD status update on the WebSocket event loop."""
    asyncio.run_coroutine_threadsafe(send_status_update(status), websocket_loop)


def _process_new_replay(source_path, dest_path, new_file):
    """Move one new file into MOVETO_DIR, trim it and trigger playback."""
    _notify_status("Waiting for file")
    if not wait_for_file_ready(source_path):
        print(f"Skipping file that never became accessible: {source_path}")
        return

    _notify_status("Moving file")
    shutil.move(source_path, dest_path)
    print(f"Moved file to: {dest_path}")

    _notify_status("Waiting for moved file")
    if not wait_for_file_ready(dest_path):
        print(
            f"Skipping replay because destination file stayed locked: {dest_path}"
        )
        return

    _notify_status("Cutting replay")
    trim_replay_with_ffmpeg(dest_path, REPLAY_LENGTH_SECONDS)

    _notify_status("Finalizing")
    time.sleep(0.5)

    _notify_status("Ready to play")
    # trigger playback
    show_replay(new_file)


def monitor_directory(path):
    """Poll *path* for new files and turn each one into a replay.

    Runs forever: every 0.5 s the directory listing is compared with the
    previous one and each newly appeared entry is moved, trimmed and played.
    Every 60 polls (about 30 s) old files are purged from MOVETO_DIR. Errors
    inside the loop are printed and the loop keeps running.

    Args:
        path: Directory to watch.
    """
    print(f"Monitoring directory: {path}")
    already_seen = set(os.listdir(path))
    cleanup_counter = 0
    while True:
        time.sleep(0.5)  # Check every 0.5 seconds for faster detection
        try:
            current_files = set(os.listdir(path))
        except OSError as e:
            # A temporarily unavailable directory must not end monitoring.
            print(f"Error listing '{path}': {e}")
            continue
        new_files = current_files - already_seen

        # Delete old files in MOVETO_DIR only every 60 cycles (30 seconds)
        cleanup_counter += 1
        if cleanup_counter >= 60:
            cleanup_counter = 0
            try:
                delete_old_files(MOVETO_DIR)
            except Exception as e:
                # Cleanup is best-effort; keep the monitor thread alive.
                print(f"Error cleaning up old replays: {e}")

        for new_file in new_files:
            print(f"New file detected: {new_file}")
            source_path = os.path.join(path, new_file)
            dest_path = os.path.join(MOVETO_DIR, new_file)
            try:
                _process_new_replay(source_path, dest_path, new_file)
            except PermissionError as e:
                print(f"PermissionError moving file '{source_path}': {e}")
                log_file_locks(source_path)
            except Exception as e:
                print(f"Error moving file '{source_path}': {e}")
        # Entries that were skipped or failed are still recorded as seen, so
        # they are not retried on the next poll.
        already_seen = current_files
def capture_replay():
    """Ask OBS to write out its replay buffer and tell the HUD about it.

    Any failure (OBS call or status notification) is printed, not raised.
    """
    try:
        save_request = rq.SaveReplayBuffer()
        ws.call(save_request)
        print("Replay buffer saved.")
        status_message = send_status_update("Replay saved")
        asyncio.run_coroutine_threadsafe(status_message, websocket_loop)
    except Exception as e:
        print(f"Error saving replay buffer: {e}")
def start_mic_listener():
    """Listen for microphone input to trigger replay capture using sounddevice and whisper, with volume threshold.

    After a short ambient-noise calibration, records DURATION-second windows
    in an endless loop. Windows whose RMS level reaches four times the ambient
    RMS are transcribed with Whisper ("base.en") and the text is matched
    against the configured command phrases. Transcription is abandoned after
    five seconds.
    """
    import sounddevice as sd
    import numpy as np
    import whisper
    import scipy.io.wavfile
    import concurrent.futures

    def record(seconds):
        """Record *seconds* of mono int16 audio and block until it is done."""
        clip = sd.rec(
            int(seconds * SAMPLE_RATE),
            samplerate=SAMPLE_RATE,
            channels=1,
            dtype="int16",
        )
        sd.wait()
        return clip

    def rms_of(samples):
        """Root-mean-square level of an int16 recording."""
        return np.sqrt(np.mean(np.square(samples.astype(np.float32))))

    whisper_model = whisper.load_model(
        "base.en"
    )  # You can choose "tiny", "base", "small", etc.
    print(
        "Microphone listener started. Say one of the activation, stop, or replay last commands."
    )
    print(f"Activation commands: {ACTIVATION_COMMANDS}")
    print(f"Stop commands: {STOP_COMMANDS}")
    print(f"Replay last commands: {REPLAY_LAST_COMMANDS}")
    print(f"Clear replays commands: {CLEAR_REPLAYS_COMMANDS}")

    # Calibration phase
    print("Calibrating ambient volume, please be quiet...")
    ambient_rms = rms_of(record(CALIBRATION_DURATION))
    threshold = ambient_rms * 4  # You can adjust multiplier for sensitivity
    print(f"Ambient RMS: {ambient_rms:.2f}, threshold set to: {threshold:.2f}")

    wav_path = "temp_mic_audio.wav"
    while True:
        print("Listening for command...")
        audio = record(DURATION)
        rms = rms_of(audio)
        print(f"Recorded RMS: {rms:.2f}")
        if rms < threshold:
            print("No speech detected (below threshold).")
            continue
        scipy.io.wavfile.write(wav_path, SAMPLE_RATE, audio)

        # The executor is deliberately NOT used as a context manager: leaving
        # a `with` block calls shutdown(wait=True), which blocks until the
        # transcription finishes and so defeats the timeout below.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            print("Transcribing audio with Whisper...")
            # Transcribe using Whisper with 5 second timeout
            future = executor.submit(whisper_model.transcribe, wav_path)
            result = future.result(timeout=5)
            command = result["text"].lower().strip()
            print(f"Heard (Whisper): {command}")
            if any(phrase in command for phrase in ACTIVATION_COMMANDS):
                capture_replay()
            elif any(phrase in command for phrase in STOP_COMMANDS):
                asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
                print("Stop replay command triggered")
                is_replaying.clear()  # Ensure replay state is cleared on manual stop
            elif any(phrase in command for phrase in REPLAY_LAST_COMMANDS):
                if last_replay_filename:
                    show_replay(last_replay_filename)
                    print(f"Replaying last replay: {last_replay_filename}")
                else:
                    print("No replay available to replay last.")
            elif any(phrase in command for phrase in CLEAR_REPLAYS_COMMANDS):
                clear_all_replays(MOVETO_DIR)
                print("All replays cleared.")
        except concurrent.futures.TimeoutError:
            print("Whisper transcription timed out (5 seconds).")
        except Exception as e:
            print(f"Whisper transcription error: {e}")
        finally:
            # Return immediately; a timed-out transcription finishes (and is
            # discarded) in the background while listening resumes.
            executor.shutdown(wait=False)
def start_websocket_server():
    """Start WebSocket server for Electron communication.

    Creates a dedicated event loop for the calling thread, publishes it as the
    module-level ``websocket_loop`` (other threads schedule coroutines on it
    with ``asyncio.run_coroutine_threadsafe``), starts the server on
    localhost:WEBSOCKET_PORT and then runs the loop forever. This function
    does not return.
    """
    global websocket_loop
    websocket_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(websocket_loop)

    async def _start():
        # Create the server from inside a coroutine: the newer asyncio
        # implementation of websockets needs a running event loop when
        # serve() is called. Awaiting serve() works on older releases too.
        return await websockets.serve(websocket_handler, "localhost", WEBSOCKET_PORT)

    websocket_loop.run_until_complete(_start())
    # Announce only once the listening socket has actually been bound.
    print(f"WebSocket server started on ws://localhost:{WEBSOCKET_PORT}")
    websocket_loop.run_forever()
def handle_command(command):
    """Dispatch a textual command exactly as a spoken one would be handled.

    The text is lower-cased and stripped, then checked against the phrase
    lists in this order: activation, stop, replay-last, clear-replays. The
    first list containing a phrase that occurs in the text decides the action;
    unrecognized text is ignored.
    """
    spoken = command.lower().strip()

    def mentions(phrases):
        return any(phrase in spoken for phrase in phrases)

    if mentions(ACTIVATION_COMMANDS):
        capture_replay()
        is_replaying.set()
        return

    if mentions(STOP_COMMANDS):
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        print("Stop replay command triggered")
        is_replaying.clear()  # Ensure replay state is cleared on manual stop
        return

    if mentions(REPLAY_LAST_COMMANDS):
        if not last_replay_filename:
            print("No replay available to replay last.")
            return
        show_replay(last_replay_filename)
        print(f"Replaying last replay: {last_replay_filename}")
        is_replaying.set()
        return

    if mentions(CLEAR_REPLAYS_COMMANDS):
        clear_all_replays(MOVETO_DIR)
        print("All replays cleared.")
def start_hotkey_listener():
    """Block on the configured global hotkey and toggle replay on each press.

    While a replay is marked active a press sends the stop command; otherwise
    it behaves like the "instant replay" command. Returns immediately, with a
    notice, when the keyboard library is not installed.
    """
    try:
        import keyboard
    except ImportError:
        print("keyboard library not installed; hotkey feature disabled.")
        return

    print(f"Global hotkey listener enabled. Press '{HOTKEY['key']}' anywhere to toggle replay.")
    while True:
        keyboard.wait(HOTKEY["key"])
        if not is_replaying.is_set():
            print(f"Global hotkey '{HOTKEY['key']}' pressed! Simulating 'instant replay' command.")
            handle_command("instant replay")
        else:
            # Always send stop command and clear state, even if already expired
            print(f"Global hotkey '{HOTKEY['key']}' pressed! Aborting replay.")
            asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
            is_replaying.clear()  # Ensure replay state is cleared on manual stop
        time.sleep(0.5)  # Prevent rapid retrigger
def main():
    """Start all background workers and keep the process alive.

    Raises the process priority, verifies that at least one input method
    (voice or hotkey) is enabled, then starts the WebSocket server, the
    directory monitor and the enabled input listeners as daemon threads. The
    main thread idles until interrupted with Ctrl+C.

    Raises:
        SystemExit: With status 1 when both input methods are disabled.
    """
    set_process_priority()
    # Check if at least one input method is enabled
    if not VOICE_ENABLED and not HOTKEY["enabled"]:
        print(
            "ERROR: Both voice recognition and hotkey are disabled. Enable at least one input method."
        )
        # Raise SystemExit directly: exit() is provided by the `site` module
        # for interactive use and may be missing in some deployments.
        raise SystemExit(1)
    # Start WebSocket server in a separate thread
    websocket_thread = threading.Thread(target=start_websocket_server, daemon=True)
    websocket_thread.start()
    # Start monitoring directory (create another thread)
    monitor_thread = threading.Thread(
        target=monitor_directory, args=(WATCH_DIR,), daemon=True
    )
    monitor_thread.start()
    # Start microphone listener in a separate thread if enabled
    if VOICE_ENABLED:
        mic_thread = threading.Thread(target=start_mic_listener, daemon=True)
        mic_thread.start()
    # Start hotkey listener if enabled
    if HOTKEY["enabled"]:
        hotkey_thread = threading.Thread(target=start_hotkey_listener, daemon=True)
        hotkey_thread.start()
    # Keep the main thread alive. All workers are daemon threads, so they end
    # together with the main thread when it returns.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")


if __name__ == "__main__":
    main()