import cv2 import yt_dlp import numpy as np import json import time import os import base64 # Filesystem-based cache for serverless environment CACHE_DIR = "/tmp/snapshot_cache" CACHE_DURATION = 10 # seconds def get_cache_path(url): import hashlib url_hash = hashlib.md5(url.encode('utf-8')).hexdigest() return os.path.join(CACHE_DIR, f"{url_hash}.jpg") def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False): if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR, exist_ok=True) cache_path = get_cache_path(youtube_url) # Check cache first if os.path.exists(cache_path): mtime = os.path.getmtime(cache_path) if time.time() - mtime < CACHE_DURATION: print(f"Returning cached snapshot for {youtube_url} from {cache_path}") if toBuffer: with open(cache_path, "rb") as f: return f.read() else: import shutil shutil.copy2(cache_path, output_file) return True # 1. Configure yt-dlp to get the direct stream URL ydl_opts = { "format": "best", "quiet": True, "no_warnings": True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: try: info = ydl.extract_info(youtube_url, download=False) stream_url = info["url"] except Exception as e: print(f"Error extracting info: {e}") return None # 2. Use OpenCV to capture a frame from the stream URL try: cap = cv2.VideoCapture(stream_url) except Exception as e: print(f"Error opening VideoCapture: {e}") return None if not cap.isOpened(): print("Error: Could not open video stream.") return None # Read a single frame success, frame = cap.read() result = None if success: # Save to cache first cv2.imwrite(cache_path, frame) if toBuffer: # Encode frame as JPEG to memory buffer ret, buf = cv2.imencode(".jpg", frame) if ret: print("Snapshot captured to buffer.") result = buf.tobytes() else: print("Error: Could not encode frame to buffer.") else: # 3. Save the frame as an image cv2.imwrite(output_file, frame) print(f"Snapshot saved to {output_file}") result = True else: print("Error: Could not read frame from stream.") # Cleanup cap.release() return result # Shsf Handler def main(args): route = args.get("route") if route == "snapshot": url_input = args.get("body", "") try: body_json = json.loads(url_input) except json.JSONDecodeError: print("Error: Invalid JSON input.") return { "_shsf": "v2", "_code": 400, "_res": {"message": "Invalid JSON input."}, "_headers": { "Content-Type": "application/json", }, } url = body_json.get("url", "") if not url: print("Error: URL not provided.") return { "_shsf": "v2", "_code": 400, "_res": {"message": "URL not provided."}, "_headers": { "Content-Type": "application/json", }, } if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR, exist_ok=True) cache_path = get_cache_path(url) if os.path.exists(cache_path): if time.time() - os.path.getmtime(cache_path) < CACHE_DURATION: print(f"Serving {url} from filesystem cache.") with open(cache_path, "rb") as f: encoded_content = base64.b64encode(f.read()).decode('utf-8') return { "_shsf": "v2", "_code": 200, "_res": { "message": "done (cached)", "buffer": encoded_content }, "_headers": { "Content-Type": "application/json", }, } try: # Use toBuffer=True to get the bytes directly content = capture_stream_snapshot(url, toBuffer=True) except Exception as e: print(f"Error: {e}") return { "_shsf": "v2", "_code": 500, "_res": {"message": f"Could not capture snapshot: {str(e)}"}, "_headers": { "Content-Type": "application/json", }, } if content is None: print("Error: Could not capture snapshot.") return { "_shsf": "v2", "_code": 500, "_res": {"message": "Could not capture snapshot."}, "_headers": { "Content-Type": "application/json", }, } # The content returned is already bytes if not from cache encoded_content = base64.b64encode(content).decode('utf-8') return { "_shsf": "v2", "_code": 200, "_res": { "message": "done", "buffer": encoded_content }, "_headers": { "Content-Type": "application/json", }, } elif route == "health": return { "_shsf": "v2", "_code": 200, "_res": {"message": "Service is healthy."}, "_headers": { "Content-Type": "application/json", }, } elif route == "default": # UI Route try: content = "" with open("/app/ui.html", "r") as f: content = f.read() except FileNotFoundError: print("Error: /app/ui.html not found.") return { "_shsf": "v2", "_code": 404, "_res": {"message": "UI file not found."}, "_headers": { "Content-Type": "application/json", }, } except Exception as e: print(f"Error loading UI: {e}") return { "_shsf": "v2", "_code": 500, "_res": {"message": f"Server error: {str(e)}"}, "_headers": { "Content-Type": "application/json", }, } return { "_shsf": "v2", "_code": 200, "_res": content, "_headers": { "Content-Type": "text/html", }, } if __name__ == "__main__": print("omg im on main") capture_stream_snapshot("https://www.youtube.com/watch?v=n15V_fCsl_c", output_file="snapshot.jpg")