Files
stream-shot/main.py

176 lines
5.2 KiB
Python

import cv2
import yt_dlp
import numpy as np
import json
def capture_stream_snapshot(youtube_url, output_file="snapshot.jpg", toBuffer=False):
# 1. Configure yt-dlp to get the direct stream URL
ydl_opts = {
"format": "best",
"quiet": True,
"no_warnings": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(youtube_url, download=False)
stream_url = info["url"]
except Exception as e:
print(f"Error extracting info: {e}")
return None
# 2. Use OpenCV to capture a frame from the stream URL
try:
cap = cv2.VideoCapture(stream_url)
except Exception as e:
print(f"Error opening VideoCapture: {e}")
return None
if not cap.isOpened():
print("Error: Could not open video stream.")
return None if toBuffer else None
# Read a single frame
success, frame = cap.read()
if success:
if toBuffer:
# Encode frame as JPEG to memory buffer
ret, buf = cv2.imencode(".jpg", frame)
if ret:
print("Snapshot captured to buffer.")
cap.release()
return buf.tobytes()
else:
print("Error: Could not encode frame to buffer.")
cap.release()
return None
else:
# 3. Save the frame as an image
cv2.imwrite(output_file, frame)
print(f"Snapshot saved to {output_file}")
else:
print("Error: Could not read frame from stream.")
# Cleanup
cap.release()
# Shsf Handler
def main(args):
route = args.get("route")
if route == "snapshot":
url = args.get("body", "")
try:
url = json.loads(url)
except json.JSONDecodeError:
print("Error: Invalid JSON input.")
return {
"_shsf": "v2",
"_code": 400,
"_res": {"message": "Invalid JSON input."},
"_headers": {
"Content-Type": "application/json",
},
}
url = url.get("url", "")
if not url:
print("Error: URL not provided.")
return {
"_shsf": "v2",
"_code": 400,
"_res": {"message": "URL not provided."},
"_headers": {
"Content-Type": "application/json",
},
}
try:
capture_stream_snapshot(url, output_file="/tmp/snapshot.jpg")
content = None
with open("/tmp/snapshot.jpg", "rb") as f:
content = f.read()
except Exception as e:
print(f"Error: {e}")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"message": f"Could not capture snapshot: {str(e)}"},
"_headers": {
"Content-Type": "application/json",
},
}
if content is None:
print("Error: Could not capture snapshot.")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"message": "Could not capture snapshot."},
"_headers": {
"Content-Type": "application/json",
},
}
# Convert content to base64 for JSON response
import base64
encoded_content = base64.b64encode(content).decode('utf-8')
return {
"_shsf": "v2",
"_code": 200,
"_res": {
"message": "done",
"buffer": encoded_content
},
"_headers": {
"Content-Type": "application/json",
},
}
elif route == "health":
return {
"_shsf": "v2",
"_code": 200,
"_res": {"message": "Service is healthy."},
"_headers": {
"Content-Type": "application/json",
},
}
elif route == "default": # UI Route
try:
content = ""
with open("/app/ui.html", "r") as f:
content = f.read()
except FileNotFoundError:
print("Error: /app/ui.html not found.")
return {
"_shsf": "v2",
"_code": 404,
"_res": {"message": "UI file not found."},
"_headers": {
"Content-Type": "application/json",
},
}
except Exception as e:
print(f"Error loading UI: {e}")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"message": f"Server error: {str(e)}"},
"_headers": {
"Content-Type": "application/json",
},
}
return {
"_shsf": "v2",
"_code": 200,
"_res": content,
"_headers": {
"Content-Type": "text/html",
},
}
if __name__ == "__main__":
print("omg im on main")
capture_stream_snapshot("https://www.youtube.com/watch?v=n15V_fCsl_c", output_file="snapshot.jpg")