"""Stream-shot: grab a single JPEG frame from a YouTube stream.

Exposes ``main(args)`` as the SHSF handler with these routes:

* ``app/{id}.jpg`` - JPEG snapshot of the video ``{id}``, cached in memory
  for ``CACHE_TTL`` seconds.
* ``snapshot``     - JSON body ``{"url": ...}``; returns the frame as base64.
* ``health``       - liveness probe.
* ``default``      - serves the static UI page.
"""

import base64
import json
import time

import cv2
import numpy as np
import yt_dlp

# In-memory snapshot cache: video id -> (request timestamp, base64 JPEG text).
_snapshot_cache = {}
# Seconds a cached snapshot is served before a fresh frame is captured.
CACHE_TTL = 10

_JPEG_SUFFIX = ".jpg"
_UI_PATH = "/app/ui.html"


def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
    """Capture one frame from the stream behind *youtube_url*.

    Args:
        youtube_url: URL handed to yt-dlp to resolve the direct stream URL.
        output_file: Path the frame is written to when *toBuffer* is false.
        toBuffer: When true, return the JPEG-encoded frame as ``bytes``
            instead of writing it to *output_file*.

    Returns:
        ``bytes`` (JPEG data) when *toBuffer* is true and capture succeeds,
        ``True`` when the frame was written to *output_file*, and ``None``
        on any failure (a short message is printed describing the failure).
    """
    # 1. Configure yt-dlp to get the direct stream URL
    ydl_opts = {
        "format": "best",
        "quiet": True,
        "no_warnings": True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(youtube_url, download=False)
            stream_url = info["url"]
        except Exception as e:
            print(f"Error extracting info: {e}")
            return None

    # 2. Use OpenCV to capture a frame from the stream URL
    try:
        cap = cv2.VideoCapture(stream_url)
    except Exception as e:
        print(f"Error opening VideoCapture: {e}")
        return None

    # From here on the capture handle exists, so release it on every exit
    # path, including exceptions raised by read/imencode/imwrite.
    try:
        if not cap.isOpened():
            print("Error: Could not open video stream.")
            return None

        # Read a single frame
        success, frame = cap.read()
        if not success:
            print("Error: Could not read frame from stream.")
            return None

        if toBuffer:
            # Encode frame as JPEG to memory buffer
            encoded_ok, buf = cv2.imencode(".jpg", frame)
            return buf.tobytes() if encoded_ok else None

        # 3. Save the frame as an image; imwrite reports failure via its
        # return value, so do not claim success when nothing was written.
        if not cv2.imwrite(output_file, frame):
            print(f"Error: Could not write frame to {output_file}.")
            return None
        return True
    finally:
        cap.release()


def _json_response(code, payload):
    """Build a v2 SHSF response carrying *payload* as JSON."""
    return {
        "_shsf": "v2",
        "_code": code,
        "_res": payload,
        "_headers": {
            "Content-Type": "application/json"
        },
    }


def _jpeg_response(encoded, cache_status):
    """Build the base64 JPEG response; *cache_status* is "HIT" or "MISS"."""
    return {
        "_shsf": "v1",  # v1 for simpler binary response if v2 is picky
        "_code": 200,
        "_res": encoded,
        "_is_base64": True,
        "_headers": {
            "Content-Type": "image/jpeg",
            "X-Cache": cache_status
        }
    }


def _prune_snapshot_cache(now):
    """Drop cache entries older than CACHE_TTL so the cache cannot grow forever."""
    expired = [
        key for key, (ts, _) in _snapshot_cache.items()
        if now - ts >= CACHE_TTL
    ]
    for key in expired:
        _snapshot_cache.pop(key, None)


def _handle_app_snapshot(route):
    """Serve ``app/{id}.jpg``: a cached or freshly captured JPEG snapshot."""
    # Strip only the trailing suffix; str.replace would also remove any
    # ".jpg" occurring inside the id itself.
    url_id = route.split("/")[-1][:-len(_JPEG_SUFFIX)]
    if not url_id:
        return _json_response(400, {"message": "Video id not provided."})

    now = time.time()
    cached = _snapshot_cache.get(url_id)
    if cached is not None:
        ts, encoded = cached
        if now - ts < CACHE_TTL:
            return _jpeg_response(encoded, "HIT")

    # Cache miss: evict stale entries before adding a new one, since ids are
    # caller-supplied and each entry holds a whole encoded image.
    _prune_snapshot_cache(now)

    yt_url = f"https://www.youtube.com/watch?v={url_id}"
    try:
        image_bytes = capture_stream_snapshot(yt_url, toBuffer=True)
    except Exception as e:
        return _json_response(500, {"message": str(e)})

    if not image_bytes:
        # Report the failure explicitly instead of falling through the other
        # route checks and returning None.
        return _json_response(500, {"message": "Could not capture snapshot."})

    encoded_content = base64.b64encode(image_bytes).decode('utf-8')
    _snapshot_cache[url_id] = (now, encoded_content)
    return _jpeg_response(encoded_content, "MISS")


def _handle_snapshot(body):
    """Serve ``snapshot``: capture the URL named in the JSON *body*."""
    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a body that is not str/bytes (e.g. None).
        return _json_response(400, {"message": "Invalid JSON input."})
    if not isinstance(payload, dict):
        # Valid JSON but not an object, so there is no "url" field to read.
        return _json_response(400, {"message": "Invalid JSON input."})

    url = payload.get("url", "")
    if not url:
        return _json_response(400, {"message": "URL not provided."})

    # NOTE(review): the URL is caller-supplied and passed to yt-dlp as-is;
    # consider restricting it to expected hosts if this endpoint is public.
    try:
        image_bytes = capture_stream_snapshot(url, toBuffer=True)
    except Exception as e:
        return _json_response(500, {"message": str(e)})

    if image_bytes:
        encoded_content = base64.b64encode(image_bytes).decode('utf-8')
        return _json_response(200, {
            "message": "done",
            "buffer": encoded_content
        })
    return _json_response(500, {"message": "Could not capture snapshot."})


def _handle_health():
    """Serve ``health``: report liveness and the current cache size."""
    cache_size = len(_snapshot_cache)
    response = _json_response(200, {
        "message": "Service is healthy.",
        "cache_size": cache_size,
    })
    # Kept at the top level as well for backward compatibility with anything
    # that reads it there. NOTE(review): confirm whether SHSF forwards
    # top-level keys other than the "_"-prefixed ones.
    response["cache_size"] = cache_size
    return response


def _handle_ui():
    """Serve ``default``: the static UI page read from disk."""
    try:
        with open(_UI_PATH, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError for a non-UTF-8 file.
        return _json_response(404, {"message": "UI file not found or error."})
    return {
        "_shsf": "v2",
        "_code": 200,
        "_res": content,
        "_headers": {
            "Content-Type": "text/html"
        },
    }


# Shsf Handler
def main(args):
    """Dispatch an SHSF request to the handler for its route.

    Args:
        args: Request mapping; ``args["route"]`` selects the handler and
            ``args["body"]`` carries the JSON payload for ``snapshot``.

    Returns:
        An SHSF response dict (``_shsf``, ``_code``, ``_res``, ``_headers``).
        Unknown routes get a 404 JSON response rather than ``None``.
    """
    route = args.get("route", "")

    # Handle /app/{id}.jpg
    if route.startswith("app/") and route.endswith(_JPEG_SUFFIX):
        return _handle_app_snapshot(route)
    if route == "snapshot":
        return _handle_snapshot(args.get("body", ""))
    if route == "health":
        return _handle_health()
    if route == "default":
        # UI Route
        return _handle_ui()
    return _json_response(404, {"message": "Route not found."})


if __name__ == "__main__":
    print("running stream-shot")