feat: Add snapshot caching (10s duration)
All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 6s
All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 6s
This commit is contained in:
69
main.py
69
main.py
@@ -2,9 +2,23 @@ import cv2
|
|||||||
import yt_dlp
|
import yt_dlp
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
import base64
|
||||||
|
|
||||||
|
# Simple in-memory cache
|
||||||
|
# Format: {url: {"time": timestamp, "buffer": base64_string}}
|
||||||
|
cache = {}
|
||||||
|
CACHE_DURATION = 10 # seconds
|
||||||
|
|
||||||
def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
|
def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
|
||||||
|
# Check cache first
|
||||||
|
if youtube_url in cache:
|
||||||
|
cached_item = cache[youtube_url]
|
||||||
|
if time.time() - cached_item["time"] < CACHE_DURATION:
|
||||||
|
print(f"Returning cached snapshot for {youtube_url}")
|
||||||
|
return base64.b64decode(cached_item["buffer"]) if toBuffer else True
|
||||||
|
|
||||||
# 1. Configure yt-dlp to get the direct stream URL
|
# 1. Configure yt-dlp to get the direct stream URL
|
||||||
ydl_opts = {
|
ydl_opts = {
|
||||||
"format": "best",
|
"format": "best",
|
||||||
@@ -29,41 +43,54 @@ def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=Fa
|
|||||||
|
|
||||||
if not cap.isOpened():
|
if not cap.isOpened():
|
||||||
print("Error: Could not open video stream.")
|
print("Error: Could not open video stream.")
|
||||||
return None if toBuffer else None
|
return None
|
||||||
|
|
||||||
# Read a single frame
|
# Read a single frame
|
||||||
success, frame = cap.read()
|
success, frame = cap.read()
|
||||||
|
|
||||||
|
result = None
|
||||||
if success:
|
if success:
|
||||||
if toBuffer:
|
if toBuffer:
|
||||||
# Encode frame as JPEG to memory buffer
|
# Encode frame as JPEG to memory buffer
|
||||||
ret, buf = cv2.imencode(".jpg", frame)
|
ret, buf = cv2.imencode(".jpg", frame)
|
||||||
if ret:
|
if ret:
|
||||||
print("Snapshot captured to buffer.")
|
print("Snapshot captured to buffer.")
|
||||||
cap.release()
|
result = buf.tobytes()
|
||||||
return buf.tobytes()
|
|
||||||
else:
|
else:
|
||||||
print("Error: Could not encode frame to buffer.")
|
print("Error: Could not encode frame to buffer.")
|
||||||
cap.release()
|
|
||||||
return None
|
|
||||||
else:
|
else:
|
||||||
# 3. Save the frame as an image
|
# 3. Save the frame as an image
|
||||||
cv2.imwrite(output_file, frame)
|
cv2.imwrite(output_file, frame)
|
||||||
print(f"Snapshot saved to {output_file}")
|
print(f"Snapshot saved to {output_file}")
|
||||||
|
result = True
|
||||||
|
|
||||||
|
# Update cache if successful
|
||||||
|
if result:
|
||||||
|
if toBuffer:
|
||||||
|
buf_to_cache = result
|
||||||
|
else:
|
||||||
|
with open(output_file, "rb") as f:
|
||||||
|
buf_to_cache = f.read()
|
||||||
|
|
||||||
|
cache[youtube_url] = {
|
||||||
|
"time": time.time(),
|
||||||
|
"buffer": base64.b64encode(buf_to_cache).decode('utf-8')
|
||||||
|
}
|
||||||
else:
|
else:
|
||||||
print("Error: Could not read frame from stream.")
|
print("Error: Could not read frame from stream.")
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
cap.release()
|
cap.release()
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
# Shsf Handler
|
# Shsf Handler
|
||||||
def main(args):
|
def main(args):
|
||||||
route = args.get("route")
|
route = args.get("route")
|
||||||
if route == "snapshot":
|
if route == "snapshot":
|
||||||
url = args.get("body", "")
|
url_input = args.get("body", "")
|
||||||
try:
|
try:
|
||||||
url = json.loads(url)
|
body_json = json.loads(url_input)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
print("Error: Invalid JSON input.")
|
print("Error: Invalid JSON input.")
|
||||||
return {
|
return {
|
||||||
@@ -74,7 +101,7 @@ def main(args):
|
|||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
url = url.get("url", "")
|
url = body_json.get("url", "")
|
||||||
if not url:
|
if not url:
|
||||||
print("Error: URL not provided.")
|
print("Error: URL not provided.")
|
||||||
return {
|
return {
|
||||||
@@ -86,11 +113,26 @@ def main(args):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Check cache explicitly in main to avoid redundant toBuffer logic if possible
|
||||||
|
if url in cache:
|
||||||
|
cached_item = cache[url]
|
||||||
|
if time.time() - cached_item["time"] < CACHE_DURATION:
|
||||||
|
print(f"Serving {url} from cache.")
|
||||||
|
return {
|
||||||
|
"_shsf": "v2",
|
||||||
|
"_code": 200,
|
||||||
|
"_res": {
|
||||||
|
"message": "done (cached)",
|
||||||
|
"buffer": cached_item["buffer"]
|
||||||
|
},
|
||||||
|
"_headers": {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
capture_stream_snapshot(url, output_file="/tmp/snapshot.jpg")
|
# Use toBuffer=True to get the bytes directly
|
||||||
content = None
|
content = capture_stream_snapshot(url, toBuffer=True)
|
||||||
with open("/tmp/snapshot.jpg", "rb") as f:
|
|
||||||
content = f.read()
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error: {e}")
|
print(f"Error: {e}")
|
||||||
return {
|
return {
|
||||||
@@ -113,8 +155,7 @@ def main(args):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# Convert content to base64 for JSON response
|
# The content returned is already bytes if not from cache
|
||||||
import base64
|
|
||||||
encoded_content = base64.b64encode(content).decode('utf-8')
|
encoded_content = base64.b64encode(content).decode('utf-8')
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
Reference in New Issue
Block a user