814 lines
29 KiB
Python
814 lines
29 KiB
Python
# Imports
import asyncio
import ctypes
import json
import os
import shutil
import subprocess
import sys
import threading
import time
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler

import websockets
from obswebsocket import obsws
from obswebsocket import requests as rq
|
|
|
|
# Import live display handler (user can customize livedisplay.py)
|
|
try:
|
|
from livedisplay import handle_update as live_display_handler
|
|
LIVE_DISPLAY_AVAILABLE = True
|
|
except ImportError:
|
|
print("Warning: livedisplay.py not found. Live display feature disabled.")
|
|
LIVE_DISPLAY_AVAILABLE = False
|
|
live_display_handler = None
|
|
|
|
# Try to import psutil early for better performance
|
|
try:
|
|
import psutil
|
|
PSUTIL_AVAILABLE = True
|
|
except ImportError:
|
|
PSUTIL_AVAILABLE = False
|
|
psutil = None
|
|
|
|
|
|
def load_config(config_path="config.json"):
    """Load configuration from a JSON file, deep-merged over built-in defaults.

    Keys in the user file override the defaults. Nested dicts are merged
    recursively, so a partial section (e.g. only ``server.port``) keeps the
    remaining default keys of that section.

    Args:
        config_path: Path of the JSON config file. Defaults to
            ``config.json`` relative to the current working directory.

    Returns:
        The merged configuration dict. If the file is missing, unreadable,
        not valid JSON, or not a JSON object, a message is printed and the
        defaults are returned.
    """
    default_config = {
        "directories": {
            "watch_dir": r"E:\OBS",
            "moveto_dir": "./replays",
        },
        "server": {
            "port": 8000,
            "websocket_port": 8001,
        },
        "replay": {
            "length_seconds": 20,
            "default_location": "center",
            "default_size": 5,
        },
        "voice_recognition": {
            "enabled": False,
            "sample_rate": 16000,
            "duration": 4,
            "calibration_duration": 2,
        },
        "live_display": {
            "enabled": False,
            "interval": 0.5,
        },
        "commands": {
            "activation": ["instant replay", "save replay", "capture replay", "replay that"],
            "stop": ["stop replay", "stop video", "hide replay"],
            "replay_last": ["replay last", "show last replay", "play last", "replay it again"],
            "clear_replays": ["clear replays", "delete all replays", "nuke replays", "remove all clips"],
        },
        "hotkey": {
            "enabled": True,
            "key": "pagedown",
        },
    }

    def deep_merge(default, user):
        """Return a new dict with *user* merged recursively over *default*."""
        result = default.copy()
        for key, value in user.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = deep_merge(result[key], value)
            else:
                result[key] = value
        return result

    if not os.path.exists(config_path):
        print(f"No {config_path} found. Using default configuration.")
        print("Tip: Copy config.example.json to config.json to customize settings.")
        return default_config

    try:
        # Decode explicitly as UTF-8 rather than the locale code page (cp1252
        # on many Windows systems). "utf-8-sig" also accepts a leading BOM,
        # which some Windows editors add and which json.load rejects.
        with open(config_path, "r", encoding="utf-8-sig") as f:
            user_config = json.load(f)
        if not isinstance(user_config, dict):
            raise ValueError(
                f"top-level JSON value must be an object, got {type(user_config).__name__}"
            )
        config = deep_merge(default_config, user_config)
    except Exception as e:
        # Deliberately broad: any problem with the user file falls back to defaults.
        print(f"Error loading {config_path}: {e}. Using default configuration.")
        return default_config

    print(f"Configuration loaded from {config_path}")
    return config
|
|
|
|
|
|
# Load configuration once at import time; the module-level settings that
# follow (directories, ports, replay, commands, hotkey) are read from this dict.
config = load_config()
|
|
|
|
|
|
def is_obs_running():
    """Report whether a process whose name contains ``obs64.exe`` is running.

    Errs on the side of ``True``: if psutil is not installed, or enumerating
    processes raises, the check cannot be made and OBS is assumed present.
    """
    if not PSUTIL_AVAILABLE:
        return True  # Assume running if cannot check

    try:
        process_names = (proc.info["name"] for proc in psutil.process_iter(["name"]))
        return any(name and "obs64.exe" in name.lower() for name in process_names)
    except Exception:
        return True  # Assume running if check fails
|
|
|
|
|
|
def set_process_priority():
    """Raise this process to HIGH priority on Windows; do nothing elsewhere.

    psutil is tried first. If it is missing or fails, the Win32
    ``SetPriorityClass`` API is called directly through ctypes. All outcomes
    are reported on stdout; nothing is raised.
    """
    if os.name == "nt":
        if not _raise_priority_with_psutil():
            _raise_priority_with_ctypes()


def _raise_priority_with_psutil():
    """Try psutil's HIGH_PRIORITY_CLASS; return True on success, False to fall back."""
    try:
        import psutil  # type: ignore

        psutil.Process(os.getpid()).nice(psutil.HIGH_PRIORITY_CLASS)
        print("Process priority elevated to HIGH_PRIORITY_CLASS")
        return True
    except ImportError:
        print("psutil not installed; falling back to ctypes priority raise")
    except Exception as exc:
        print(f"Failed to elevate priority with psutil: {exc}; falling back to ctypes")
    return False


def _raise_priority_with_ctypes():
    """Fallback: call kernel32 SetPriorityClass(current process, HIGH_PRIORITY_CLASS)."""
    try:
        kernel32 = ctypes.windll.kernel32  # type: ignore[attr-defined]
        current_process = kernel32.GetCurrentProcess()
        high_priority_class = 0x00000080
        if not kernel32.SetPriorityClass(current_process, high_priority_class):
            raise OSError("SetPriorityClass returned 0")
        print("Process priority elevated via ctypes SetPriorityClass")
    except Exception as exc:
        print(f"Unable to adjust process priority: {exc}")
|
|
|
|
|
|
# Extract configuration values
WATCH_DIR = config["directories"]["watch_dir"]  # folder OBS writes replay files into
MOVETO_DIR = os.path.abspath(config["directories"]["moveto_dir"])  # folder new replays are moved into

SERVER_PORT = config["server"]["port"]
WEBSOCKET_PORT = config["server"]["websocket_port"]

REPLAY_LENGTH_SECONDS = config["replay"]["length_seconds"]
DEFAULT_LOCATION = config["replay"]["default_location"]
DEFAULT_SIZE = config["replay"]["default_size"]

VOICE_ENABLED = config["voice_recognition"]["enabled"]
SAMPLE_RATE = config["voice_recognition"]["sample_rate"]
DURATION = config["voice_recognition"]["duration"]
CALIBRATION_DURATION = config["voice_recognition"]["calibration_duration"]

LIVE_DISPLAY_ENABLED = config["live_display"]["enabled"]
LIVE_DISPLAY_INTERVAL = config["live_display"]["interval"]

ACTIVATION_COMMANDS = config["commands"]["activation"]
STOP_COMMANDS = config["commands"]["stop"]
REPLAY_LAST_COMMANDS = config["commands"]["replay_last"]
CLEAR_REPLAYS_COMMANDS = config["commands"]["clear_replays"]

HOTKEY = config["hotkey"]

# OBS WebSocket connection settings, read from an optional "obs" section of
# config.json. The defaults reproduce the previously hard-coded values
# (localhost:4455, empty password); set "password" when the OBS WebSocket
# server has authentication enabled.
OBS_SETTINGS = config.get("obs") or {}
OBS_HOST = OBS_SETTINGS.get("host", "localhost")
OBS_PORT = OBS_SETTINGS.get("port", 4455)
OBS_PASSWORD = OBS_SETTINGS.get("password", "")

# Replay state tracking
is_replaying = threading.Event()  # set by show_replay, cleared by stop/auto-stop

last_replay_filename = None  # most recent filename passed to show_replay
active_replay_threads = {}  # filename -> Event that stops that file's live display thread

# Ensure the move-to directory exists
os.makedirs(MOVETO_DIR, exist_ok=True)

# Check if OBS is running before connecting
if not is_obs_running():
    print("ERROR: OBS Studio is not running. Please start OBS and try again.")
    # sys.exit rather than the builtin exit(): exit() is injected by the
    # `site` module and is not guaranteed to exist (python -S, frozen apps).
    sys.exit(1)

# Setup OBS
ws = obsws(OBS_HOST, OBS_PORT, OBS_PASSWORD)
try:
    ws.connect()
except Exception as exc:
    print(f"ERROR: Could not connect to OBS WebSocket at {OBS_HOST}:{OBS_PORT}: {exc}")
    print("Check that the OBS WebSocket server is enabled and the password in config.json is correct.")
    sys.exit(1)

# Global WebSocket connections (Electron overlay clients)
websocket_clients = set()
|
|
|
|
|
|
async def websocket_handler(websocket, path=None):
    """Register an Electron client for broadcasts until its connection closes.

    Args:
        websocket: The server-side connection object created by ``websockets``.
        path: Request path. Legacy ``websockets`` servers pass it as a second
            argument, while newer releases call handlers with the connection
            only, so it is optional here. It is unused.
    """
    websocket_clients.add(websocket)
    print(f"Electron client connected. Total clients: {len(websocket_clients)}")
    print("Electron WebSocket connection established!")
    try:
        await websocket.wait_closed()
    finally:
        # discard(), not remove(): the send_* broadcast coroutines drop clients
        # whose send raised ConnectionClosed, so the entry may already be gone
        # and remove() would raise KeyError here.
        websocket_clients.discard(websocket)
        print(f"Electron client disconnected. Total clients: {len(websocket_clients)}")
|
|
|
|
|
|
async def send_video_url(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE, duration=None):
    """Broadcast a ``play_video`` message for *filename* to all Electron clients.

    The URL points at ``http://localhost:SERVER_PORT/<filename>``. Clients whose
    connection turns out to be closed are removed from ``websocket_clients``.

    Args:
        filename: Replay file name (appended to the URL as-is).
        location: Overlay position passed through to the client.
        size: Overlay size passed through to the client.
        duration: Clip length in seconds as probed by the backend, or None.
    """
    if not websocket_clients:
        return

    video_url = f"http://localhost:{SERVER_PORT}/{filename}"
    message = json.dumps(
        {
            "type": "play_video",
            "url": video_url,
            "filename": filename,
            "location": location,
            "size": size,
            "duration": duration,  # provided by backend
        }
    )

    disconnected = set()
    # Iterate over a snapshot: websocket_handler can add or remove clients
    # while this coroutine is suspended in `await client.send(...)`, and
    # mutating a set during iteration raises RuntimeError.
    for client in list(websocket_clients):
        try:
            await client.send(message)
            print(f"Sent video URL to Electron: {video_url} (duration={duration})")
        except websockets.exceptions.ConnectionClosed:
            disconnected.add(client)

    # Clean up disconnected clients
    websocket_clients.difference_update(disconnected)
|
|
|
|
|
|
async def send_stop_command():
    """Stop all live display threads and broadcast ``stop_video`` to Electron clients.

    Clients whose connection turns out to be closed are removed from
    ``websocket_clients``.
    """
    # Stop all active live display threads
    for stop_event in list(active_replay_threads.values()):
        stop_event.set()

    if not websocket_clients:
        return

    message = json.dumps({"type": "stop_video"})

    disconnected = set()
    # Snapshot the set: clients may connect/disconnect while we await send(),
    # and iterating a set that changes size raises RuntimeError.
    for client in list(websocket_clients):
        try:
            await client.send(message)
            print("Sent stop command to Electron")
        except websockets.exceptions.ConnectionClosed:
            disconnected.add(client)

    # Clean up disconnected clients
    websocket_clients.difference_update(disconnected)
|
|
|
|
|
|
async def send_status_update(status):
    """Broadcast a ``status_update`` message (HUD loading text) to Electron clients.

    Args:
        status: Human-readable status string, e.g. "Moving file".
    """
    if not websocket_clients:
        return

    message = json.dumps({"type": "status_update", "status": status})
    disconnected = set()
    # Snapshot the set: it can change while this coroutine awaits send().
    for client in list(websocket_clients):
        try:
            await client.send(message)
        except websockets.exceptions.ConnectionClosed:
            disconnected.add(client)
    websocket_clients.difference_update(disconnected)
|
|
|
|
|
|
def get_media_duration(filepath):
    """Probe *filepath* with ffprobe and return its container duration.

    Returns:
        The duration in seconds as a float rounded to 3 decimals, or ``None``
        when ffprobe is missing, exits with an error, or prints something
        that is not a number (the error is printed).
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filepath,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        seconds = float(completed.stdout.strip())
    except Exception as e:
        print(f"Error getting duration for {filepath}: {e}")
        return None
    return round(seconds, 3)
|
|
|
|
|
|
def live_display_update_thread(filename, duration, stop_event=None):
    """Periodically report replay progress to the live display handler.

    Calls ``live_display_handler("playing", elapsed)`` about every
    ``LIVE_DISPLAY_INTERVAL`` seconds until *duration* has elapsed or the stop
    event is set, then sends a final ``("stopped", elapsed)`` call. It does
    nothing when the live display feature is disabled or unavailable.

    Args:
        filename: Replay file name; the key into ``active_replay_threads``.
        duration: Playback length in seconds, or ``None``/0 to run until stopped.
        stop_event: Event that ends this thread. When omitted, it is looked up
            in ``active_replay_threads``, where ``show_replay`` registers it
            before starting this thread.
    """
    if not LIVE_DISPLAY_ENABLED or not LIVE_DISPLAY_AVAILABLE or live_display_handler is None:
        return

    if stop_event is None:
        stop_event = active_replay_threads.get(filename)
    if stop_event is None:
        # Nothing is registered for this file, so nothing could ever stop this
        # thread; don't start reporting.
        return

    start_time = time.time()
    next_update_time = start_time  # Track when next update should happen

    try:
        while not stop_event.is_set():
            current_time = time.time()
            elapsed = current_time - start_time

            # Stop if we've exceeded the duration (None/0 means no limit)
            if duration and elapsed >= duration:
                break

            # Only send update if we've reached the next scheduled update time
            if current_time >= next_update_time:
                try:
                    live_display_handler("playing", elapsed)
                except Exception as handler_error:
                    # Don't let handler errors stop the update loop
                    print(f"Error in live_display_handler: {handler_error}")

                # Schedule relative to the previous slot, not "now", so slow
                # handler calls don't accumulate drift...
                next_update_time += LIVE_DISPLAY_INTERVAL
                # ...but if we've fallen behind, resync to the current time.
                if next_update_time < current_time:
                    next_update_time = current_time + LIVE_DISPLAY_INTERVAL

            # Wake at least every 10 ms (never less than 1 ms) so stop requests
            # and the next scheduled update are handled promptly.
            stop_event.wait(timeout=min(0.01, max(0.001, next_update_time - time.time())))

        # If show_replay restarted this same file, the dict now holds the new
        # thread's event and the new replay has already reported "playing";
        # a late "stopped" from this old thread would contradict it.
        current = active_replay_threads.get(filename)
        superseded = current is not None and current is not stop_event
        if not superseded:
            final_timestamp = time.time() - start_time
            try:
                live_display_handler("stopped", final_timestamp)
            except Exception as handler_error:
                print(f"Error in live_display_handler (stopped): {handler_error}")

    except Exception as e:
        print(f"Error in live display update thread: {e}")
    finally:
        # Only drop our own registration. Deleting unconditionally would remove
        # a newer thread's event, leaving that thread unstoppable by
        # send_stop_command / auto_stop_replay.
        if active_replay_threads.get(filename) is stop_event:
            del active_replay_threads[filename]
|
|
|
|
|
|
def show_replay(filename, location=DEFAULT_LOCATION, size=DEFAULT_SIZE):
    """Play *filename* from MOVETO_DIR on the Electron overlay.

    Steps, in order:
      1. wait until the file is readable and no longer growing,
      2. probe its duration,
      3. if live display is enabled, register a stop event, send an initial
         "playing" at 0.0 and start the update thread (this happens before
         Electron is told to play),
      4. schedule the play_video broadcast, record the file as the last
         replay, mark the replay active and start the auto-stop timer.
    """
    global last_replay_filename

    filepath = os.path.join(MOVETO_DIR, filename)
    if not os.path.exists(filepath):
        print(f"Replay file not found: {filepath}")
        return

    # Short readiness check: the monitor has normally waited already.
    if not wait_for_file_ready(filepath, timeout=5, poll_interval=0.1):
        print(f"Replay file not ready for playback: {filepath}")
        log_file_locks(filepath)
        return

    # Probe once; the same value goes to the live display thread and Electron.
    duration = get_media_duration(filepath)

    if LIVE_DISPLAY_ENABLED and LIVE_DISPLAY_AVAILABLE:
        # Halt a live display thread still running for this same file.
        previous_stop = active_replay_threads.get(filename)
        if previous_stop is not None:
            previous_stop.set()

        replay_stop = threading.Event()
        active_replay_threads[filename] = replay_stop

        try:
            live_display_handler("playing", 0.0)
        except Exception as handler_error:
            print(f"Error in initial live_display_handler call: {handler_error}")

        display_worker = threading.Thread(
            target=live_display_update_thread,
            args=(filename, duration),
            daemon=True,
        )
        display_worker.start()

    # Live display (if any) is already running; now tell Electron to play.
    play_coro = send_video_url(filename, location, size, duration)
    asyncio.run_coroutine_threadsafe(play_coro, websocket_loop)
    last_replay_filename = filename
    print(f"Sent replay to Electron: {filepath} (location={location}, size={size}, duration={duration})")
    is_replaying.set()

    stopper = threading.Thread(target=auto_stop_replay, args=(REPLAY_LENGTH_SECONDS,), daemon=True)
    stopper.start()
|
|
|
|
# Generation counter for auto-stop timers. show_replay starts one timer per
# replay; only the most recently started timer may stop playback, so a timer
# left over from an earlier replay cannot cut a newer replay short.
_auto_stop_lock = threading.Lock()
_auto_stop_generation = 0


def auto_stop_replay(duration):
    """Stop the current replay after *duration* seconds unless superseded.

    Meant to run on its own daemon thread. After sleeping, playback is stopped
    only if no newer auto-stop timer has started in the meantime (i.e. no newer
    replay was shown) and ``is_replaying`` is still set (i.e. the replay was not
    stopped manually).

    Args:
        duration: Seconds to wait before stopping.
    """
    global _auto_stop_generation
    with _auto_stop_lock:
        _auto_stop_generation += 1
        my_generation = _auto_stop_generation

    time.sleep(duration)

    with _auto_stop_lock:
        superseded = my_generation != _auto_stop_generation
    if superseded:
        return  # a newer replay owns the auto-stop now

    # Only auto-stop if replaying is still active (not stopped manually)
    if is_replaying.is_set():
        print(f"Auto-stopping replay after {duration} seconds.")
        # Stop all active live display threads
        for stop_event in list(active_replay_threads.values()):
            stop_event.set()
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        is_replaying.clear()
|
|
|
|
def delete_old_files(directory, max_age_seconds=3 * 60 * 60):
    """Delete regular files in *directory* whose mtime is older than *max_age_seconds*.

    Subdirectories are left alone. Problems with individual entries are
    reported or skipped rather than raised. This includes a file disappearing
    between the directory listing and the age check, e.g. when
    clear_all_replays runs concurrently. Such an error must not propagate,
    because the caller runs this from its long-lived monitoring loop.

    Raises:
        OSError: If *directory* itself cannot be listed.
    """
    now = time.time()
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        try:
            if not os.path.isfile(filepath):
                continue
            file_age = now - os.path.getmtime(filepath)
        except OSError:
            # Vanished or became inaccessible after listdir(); nothing to do.
            continue

        if file_age > max_age_seconds:
            try:
                os.remove(filepath)
                print(f"Deleted old replay: {filepath}")
            except Exception as e:
                print(f"Error deleting {filepath}: {e}")
|
|
|
|
|
|
def clear_all_replays(directory):
    """Remove every regular file directly inside *directory*.

    Subdirectories are kept. Each deletion, or deletion error, is printed;
    errors do not stop the sweep.
    """
    candidates = (os.path.join(directory, name) for name in os.listdir(directory))
    for path in filter(os.path.isfile, candidates):
        try:
            os.remove(path)
            print(f"Deleted replay: {path}")
        except Exception as e:
            print(f"Error deleting {path}: {e}")
|
|
|
|
|
|
def trim_replay_with_ffmpeg(filepath, length_seconds):
    """Keep only the last *length_seconds* of *filepath*, replacing it in place.

    Nothing is done when the clip's duration cannot be probed, or when the
    clip is already no longer than the target plus 0.1 s. Streams are copied
    (``-c copy``, no re-encode), so the cut may snap to a keyframe rather than
    the exact timestamp. On failure the original file is left as it was and
    any partial temporary output is removed.

    Args:
        filepath: Path of the replay file to trim.
        length_seconds: Number of trailing seconds to keep.
    """
    # Same ffprobe query as the rest of the pipeline; prints its own error.
    duration = get_media_duration(filepath)
    if duration is None:
        return  # Exit early if we can't get duration

    if duration <= length_seconds + 0.1:
        print(
            f"Skipping trim: video duration ({duration:.2f}s) <= target ({length_seconds}s)"
        )
        return

    # NOTE(review): the temp name always ends in ".mp4", so ffmpeg writes an
    # MP4 container even if the source is e.g. .mkv — confirm the OBS
    # recording format is MP4.
    temp_filepath = filepath + ".trimmed.mp4"
    # Start offset that keeps the last `length_seconds` of the clip.
    start_time = max(0, duration - length_seconds)
    try:
        subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-loglevel", "error",
                "-ss", str(start_time),
                "-i", filepath,
                "-t", str(length_seconds),
                "-c", "copy",
                "-movflags", "+faststart",
                temp_filepath,
            ],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Replace original file with trimmed file
        os.replace(temp_filepath, filepath)
        print(f"Trimmed replay to last {length_seconds}s: {filepath}")
    except Exception as e:
        print(f"Error trimming replay {filepath}: {e}")
        # Best effort: don't leave a partial ".trimmed.mp4" in the replay folder.
        try:
            os.remove(temp_filepath)
        except OSError:
            pass
|
|
|
|
|
|
def log_file_locks(filepath):
    """Print which processes have *filepath* open, as a diagnostic aid.

    Uses psutil's per-process open-file list. Processes that exit or deny
    access during the scan are skipped. Any other error is printed; nothing
    is raised.
    """
    if not PSUTIL_AVAILABLE:
        print("psutil not available; cannot check file locks.")
        return

    try:
        target = os.path.normcase(filepath)
        holders = []
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                holders.extend(
                    f"PID {proc.pid} ({proc.name()})"
                    for opened in proc.open_files()
                    if os.path.normcase(opened.path) == target
                )
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        if not holders:
            print(f"File '{filepath}' is not locked by any process.")
        else:
            print(f"File '{filepath}' is locked by: {', '.join(holders)}")
    except Exception as e:
        print(f"Error checking file locks: {e}")
|
|
|
|
|
|
def wait_for_file_ready(filepath, timeout=10, poll_interval=0.1):
    """Block until *filepath* opens for reading and its size has stopped changing.

    The file counts as ready once three successful polls in a row report the
    same size. Failed polls (missing, locked or otherwise unreadable file)
    are retried and do not reset the count. Polls run every *poll_interval*
    seconds.

    Returns:
        True when the file is ready; False (with a message) after *timeout* seconds.
    """
    give_up_at = time.time() + timeout
    previous_size = -1
    unchanged_polls = 0

    while time.time() < give_up_at:
        try:
            size_now = os.path.getsize(filepath)
            with open(filepath, "rb"):
                pass  # opening is the lock test; nothing is read
        except OSError:
            time.sleep(poll_interval)
            continue

        if size_now != previous_size:
            previous_size = size_now
            unchanged_polls = 0
        else:
            unchanged_polls += 1
            if unchanged_polls >= 2:
                return True

        time.sleep(poll_interval)

    print(f"Timed out waiting for file to be ready: {filepath}")
    return False
|
|
|
|
|
|
def monitor_directory(path):
    """Poll *path* for new OBS replay files and feed each one into the replay pipeline.

    Every 0.5 s the directory listing is compared with the previous one, and
    each new entry is moved, trimmed and played via ``_process_new_replay``.
    About every 60 polls (~30 s), old files in MOVETO_DIR are pruned. The loop
    runs forever. Errors while listing *path* or pruning are printed and
    retried on a later poll, so they cannot end the monitoring thread.
    """
    print(f"Monitoring directory: {path}")
    already_seen = set(os.listdir(path))
    cleanup_counter = 0

    while True:
        time.sleep(0.5)  # Check every 0.5 seconds for faster detection
        try:
            current_files = set(os.listdir(path))
        except OSError as e:
            # e.g. the watch drive is temporarily unavailable; keep the old
            # snapshot and try again on the next poll.
            print(f"Error listing {path}: {e}")
            continue

        # Delete old files in MOVETO_DIR only every 60 cycles (~30 seconds)
        cleanup_counter += 1
        if cleanup_counter >= 60:
            cleanup_counter = 0
            try:
                delete_old_files(MOVETO_DIR)
            except Exception as e:
                print(f"Error cleaning up {MOVETO_DIR}: {e}")

        for new_file in current_files - already_seen:
            _process_new_replay(path, new_file)

        already_seen = current_files


def _send_status(status):
    """Schedule a HUD status update on the WebSocket loop without waiting for it."""
    asyncio.run_coroutine_threadsafe(send_status_update(status), websocket_loop)


def _process_new_replay(path, new_file):
    """Move *new_file* from *path* into MOVETO_DIR, trim it, and start playback.

    Each stage is reported to the HUD. Files that never become accessible are
    skipped. Errors are printed, and PermissionError additionally logs which
    processes hold the source file.
    """
    print(f"New file detected: {new_file}")
    source_path = os.path.join(path, new_file)
    dest_path = os.path.join(MOVETO_DIR, new_file)

    try:
        _send_status("Waiting for file")
        if not wait_for_file_ready(source_path):
            print(f"Skipping file that never became accessible: {source_path}")
            return

        _send_status("Moving file")
        shutil.move(source_path, dest_path)
        print(f"Moved file to: {dest_path}")

        _send_status("Waiting for moved file")
        if not wait_for_file_ready(dest_path):
            print(
                f"Skipping replay because destination file stayed locked: {dest_path}"
            )
            return

        _send_status("Cutting replay")
        trim_replay_with_ffmpeg(dest_path, REPLAY_LENGTH_SECONDS)

        _send_status("Finalizing")
        time.sleep(0.5)

        _send_status("Ready to play")
        show_replay(new_file)
    except PermissionError as e:
        print(f"PermissionError moving file '{source_path}': {e}")
        log_file_locks(source_path)
    except Exception as e:
        print(f"Error moving file '{source_path}': {e}")
|
|
|
|
|
|
def capture_replay():
    """Ask OBS to save its replay buffer, then send "Replay saved" to the HUD.

    Any exception, whether from the OBS request or from scheduling the HUD
    update, is printed and swallowed.
    """
    try:
        save_request = rq.SaveReplayBuffer()
        ws.call(save_request)
        print("Replay buffer saved.")
        status_coro = send_status_update("Replay saved")
        asyncio.run_coroutine_threadsafe(status_coro, websocket_loop)
    except Exception as e:
        print(f"Error saving replay buffer: {e}")
|
|
|
|
|
|
def start_mic_listener():
    """Listen on the default microphone and dispatch spoken commands (runs forever).

    First records CALIBRATION_DURATION seconds of ambient audio and sets the
    speech threshold to 4x its RMS. Then it repeatedly records DURATION-second
    clips. Clips at or above the threshold are transcribed with Whisper
    ("base.en"), with a 5 s limit, and the text is matched by substring
    against the activation, stop, replay-last and clear-replays phrases.
    """
    import concurrent.futures

    import numpy as np
    import scipy.io.wavfile
    import sounddevice as sd
    import whisper

    whisper_model = whisper.load_model("base.en")  # "tiny", "base", "small", etc. also work

    print(
        "Microphone listener started. Say one of the activation, stop, or replay last commands."
    )
    print(f"Activation commands: {ACTIVATION_COMMANDS}")
    print(f"Stop commands: {STOP_COMMANDS}")
    print(f"Replay last commands: {REPLAY_LAST_COMMANDS}")
    print(f"Clear replays commands: {CLEAR_REPLAYS_COMMANDS}")

    # Calibration phase
    print("Calibrating ambient volume, please be quiet...")
    ambient_audio = sd.rec(
        int(CALIBRATION_DURATION * SAMPLE_RATE),
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="int16",
    )
    sd.wait()
    ambient_rms = np.sqrt(np.mean(np.square(ambient_audio.astype(np.float32))))
    threshold = ambient_rms * 4  # You can adjust multiplier for sensitivity
    print(f"Ambient RMS: {ambient_rms:.2f}, threshold set to: {threshold:.2f}")

    # One long-lived worker for transcription. Using a `with ThreadPoolExecutor`
    # per clip made the timeout ineffective: leaving the `with` block calls
    # shutdown(wait=True), which blocks until the timed-out job finishes anyway.
    transcriber = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    pending = None  # future of the most recent transcription

    while True:
        print("Listening for command...")
        audio = sd.rec(
            int(DURATION * SAMPLE_RATE),
            samplerate=SAMPLE_RATE,
            channels=1,
            dtype="int16",
        )
        sd.wait()
        rms = np.sqrt(np.mean(np.square(audio.astype(np.float32))))
        print(f"Recorded RMS: {rms:.2f}")

        if rms < threshold:
            print("No speech detected (below threshold).")
            continue

        if pending is not None and not pending.done():
            # A timed-out transcription still occupies the worker (and may
            # still be reading the shared WAV file); don't queue behind it.
            print("Previous transcription still running; skipping this clip.")
            continue

        wav_path = "temp_mic_audio.wav"
        scipy.io.wavfile.write(wav_path, SAMPLE_RATE, audio)

        try:
            print("Transcribing audio with Whisper...")
            pending = transcriber.submit(whisper_model.transcribe, wav_path)
            result = pending.result(timeout=5)
            command = result["text"].lower().strip()
            print(f"Heard (Whisper): {command}")

            if any(phrase in command for phrase in ACTIVATION_COMMANDS):
                capture_replay()
            elif any(phrase in command for phrase in STOP_COMMANDS):
                asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
                print("Stop replay command triggered")
                is_replaying.clear()  # Ensure replay state is cleared on manual stop
            elif any(phrase in command for phrase in REPLAY_LAST_COMMANDS):
                if last_replay_filename:
                    show_replay(last_replay_filename)
                    print(f"Replaying last replay: {last_replay_filename}")
                else:
                    print("No replay available to replay last.")
            elif any(phrase in command for phrase in CLEAR_REPLAYS_COMMANDS):
                clear_all_replays(MOVETO_DIR)
                print("All replays cleared.")
        except concurrent.futures.TimeoutError:
            print("Whisper transcription timed out (5 seconds).")
        except Exception as e:
            print(f"Whisper transcription error: {e}")
|
|
|
|
|
|
def start_websocket_server():
    """Run the Electron-facing WebSocket server on a dedicated event loop (blocks forever).

    Creates a fresh asyncio loop and publishes it as the module-global
    ``websocket_loop``, which other threads pass to
    ``asyncio.run_coroutine_threadsafe``. It then binds ``websocket_handler``
    on localhost:WEBSOCKET_PORT and runs the loop indefinitely.
    """
    global websocket_loop

    loop = asyncio.new_event_loop()
    websocket_loop = loop
    asyncio.set_event_loop(loop)

    server_startup = websockets.serve(websocket_handler, "localhost", WEBSOCKET_PORT)
    print(f"WebSocket server started on ws://localhost:{WEBSOCKET_PORT}")

    loop.run_until_complete(server_startup)
    loop.run_forever()
|
|
|
|
|
|
def handle_command(command):
    """Run the action for *command* as if it had been spoken.

    Matching is a case-insensitive substring search against the configured
    phrase lists, checked in this order: activation, stop, replay-last,
    clear-replays. Only the first matching group runs; unrecognised text is
    ignored.
    """
    spoken = command.lower().strip()

    def said_any(phrases):
        return any(phrase in spoken for phrase in phrases)

    if said_any(ACTIVATION_COMMANDS):
        capture_replay()
        is_replaying.set()
        return

    if said_any(STOP_COMMANDS):
        asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
        print("Stop replay command triggered")
        is_replaying.clear()  # Ensure replay state is cleared on manual stop
        return

    if said_any(REPLAY_LAST_COMMANDS):
        if not last_replay_filename:
            print("No replay available to replay last.")
            return
        show_replay(last_replay_filename)
        print(f"Replaying last replay: {last_replay_filename}")
        is_replaying.set()
        return

    if said_any(CLEAR_REPLAYS_COMMANDS):
        clear_all_replays(MOVETO_DIR)
        print("All replays cleared.")
|
|
|
|
|
|
def start_hotkey_listener():
    """Toggle replays with a global hotkey (blocks forever).

    Each press of ``HOTKEY["key"]`` aborts playback when ``is_replaying`` is
    set; otherwise it behaves like the spoken "instant replay" command.
    Returns immediately if the ``keyboard`` package is not installed.
    """
    try:
        import keyboard
    except ImportError:
        print("keyboard library not installed; hotkey feature disabled.")
        return

    key_name = HOTKEY["key"]
    print(f"Global hotkey listener enabled. Press '{key_name}' anywhere to toggle replay.")

    while True:
        keyboard.wait(key_name)
        if not is_replaying.is_set():
            print(f"Global hotkey '{key_name}' pressed! Simulating 'instant replay' command.")
            handle_command("instant replay")
        else:
            # Always send stop command and clear state, even if already expired
            print(f"Global hotkey '{key_name}' pressed! Aborting replay.")
            asyncio.run_coroutine_threadsafe(send_stop_command(), websocket_loop)
            is_replaying.clear()  # Ensure replay state is cleared on manual stop
        time.sleep(0.5)  # debounce: ignore rapid re-triggers
|
|
|
|
|
|
|
|
def main():
    """Start every worker thread and keep the process alive until Ctrl+C.

    Raises the process priority and exits with status 1 if neither voice
    recognition nor the hotkey is enabled. Otherwise it starts daemon threads
    for the WebSocket server and the OBS output-directory monitor, plus the
    microphone and hotkey listeners when they are enabled.
    """
    set_process_priority()

    # Check if at least one input method is enabled
    if not VOICE_ENABLED and not HOTKEY["enabled"]:
        print(
            "ERROR: Both voice recognition and hotkey are disabled. Enable at least one input method."
        )
        # sys.exit, not the site-provided exit(), which may be absent
        # (python -S, frozen executables).
        sys.exit(1)

    # WebSocket server for the Electron overlay; it also creates websocket_loop.
    websocket_thread = threading.Thread(target=start_websocket_server, daemon=True)
    websocket_thread.start()

    # Watch the OBS output folder for new replay files.
    monitor_thread = threading.Thread(
        target=monitor_directory, args=(WATCH_DIR,), daemon=True
    )
    monitor_thread.start()

    # Start microphone listener in a separate thread if enabled
    if VOICE_ENABLED:
        mic_thread = threading.Thread(target=start_mic_listener, daemon=True)
        mic_thread.start()

    # Start hotkey listener if enabled
    if HOTKEY["enabled"]:
        hotkey_thread = threading.Thread(target=start_hotkey_listener, daemon=True)
        hotkey_thread.start()

    # Keep the main thread alive. All workers are daemon threads, so they end
    # with the process once the main thread leaves this loop.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
|
|
|
|
# Start the worker threads only when run as a script. Importing this module
# still runs the module-level setup (loading config, checking for and
# connecting to OBS).
if __name__ == "__main__":
    main()