From 1b246addfff1865188c663b892fe29dad006bc7a Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 2 Apr 2026 14:09:58 +0200 Subject: [PATCH] feat: Add snapshot caching (10s duration) --- main.py | 69 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index 0317829..c862e84 100644 --- a/main.py +++ b/main.py @@ -2,9 +2,23 @@ import cv2 import yt_dlp import numpy as np import json +import time +import os +import base64 +# Simple in-memory cache +# Format: {url: {"time": timestamp, "buffer": base64_string}} +cache = {} +CACHE_DURATION = 10 # seconds def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False): + # Check cache first + if youtube_url in cache: + cached_item = cache[youtube_url] + if time.time() - cached_item["time"] < CACHE_DURATION: + print(f"Returning cached snapshot for {youtube_url}") + return base64.b64decode(cached_item["buffer"]) if toBuffer else True + # 1. Configure yt-dlp to get the direct stream URL ydl_opts = { "format": "best", @@ -29,41 +43,54 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa if not cap.isOpened(): print("Error: Could not open video stream.") - return None if toBuffer else None + return None # Read a single frame success, frame = cap.read() + result = None if success: if toBuffer: # Encode frame as JPEG to memory buffer ret, buf = cv2.imencode(".jpg", frame) if ret: print("Snapshot captured to buffer.") - cap.release() - return buf.tobytes() + result = buf.tobytes() else: print("Error: Could not encode frame to buffer.") - cap.release() - return None else: # 3. Save the frame as an image cv2.imwrite(output_file, frame) print(f"Snapshot saved to {output_file}") + result = True + + # Update cache if successful + if result: + if toBuffer: + buf_to_cache = result + else: + with open(output_file, "rb") as f: + buf_to_cache = f.read() + + cache[youtube_url] = { + "time": time.time(), + "buffer": base64.b64encode(buf_to_cache).decode('utf-8') + } else: print("Error: Could not read frame from stream.") # Cleanup cap.release() + return result # Shsf Handler def main(args): route = args.get("route") if route == "snapshot": - url = args.get("body", "") + url_input = args.get("body", "") try: - url = json.loads(url) + body_json = json.loads(url_input) except json.JSONDecodeError: print("Error: Invalid JSON input.") return { @@ -74,7 +101,7 @@ def main(args): "Content-Type": "application/json", }, } - url = url.get("url", "") + url = body_json.get("url", "") if not url: print("Error: URL not provided.") return { @@ -86,11 +113,26 @@ def main(args): }, } + # Check cache explicitly in main to avoid redundant toBuffer logic if possible + if url in cache: + cached_item = cache[url] + if time.time() - cached_item["time"] < CACHE_DURATION: + print(f"Serving {url} from cache.") + return { + "_shsf": "v2", + "_code": 200, + "_res": { + "message": "done (cached)", + "buffer": cached_item["buffer"] + }, + "_headers": { + "Content-Type": "application/json", + }, + } + try: - capture_stream_snapshot(url, output_file="/tmp/snapshot.jpg") - content = None - with open("/tmp/snapshot.jpg", "rb") as f: - content = f.read() + # Use toBuffer=True to get the bytes directly + content = capture_stream_snapshot(url, toBuffer=True) except Exception as e: print(f"Error: {e}") return { @@ -113,8 +155,7 @@ def main(args): }, } - # Convert content to base64 for JSON response - import base64 + # The content returned is already bytes if not from cache encoded_content = base64.b64encode(content).decode('utf-8') return {