# Imports
import os
import sys
import time
import threading
import shutil
from obswebsocket import obsws
from obswebsocket import requests as rq
import websockets
import asyncio
import json
import ctypes
import subprocess
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler

# Import live display handler (user can customize livedisplay.py)
try:
    from livedisplay import handle_update as live_display_handler

    LIVE_DISPLAY_AVAILABLE = True
except ImportError:
    print("Warning: livedisplay.py not found. Live display feature disabled.")
    LIVE_DISPLAY_AVAILABLE = False
    live_display_handler = None

# Try to import psutil early for better performance
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False
    psutil = None


def is_obs_running():
    """Check if OBS Studio is running."""
    if not PSUTIL_AVAILABLE:
        return True  # Assume running if cannot check
    try:
        for proc in psutil.process_iter(["name"]):
            if proc.info["name"] and "obs64.exe" in proc.info["name"].lower():
                return True
        return False
    except Exception:
        return True  # Assume running if check fails


def set_process_priority():
    """Elevate process priority on Windows to reduce missed events under heavy load."""
    if os.name != "nt":
        return
    try:
        # Try using psutil when available for clarity.
        import psutil  # type: ignore

        process = psutil.Process(os.getpid())
        process.nice(psutil.HIGH_PRIORITY_CLASS)
        print("Process priority elevated to HIGH_PRIORITY_CLASS")
        return
    except ImportError:
        print("psutil not installed; falling back to ctypes priority raise")
    except Exception as exc:
        print(f"Failed to elevate priority with psutil: {exc}; falling back to ctypes")
    try:
        kernel32 = ctypes.windll.kernel32  # type: ignore[attr-defined]
        handle = kernel32.GetCurrentProcess()
        HIGH_PRIORITY_CLASS = 0x00000080
        if not kernel32.SetPriorityClass(handle, HIGH_PRIORITY_CLASS):
            raise OSError("SetPriorityClass returned 0")
        print("Process priority elevated via ctypes SetPriorityClass")
    except Exception as exc:
        print(f"Unable to adjust process priority: {exc}")


# Directories
WATCH_DIR = r"E:\OBS"
MOVETO_DIR = os.path.abspath("./replays")

# Server Ports
SERVER_PORT = 8000
WEBSOCKET_PORT = 8001

# Replay Settings
REPLAY_LENGTH_SECONDS = 20
DEFAULT_LOCATION = "center"  # options: 'top_left', 'bottom_left', 'top_right', 'center'
DEFAULT_SIZE = 5  # 1 (smallest) to 10 (largest)

# Voice Recognition Settings
VOICE_ENABLED = False
SAMPLE_RATE = 16000
DURATION = 4  # seconds to record per command attempt
CALIBRATION_DURATION = 2  # seconds for ambient calibration

# Live Display Settings
LIVE_DISPLAY_ENABLED = False  # Enable periodic status updates during replay
LIVE_DISPLAY_INTERVAL = 0.5  # Seconds between each update (e.g., 0.1 = 10 updates per second)

# Commands
ACTIVATION_COMMANDS = ["instant replay", "save replay", "capture replay", "replay that"]
STOP_COMMANDS = ["stop replay", "stop video", "hide replay"]
REPLAY_LAST_COMMANDS = [
    "replay last",
    "show last replay",
    "play last",
    "replay it again",
]
CLEAR_REPLAYS_COMMANDS = [
    "clear replays",
    "delete all replays",
    "nuke replays",
    "remove all clips",
]

# Hotkey Settings
HOTKEY = {
    "enabled": True,
    "key": "pagedown",  # Change to your preferred key
}

# Replay state tracking
is_replaying = threading.Event()
last_replay_filename = None
active_replay_threads = {}  # Track active live display threads by filename

# Ensure the move-to directory exists
os.makedirs(MOVETO_DIR, exist_ok=True)

# Check if OBS is running before connecting
if not is_obs_running():
    print("ERROR: OBS Studio is not running. Please start OBS and try again.")
    sys.exit(1)

# Setup OBS
ws = obsws("localhost", 4455, "")
ws.connect()

# Global WebSocket connections
websocket_clients = set()
websocket_loop = None  # Set by start_websocket_server() in its own thread
websocket_ready = threading.Event()  # Set once the WebSocket server is listening


async def websocket_handler(websocket, path=None):
    """Handle WebSocket connections from Electron app"""
    # `path` is optional because newer websockets releases call the handler
    # with the connection as the only argument.
    websocket_clients.add(websocket)
    print(f"Electron client connected. Total clients: {len(websocket_clients)}")
    print("Electron WebSocket connection established!")
    try:
        await websocket.wait_closed()
    finally:
        # discard(): a broadcast may already have dropped this client
        websocket_clients.discard(websocket)
        print(f"Electron client disconnected. Total clients: {len(websocket_clients)}")


async def broadcast(message, sent_log=None):
    """Send a message to every connected Electron client and drop closed ones."""
    disconnected = set()
    # Iterate over a snapshot: clients may connect or disconnect while we await.
    for client in list(websocket_clients):
        try:
            await client.send(message)
            if sent_log:
                print(sent_log)
        except websockets.exceptions.ConnectionClosed:
            disconnected.add(client)
    # Clean up disconnected clients
    websocket_clients.difference_update(disconnected)


async def send_video_url(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE, duration=None):
    """Send video URL and display settings to connected Electron clients"""
    if websocket_clients:
        # NOTE: nothing in this file serves MOVETO_DIR on SERVER_PORT; this URL
        # only resolves if an HTTP server for that directory is started elsewhere.
        video_url = f"http://localhost:{SERVER_PORT}/{filename}"
        message = json.dumps(
            {
                "type": "play_video",
                "url": video_url,
                "filename": filename,
                "location": location,
                "size": size,
                "duration": duration,  # provided by backend
            }
        )
        # Send to all connected clients
        await broadcast(
            message,
            sent_log=f"Sent video URL to Electron: {video_url} (duration={duration})",
        )


async def send_stop_command():
    """Send stop command to connected Electron clients"""
    # Stop all active live display threads
    for stop_event in list(active_replay_threads.values()):
        stop_event.set()
    if websocket_clients:
        message = json.dumps({"type": "stop_video"})
        # Send to all connected clients
        await broadcast(message, sent_log="Sent stop command to Electron")


async def send_status_update(status):
    """Send status update to Electron clients for HUD loading screen."""
    if websocket_clients:
        message = json.dumps({"type": "status_update", "status": status})
        await broadcast(message)


def get_media_duration(filepath):
    """Return media duration in seconds (float) using ffprobe, or None on failure."""
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                filepath,
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        dur = float(result.stdout.strip())
        return round(dur, 3)
    except Exception as e:
        print(f"Error getting duration for {filepath}: {e}")
        return None


def live_display_update_thread(filename, duration, stop_event=None):
    """Thread that periodically calls the live display handler with current replay state."""
    if not LIVE_DISPLAY_ENABLED or not LIVE_DISPLAY_AVAILABLE or live_display_handler is None:
        return
    if stop_event is None:
        stop_event = active_replay_threads.get(filename)
    if stop_event is None:
        return  # No stop event registered for this replay
    start_time = time.time()
    next_update_time = start_time  # Track when next update should happen
    try:
        while not stop_event.is_set():
            current_time = time.time()
            elapsed = current_time - start_time
            # Stop if we've exceeded the duration
            if duration and elapsed >= duration:
                break
            # Only send update if we've reached the next scheduled update time
            if current_time >= next_update_time:
                # Call the handler with current state
                try:
                    live_display_handler("playing", elapsed)
                except Exception as handler_error:
                    # Don't let handler errors stop the update loop
                    print(f"Error in live_display_handler: {handler_error}")
                # Schedule next update at a fixed interval from start, not from now
                # This prevents drift from slow handler execution
                next_update_time += LIVE_DISPLAY_INTERVAL
                # If we're falling behind, catch up to current time
                if next_update_time < current_time:
                    next_update_time = current_time + LIVE_DISPLAY_INTERVAL
            # Sleep for a short time or until stopped (use smaller sleep to reduce latency)
            sleep_time = min(0.01, max(0.001, next_update_time - time.time()))
            stop_event.wait(timeout=sleep_time)
        # Send final "stopped" update
        final_timestamp = time.time() - start_time
        try:
            live_display_handler("stopped", final_timestamp)
        except Exception as handler_error:
            print(f"Error in live_display_handler (stopped): {handler_error}")
    except Exception as e:
        print(f"Error in live display update thread: {e}")
    finally:
        # Clean up the thread tracking, but only if the entry is still ours:
        # a newer replay of the same file may have registered its own event.
        if active_replay_threads.get(filename) is stop_event:
            active_replay_threads.pop(filename, None)


def show_replay(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE):
    global last_replay_filename
    filepath = os.path.join(MOVETO_DIR, filename)
    if os.path.exists(filepath):
        # Ensure file is ready before sending to Electron (faster check)
        if not wait_for_file_ready(filepath, timeout=5, poll_interval=0.1):
            print(f"Replay file not ready for playback: {filepath}")
            log_file_locks(filepath)
            return
        # Compute duration once, send to Electron
        duration = get_media_duration(filepath)
        # Prepare live display BEFORE sending video to Electron for perfect sync
        if LIVE_DISPLAY_ENABLED and LIVE_DISPLAY_AVAILABLE:
            # Stop any existing live display thread for this filename
            if filename in active_replay_threads:
                active_replay_threads[filename].set()
            # Create a new stop event for this replay
            stop_event = threading.Event()
            active_replay_threads[filename] = stop_event
            # Send initial "playing" state at timestamp 0 IMMEDIATELY
            try:
                live_display_handler("playing", 0.0)
            except Exception as handler_error:
                print(f"Error in initial live_display_handler call: {handler_error}")
            # Start the live display update thread
            threading.Thread(
                target=live_display_update_thread,
                args=(filename, duration, stop_event),
                daemon=True,
            ).start()
        # Now send video to Electron (live display already started)
        asyncio.run_coroutine_threadsafe(
            send_video_url(filename, location, size, duration), websocket_loop
        )
        last_replay_filename = filename
        print(
            f"Sent replay to Electron: {filepath} "
            f"(location={location}, size={size}, duration={duration})"
        )
        is_replaying.set()
        threading.Thread(
            target=auto_stop_replay, args=(REPLAY_LENGTH_SECONDS,), daemon=True
        ).start()
    else:
        print(f"Replay file not found: {filepath}")


def auto_stop_replay(duration):
    """Automatically stop replay after duration if not stopped manually."""
    time.sleep(duration)
    # Only auto-stop if replaying is still active (not stopped manually)
    if is_replaying.is_set():
        print(f"Auto-stopping replay after {duration} seconds.")
        # Stop all active live display threads
        for stop_event in list(active_replay_threads.values()):
            stop_event.set()
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        is_replaying.clear()


def delete_old_files(directory, max_age_seconds=3 * 60 * 60):
    """Delete files older than max_age_seconds in the given directory."""
    now = time.time()
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            file_age = now - os.path.getmtime(filepath)
            if file_age > max_age_seconds:
                try:
                    os.remove(filepath)
                    print(f"Deleted old replay: {filepath}")
                except Exception as e:
                    print(f"Error deleting {filepath}: {e}")


def clear_all_replays(directory):
    """Delete all files in the given directory."""
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            try:
                os.remove(filepath)
                print(f"Deleted replay: {filepath}")
            except Exception as e:
                print(f"Error deleting {filepath}: {e}")


def trim_replay_with_ffmpeg(filepath, length_seconds):
    """Trim the replay file to the specified length using ffmpeg, only if needed."""
    # Get video duration using ffprobe (single call)
    duration = get_media_duration(filepath)
    if duration is None:
        return  # Exit early if we can't get duration
    if duration <= length_seconds + 0.1:
        print(
            f"Skipping trim: video duration ({duration:.2f}s) <= target ({length_seconds}s)"
        )
        return
    temp_filepath = filepath + ".trimmed.mp4"
    try:
        # Calculate start time to keep the last X seconds
        start_time = max(0, duration - length_seconds)
        # ffmpeg command: cut from start_time to end
        subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-loglevel",
                "error",
                "-ss",
                str(start_time),
                "-i",
                filepath,
                "-t",
                str(length_seconds),
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                temp_filepath,
            ],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Replace original file with trimmed file
        os.replace(temp_filepath, filepath)
        print(f"Trimmed replay to last {length_seconds}s: {filepath}")
    except Exception as e:
        print(f"Error trimming replay {filepath}: {e}")
        # Don't leave a partial temp file behind in the replay directory
        if os.path.exists(temp_filepath):
            try:
                os.remove(temp_filepath)
            except OSError:
                pass


def log_file_locks(filepath):
    """Log processes that are using the given file (Windows only)."""
    if not PSUTIL_AVAILABLE:
        print("psutil not available; cannot check file locks.")
        return
    try:
        locked_by = []
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                for item in proc.open_files():
                    if os.path.normcase(item.path) == os.path.normcase(filepath):
                        locked_by.append(f"PID {proc.pid} ({proc.name()})")
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        if locked_by:
            print(f"File '{filepath}' is locked by: {', '.join(locked_by)}")
        else:
            print(f"File '{filepath}' is not locked by any process.")
    except Exception as e:
        print(f"Error checking file locks: {e}")


def wait_for_file_ready(filepath, timeout=10, poll_interval=0.1):
    """Wait for a file to stop growing and release any locks."""
    deadline = time.time() + timeout
    last_size = -1
    stable_reads = 0
    while time.time() < deadline:
        try:
            current_size = os.path.getsize(filepath)
            with open(filepath, "rb"):
                pass
        except (FileNotFoundError, PermissionError, OSError):
            time.sleep(poll_interval)
            continue
        if current_size == last_size:
            stable_reads += 1
            if stable_reads >= 2:
                return True
        else:
            last_size = current_size
            stable_reads = 0
        time.sleep(poll_interval)
    print(f"Timed out waiting for file to be ready: {filepath}")
    return False


def monitor_directory(path):
    print(f"Monitoring directory: {path}")
    already_seen = set(os.listdir(path))
    cleanup_counter = 0
    while True:
        time.sleep(0.5)  # Check every 0.5 seconds for faster detection
        current_files = set(os.listdir(path))
        new_files = current_files - already_seen
        # Delete old files in MOVETO_DIR only every 60 cycles (30 seconds)
        cleanup_counter += 1
        if cleanup_counter >= 60:
            delete_old_files(MOVETO_DIR)
            cleanup_counter = 0
        for new_file in new_files:
            print(f"New file detected: {new_file}")
            source_path = os.path.join(path, new_file)
            dest_path = os.path.join(MOVETO_DIR, new_file)
            try:
                # Status: waiting for file
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Waiting for file"), websocket_loop
                )
                if not wait_for_file_ready(source_path):
                    print(f"Skipping file that never became accessible: {source_path}")
                    continue
                # Status: moving file
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Moving file"), websocket_loop
                )
                shutil.move(source_path, dest_path)
                print(f"Moved file to: {dest_path}")
                # Status: waiting for moved file
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Waiting for moved file"), websocket_loop
                )
                if not wait_for_file_ready(dest_path):
                    print(
                        f"Skipping replay because destination file stayed locked: {dest_path}"
                    )
                    continue
                # Status: cutting replay
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Cutting replay"), websocket_loop
                )
                trim_replay_with_ffmpeg(dest_path, REPLAY_LENGTH_SECONDS)
                # Status: finalizing
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Finalizing"), websocket_loop
                )
                time.sleep(0.5)
                # Status: ready to play
                asyncio.run_coroutine_threadsafe(
                    send_status_update("Ready to play"), websocket_loop
                )
                # trigger playback
                show_replay(new_file)
            except PermissionError as e:
                print(f"PermissionError moving file '{source_path}': {e}")
                log_file_locks(source_path)
            except Exception as e:
                print(f"Error moving file '{source_path}': {e}")
        already_seen = current_files


def capture_replay():
    """Trigger OBS to save a replay buffer"""
    try:
        ws.call(rq.SaveReplayBuffer())
        print("Replay buffer saved.")
        asyncio.run_coroutine_threadsafe(
            send_status_update("Replay saved"), websocket_loop
        )
    except Exception as e:
        print(f"Error saving replay buffer: {e}")


def start_mic_listener():
    """Listen for microphone input to trigger replay capture using sounddevice and whisper, with volume threshold"""
    import sounddevice as sd
    import numpy as np
    import whisper
    import scipy.io.wavfile
    import concurrent.futures

    whisper_model = whisper.load_model(
        "base.en"
    )  # You can choose "tiny", "base", "small", etc.
    # One long-lived worker. A `with` block per attempt would wait for the
    # transcription to finish on exit, which defeats the timeout below.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    print(
        "Microphone listener started. Say one of the activation, stop, or replay last commands."
    )
    print(f"Activation commands: {ACTIVATION_COMMANDS}")
    print(f"Stop commands: {STOP_COMMANDS}")
    print(f"Replay last commands: {REPLAY_LAST_COMMANDS}")
    print(f"Clear replays commands: {CLEAR_REPLAYS_COMMANDS}")

    # Calibration phase
    print("Calibrating ambient volume, please be quiet...")
    ambient_audio = sd.rec(
        int(CALIBRATION_DURATION * SAMPLE_RATE),
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="int16",
    )
    sd.wait()
    ambient_rms = np.sqrt(np.mean(np.square(ambient_audio.astype(np.float32))))
    threshold = ambient_rms * 4  # You can adjust multiplier for sensitivity
    print(f"Ambient RMS: {ambient_rms:.2f}, threshold set to: {threshold:.2f}")

    while True:
        print("Listening for command...")
        audio = sd.rec(
            int(DURATION * SAMPLE_RATE),
            samplerate=SAMPLE_RATE,
            channels=1,
            dtype="int16",
        )
        sd.wait()
        rms = np.sqrt(np.mean(np.square(audio.astype(np.float32))))
        print(f"Recorded RMS: {rms:.2f}")
        if rms < threshold:
            print("No speech detected (below threshold).")
            continue
        wav_path = "temp_mic_audio.wav"
        scipy.io.wavfile.write(wav_path, SAMPLE_RATE, audio)
        try:
            print("Transcribing audio with Whisper...")
            # Transcribe using Whisper with 5 second timeout
            future = executor.submit(whisper_model.transcribe, wav_path)
            result = future.result(timeout=5)
            command = result["text"].lower().strip()
            print(f"Heard (Whisper): {command}")
            if any(phrase in command for phrase in ACTIVATION_COMMANDS):
                capture_replay()
            elif any(phrase in command for phrase in STOP_COMMANDS):
                asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
                print("Stop replay command triggered")
                is_replaying.clear()  # Ensure replay state is cleared on manual stop
            elif any(phrase in command for phrase in REPLAY_LAST_COMMANDS):
                if last_replay_filename:
                    show_replay(last_replay_filename)
                    print(f"Replaying last replay: {last_replay_filename}")
                else:
                    print("No replay available to replay last.")
            elif any(phrase in command for phrase in CLEAR_REPLAYS_COMMANDS):
                clear_all_replays(MOVETO_DIR)
                print("All replays cleared.")
        except concurrent.futures.TimeoutError:
            print("Whisper transcription timed out (5 seconds).")
        except Exception as e:
            print(f"Whisper transcription error: {e}")


async def run_websocket_server():
    """Serve WebSocket connections until the event loop is stopped."""
    async with websockets.serve(websocket_handler, "localhost", WEBSOCKET_PORT):
        print(f"WebSocket server started on ws://localhost:{WEBSOCKET_PORT}")
        websocket_ready.set()
        await asyncio.Future()  # Never completes; keeps the server running


def start_websocket_server():
    """Start WebSocket server for Electron communication"""
    global websocket_loop
    websocket_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(websocket_loop)
    websocket_loop.run_until_complete(run_websocket_server())


def handle_command(command):
    """Process a command string as if it was spoken."""
    command = command.lower().strip()
    if any(phrase in command for phrase in ACTIVATION_COMMANDS):
        capture_replay()
        is_replaying.set()
    elif any(phrase in command for phrase in STOP_COMMANDS):
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        print("Stop replay command triggered")
        is_replaying.clear()  # Ensure replay state is cleared on manual stop
    elif any(phrase in command for phrase in REPLAY_LAST_COMMANDS):
        if last_replay_filename:
            show_replay(last_replay_filename)
            print(f"Replaying last replay: {last_replay_filename}")
            is_replaying.set()
        else:
            print("No replay available to replay last.")
    elif any(phrase in command for phrase in CLEAR_REPLAYS_COMMANDS):
        clear_all_replays(MOVETO_DIR)
        print("All replays cleared.")


def start_hotkey_listener():
    """Listen for hotkey globally and toggle replay/abort."""
    try:
        import keyboard
    except ImportError:
        print("keyboard library not installed; hotkey feature disabled.")
        return
    print(f"Global hotkey listener enabled. Press '{HOTKEY['key']}' anywhere to toggle replay.")
    while True:
        keyboard.wait(HOTKEY["key"])
        if is_replaying.is_set():
            # Always send stop command and clear state, even if already expired
            print(f"Global hotkey '{HOTKEY['key']}' pressed! Aborting replay.")
            asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
            is_replaying.clear()  # Ensure replay state is cleared on manual stop
        else:
            print(f"Global hotkey '{HOTKEY['key']}' pressed! Simulating 'instant replay' command.")
            handle_command("instant replay")
        time.sleep(0.5)  # Prevent rapid retrigger


def main():
    set_process_priority()
    # Check if at least one input method is enabled
    if not VOICE_ENABLED and not HOTKEY["enabled"]:
        print(
            "ERROR: Both voice recognition and hotkey are disabled. Enable at least one input method."
        )
        sys.exit(1)

    # Start WebSocket server in a separate thread
    websocket_thread = threading.Thread(target=start_websocket_server, daemon=True)
    websocket_thread.start()
    # Other threads schedule coroutines on websocket_loop, so wait until it exists
    if not websocket_ready.wait(timeout=10):
        print("ERROR: WebSocket server failed to start.")
        sys.exit(1)

    # Start monitoring directory (create another thread)
    monitor_thread = threading.Thread(
        target=monitor_directory, args=(WATCH_DIR,), daemon=True
    )
    monitor_thread.start()

    # Start microphone listener in a separate thread if enabled
    if VOICE_ENABLED:
        mic_thread = threading.Thread(target=start_mic_listener, daemon=True)
        mic_thread.start()

    # Start hotkey listener if enabled
    if HOTKEY["enabled"]:
        hotkey_thread = threading.Thread(target=start_hotkey_listener, daemon=True)
        hotkey_thread.start()

    # Keep main thread alive, and kill child processes on exit
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")


if __name__ == "__main__":
    main()