feat: Use filesystem cache for serverless compatibility
All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 6s

This commit is contained in:
2026-04-02 14:12:00 +02:00
parent 1b246addff
commit a84da6dde8

63
main.py
View File

@@ -6,18 +6,33 @@ import time
import os import os
import base64 import base64
# Simple in-memory cache # Filesystem-based cache for serverless environment
# Format: {url: {"time": timestamp, "buffer": base64_string}} CACHE_DIR = "/tmp/snapshot_cache"
cache = {}
CACHE_DURATION = 10 # seconds CACHE_DURATION = 10 # seconds
def get_cache_path(url):
import hashlib
url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
return os.path.join(CACHE_DIR, f"{url_hash}.jpg")
def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False): def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR, exist_ok=True)
cache_path = get_cache_path(youtube_url)
# Check cache first # Check cache first
if youtube_url in cache: if os.path.exists(cache_path):
cached_item = cache[youtube_url] mtime = os.path.getmtime(cache_path)
if time.time() - cached_item["time"] < CACHE_DURATION: if time.time() - mtime < CACHE_DURATION:
print(f"Returning cached snapshot for {youtube_url}") print(f"Returning cached snapshot for {youtube_url} from {cache_path}")
return base64.b64decode(cached_item["buffer"]) if toBuffer else True if toBuffer:
with open(cache_path, "rb") as f:
return f.read()
else:
import shutil
shutil.copy2(cache_path, output_file)
return True
# 1. Configure yt-dlp to get the direct stream URL # 1. Configure yt-dlp to get the direct stream URL
ydl_opts = { ydl_opts = {
@@ -50,6 +65,9 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa
result = None result = None
if success: if success:
# Save to cache first
cv2.imwrite(cache_path, frame)
if toBuffer: if toBuffer:
# Encode frame as JPEG to memory buffer # Encode frame as JPEG to memory buffer
ret, buf = cv2.imencode(".jpg", frame) ret, buf = cv2.imencode(".jpg", frame)
@@ -63,19 +81,6 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa
cv2.imwrite(output_file, frame) cv2.imwrite(output_file, frame)
print(f"Snapshot saved to {output_file}") print(f"Snapshot saved to {output_file}")
result = True result = True
# Update cache if successful
if result:
if toBuffer:
buf_to_cache = result
else:
with open(output_file, "rb") as f:
buf_to_cache = f.read()
cache[youtube_url] = {
"time": time.time(),
"buffer": base64.b64encode(buf_to_cache).decode('utf-8')
}
else: else:
print("Error: Could not read frame from stream.") print("Error: Could not read frame from stream.")
@@ -113,17 +118,21 @@ def main(args):
}, },
} }
# Check cache explicitly in main to avoid redundant toBuffer logic if possible if not os.path.exists(CACHE_DIR):
if url in cache: os.makedirs(CACHE_DIR, exist_ok=True)
cached_item = cache[url]
if time.time() - cached_item["time"] < CACHE_DURATION: cache_path = get_cache_path(url)
print(f"Serving {url} from cache.") if os.path.exists(cache_path):
if time.time() - os.path.getmtime(cache_path) < CACHE_DURATION:
print(f"Serving {url} from filesystem cache.")
with open(cache_path, "rb") as f:
encoded_content = base64.b64encode(f.read()).decode('utf-8')
return { return {
"_shsf": "v2", "_shsf": "v2",
"_code": 200, "_code": 200,
"_res": { "_res": {
"message": "done (cached)", "message": "done (cached)",
"buffer": cached_item["buffer"] "buffer": encoded_content
}, },
"_headers": { "_headers": {
"Content-Type": "application/json", "Content-Type": "application/json",