All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 6s
226 lines
6.9 KiB
Python
226 lines
6.9 KiB
Python
import cv2
|
|
import yt_dlp
|
|
import numpy as np
|
|
import json
|
|
import time
|
|
import os
|
|
import base64
|
|
|
|
# Filesystem-based cache for serverless environment
|
|
CACHE_DIR = "/tmp/snapshot_cache"
|
|
CACHE_DURATION = 10 # seconds
|
|
|
|
def get_cache_path(url):
|
|
import hashlib
|
|
url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
|
|
return os.path.join(CACHE_DIR, f"{url_hash}.jpg")
|
|
|
|
def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
|
|
if not os.path.exists(CACHE_DIR):
|
|
os.makedirs(CACHE_DIR, exist_ok=True)
|
|
|
|
cache_path = get_cache_path(youtube_url)
|
|
|
|
# Check cache first
|
|
if os.path.exists(cache_path):
|
|
mtime = os.path.getmtime(cache_path)
|
|
if time.time() - mtime < CACHE_DURATION:
|
|
print(f"Returning cached snapshot for {youtube_url} from {cache_path}")
|
|
if toBuffer:
|
|
with open(cache_path, "rb") as f:
|
|
return f.read()
|
|
else:
|
|
import shutil
|
|
shutil.copy2(cache_path, output_file)
|
|
return True
|
|
|
|
# 1. Configure yt-dlp to get the direct stream URL
|
|
ydl_opts = {
|
|
"format": "best",
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
}
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
try:
|
|
info = ydl.extract_info(youtube_url, download=False)
|
|
stream_url = info["url"]
|
|
except Exception as e:
|
|
print(f"Error extracting info: {e}")
|
|
return None
|
|
|
|
# 2. Use OpenCV to capture a frame from the stream URL
|
|
try:
|
|
cap = cv2.VideoCapture(stream_url)
|
|
except Exception as e:
|
|
print(f"Error opening VideoCapture: {e}")
|
|
return None
|
|
|
|
if not cap.isOpened():
|
|
print("Error: Could not open video stream.")
|
|
return None
|
|
|
|
# Read a single frame
|
|
success, frame = cap.read()
|
|
|
|
result = None
|
|
if success:
|
|
# Save to cache first
|
|
cv2.imwrite(cache_path, frame)
|
|
|
|
if toBuffer:
|
|
# Encode frame as JPEG to memory buffer
|
|
ret, buf = cv2.imencode(".jpg", frame)
|
|
if ret:
|
|
print("Snapshot captured to buffer.")
|
|
result = buf.tobytes()
|
|
else:
|
|
print("Error: Could not encode frame to buffer.")
|
|
else:
|
|
# 3. Save the frame as an image
|
|
cv2.imwrite(output_file, frame)
|
|
print(f"Snapshot saved to {output_file}")
|
|
result = True
|
|
else:
|
|
print("Error: Could not read frame from stream.")
|
|
|
|
# Cleanup
|
|
cap.release()
|
|
return result
|
|
|
|
|
|
# Shsf Handler
|
|
def main(args):
|
|
route = args.get("route")
|
|
if route == "snapshot":
|
|
url_input = args.get("body", "")
|
|
try:
|
|
body_json = json.loads(url_input)
|
|
except json.JSONDecodeError:
|
|
print("Error: Invalid JSON input.")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 400,
|
|
"_res": {"message": "Invalid JSON input."},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
url = body_json.get("url", "")
|
|
if not url:
|
|
print("Error: URL not provided.")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 400,
|
|
"_res": {"message": "URL not provided."},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
|
|
if not os.path.exists(CACHE_DIR):
|
|
os.makedirs(CACHE_DIR, exist_ok=True)
|
|
|
|
cache_path = get_cache_path(url)
|
|
if os.path.exists(cache_path):
|
|
if time.time() - os.path.getmtime(cache_path) < CACHE_DURATION:
|
|
print(f"Serving {url} from filesystem cache.")
|
|
with open(cache_path, "rb") as f:
|
|
encoded_content = base64.b64encode(f.read()).decode('utf-8')
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 200,
|
|
"_res": {
|
|
"message": "done (cached)",
|
|
"buffer": encoded_content
|
|
},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
|
|
try:
|
|
# Use toBuffer=True to get the bytes directly
|
|
content = capture_stream_snapshot(url, toBuffer=True)
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 500,
|
|
"_res": {"message": f"Could not capture snapshot: {str(e)}"},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
|
|
if content is None:
|
|
print("Error: Could not capture snapshot.")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 500,
|
|
"_res": {"message": "Could not capture snapshot."},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
|
|
# The content returned is already bytes if not from cache
|
|
encoded_content = base64.b64encode(content).decode('utf-8')
|
|
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 200,
|
|
"_res": {
|
|
"message": "done",
|
|
"buffer": encoded_content
|
|
},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
elif route == "health":
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 200,
|
|
"_res": {"message": "Service is healthy."},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
elif route == "default": # UI Route
|
|
try:
|
|
content = ""
|
|
with open("/app/ui.html", "r") as f:
|
|
content = f.read()
|
|
except FileNotFoundError:
|
|
print("Error: /app/ui.html not found.")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 404,
|
|
"_res": {"message": "UI file not found."},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
except Exception as e:
|
|
print(f"Error loading UI: {e}")
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 500,
|
|
"_res": {"message": f"Server error: {str(e)}"},
|
|
"_headers": {
|
|
"Content-Type": "application/json",
|
|
},
|
|
}
|
|
return {
|
|
"_shsf": "v2",
|
|
"_code": 200,
|
|
"_res": content,
|
|
"_headers": {
|
|
"Content-Type": "text/html",
|
|
},
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
print("omg im on main")
|
|
capture_stream_snapshot("https://www.youtube.com/watch?v=n15V_fCsl_c", output_file="snapshot.jpg") |