From 1b246addfff1865188c663b892fe29dad006bc7a Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 2 Apr 2026 14:09:58 +0200 Subject: [PATCH 1/2] feat: Add snapshot caching (10s duration) --- main.py | 69 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index 0317829..c862e84 100644 --- a/main.py +++ b/main.py @@ -2,9 +2,23 @@ import cv2 import yt_dlp import numpy as np import json +import time +import os +import base64 +# Simple in-memory cache +# Format: {url: {"time": timestamp, "buffer": base64_string}} +cache = {} +CACHE_DURATION = 10 # seconds def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False): + # Check cache first + if youtube_url in cache: + cached_item = cache[youtube_url] + if time.time() - cached_item["time"] < CACHE_DURATION: + print(f"Returning cached snapshot for {youtube_url}") + return base64.b64decode(cached_item["buffer"]) if toBuffer else True + # 1. Configure yt-dlp to get the direct stream URL ydl_opts = { "format": "best", @@ -29,41 +43,54 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa if not cap.isOpened(): print("Error: Could not open video stream.") - return None if toBuffer else None + return None # Read a single frame success, frame = cap.read() + result = None if success: if toBuffer: # Encode frame as JPEG to memory buffer ret, buf = cv2.imencode(".jpg", frame) if ret: print("Snapshot captured to buffer.") - cap.release() - return buf.tobytes() + result = buf.tobytes() else: print("Error: Could not encode frame to buffer.") - cap.release() - return None else: # 3. Save the frame as an image cv2.imwrite(output_file, frame) print(f"Snapshot saved to {output_file}") + result = True + + # Update cache if successful + if result: + if toBuffer: + buf_to_cache = result + else: + with open(output_file, "rb") as f: + buf_to_cache = f.read() + + cache[youtube_url] = { + "time": time.time(), + "buffer": base64.b64encode(buf_to_cache).decode('utf-8') + } else: print("Error: Could not read frame from stream.") # Cleanup cap.release() + return result # Shsf Handler def main(args): route = args.get("route") if route == "snapshot": - url = args.get("body", "") + url_input = args.get("body", "") try: - url = json.loads(url) + body_json = json.loads(url_input) except json.JSONDecodeError: print("Error: Invalid JSON input.") return { @@ -74,7 +101,7 @@ def main(args): "Content-Type": "application/json", }, } - url = url.get("url", "") + url = body_json.get("url", "") if not url: print("Error: URL not provided.") return { @@ -86,11 +113,26 @@ def main(args): }, } + # Check cache explicitly in main to avoid redundant toBuffer logic if possible + if url in cache: + cached_item = cache[url] + if time.time() - cached_item["time"] < CACHE_DURATION: + print(f"Serving {url} from cache.") + return { + "_shsf": "v2", + "_code": 200, + "_res": { + "message": "done (cached)", + "buffer": cached_item["buffer"] + }, + "_headers": { + "Content-Type": "application/json", + }, + } + try: - capture_stream_snapshot(url, output_file="/tmp/snapshot.jpg") - content = None - with open("/tmp/snapshot.jpg", "rb") as f: - content = f.read() + # Use toBuffer=True to get the bytes directly + content = capture_stream_snapshot(url, toBuffer=True) except Exception as e: print(f"Error: {e}") return { @@ -113,8 +155,7 @@ def main(args): }, } - # Convert content to base64 for JSON response - import base64 + # The content returned is already bytes if not from cache encoded_content = base64.b64encode(content).decode('utf-8') return { From a84da6dde86d1fc6ccb932f5248ead3a7c89cadb Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 2 Apr 2026 14:12:00 +0200 Subject: [PATCH 2/2] feat: Use filesystem cache for serverless compatibility --- main.py | 63 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/main.py b/main.py index c862e84..a6c3229 100644 --- a/main.py +++ b/main.py @@ -6,18 +6,33 @@ import time import os import base64 -# Simple in-memory cache -# Format: {url: {"time": timestamp, "buffer": base64_string}} -cache = {} +# Filesystem-based cache for serverless environment +CACHE_DIR = "/tmp/snapshot_cache" CACHE_DURATION = 10 # seconds +def get_cache_path(url): + import hashlib + url_hash = hashlib.md5(url.encode('utf-8')).hexdigest() + return os.path.join(CACHE_DIR, f"{url_hash}.jpg") + def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False): + if not os.path.exists(CACHE_DIR): + os.makedirs(CACHE_DIR, exist_ok=True) + + cache_path = get_cache_path(youtube_url) + # Check cache first - if youtube_url in cache: - cached_item = cache[youtube_url] - if time.time() - cached_item["time"] < CACHE_DURATION: - print(f"Returning cached snapshot for {youtube_url}") - return base64.b64decode(cached_item["buffer"]) if toBuffer else True + if os.path.exists(cache_path): + mtime = os.path.getmtime(cache_path) + if time.time() - mtime < CACHE_DURATION: + print(f"Returning cached snapshot for {youtube_url} from {cache_path}") + if toBuffer: + with open(cache_path, "rb") as f: + return f.read() + else: + import shutil + shutil.copy2(cache_path, output_file) + return True # 1. Configure yt-dlp to get the direct stream URL ydl_opts = { @@ -50,6 +65,9 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa result = None if success: + # Save to cache first + cv2.imwrite(cache_path, frame) + if toBuffer: # Encode frame as JPEG to memory buffer ret, buf = cv2.imencode(".jpg", frame) @@ -63,19 +81,6 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa cv2.imwrite(output_file, frame) print(f"Snapshot saved to {output_file}") result = True - - # Update cache if successful - if result: - if toBuffer: - buf_to_cache = result - else: - with open(output_file, "rb") as f: - buf_to_cache = f.read() - - cache[youtube_url] = { - "time": time.time(), - "buffer": base64.b64encode(buf_to_cache).decode('utf-8') - } else: print("Error: Could not read frame from stream.") @@ -113,17 +118,21 @@ def main(args): }, } - # Check cache explicitly in main to avoid redundant toBuffer logic if possible - if url in cache: - cached_item = cache[url] - if time.time() - cached_item["time"] < CACHE_DURATION: - print(f"Serving {url} from cache.") + if not os.path.exists(CACHE_DIR): + os.makedirs(CACHE_DIR, exist_ok=True) + + cache_path = get_cache_path(url) + if os.path.exists(cache_path): + if time.time() - os.path.getmtime(cache_path) < CACHE_DURATION: + print(f"Serving {url} from filesystem cache.") + with open(cache_path, "rb") as f: + encoded_content = base64.b64encode(f.read()).decode('utf-8') return { "_shsf": "v2", "_code": 200, "_res": { "message": "done (cached)", - "buffer": cached_item["buffer"] + "buffer": encoded_content }, "_headers": { "Content-Type": "application/json",