refactor: standardize string formatting and improve code readability
Some checks failed
Python syntax & lint / syntax-lint (push) Failing after 7s

This commit is contained in:
Space-Banane
2026-04-06 23:03:21 +02:00
parent e95f915d86
commit b817f34607
2 changed files with 106 additions and 70 deletions

View File

@@ -16,29 +16,37 @@ URLS = [
URLS_JSON = os.environ.get("URLS_JSON", str(Path(__file__).parent / "urls.json")) URLS_JSON = os.environ.get("URLS_JSON", str(Path(__file__).parent / "urls.json"))
RESET = "\033[0m" RESET = "\033[0m"
BOLD = "\033[1m" BOLD = "\033[1m"
RED = "\033[91m" RED = "\033[91m"
GREEN = "\033[92m" GREEN = "\033[92m"
YELLOW = "\033[93m" YELLOW = "\033[93m"
CYAN = "\033[96m" CYAN = "\033[96m"
DIM = "\033[2m" DIM = "\033[2m"
def log(msg, color=RESET):
    """Print *msg* wrapped in the given ANSI color code, flushing immediately."""
    colored = "%s%s%s" % (color, msg, RESET)
    print(colored, flush=True)
def get_post_id(url):
    """Return the trailing id segment of a post URL (text after the last '-')."""
    trimmed = url.rstrip("/")
    return trimmed.rsplit("-", 1)[-1]
def fetch_info(url): def fetch_info(url):
cmd = [ cmd = [
YTDLP, YTDLP,
"--dump-json", "--no-playlist", "--dump-json",
"--cookies", COOKIES, "--no-playlist",
"--extractor-args", "generic:impersonate", "--cookies",
url COOKIES,
"--extractor-args",
"generic:impersonate",
url,
] ]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30, cwd=Path(__file__).parent) result = subprocess.run(
cmd, capture_output=True, text=True, timeout=30, cwd=Path(__file__).parent
)
if result.returncode != 0: if result.returncode != 0:
return None, result.stderr.strip() return None, result.stderr.strip()
try: try:
@@ -46,31 +54,41 @@ def fetch_info(url):
except json.JSONDecodeError: except json.JSONDecodeError:
return None, "Failed to parse JSON" return None, "Failed to parse JSON"
def download(url, out_dir):
    """Run yt-dlp to save the media at *url* into *out_dir*.

    Returns a tuple ``(success, output)`` where ``success`` is True when
    yt-dlp exited 0 and ``output`` is the combined stdout+stderr text.
    """
    # Flag groups kept together for readability; flattened into argv below.
    option_groups = [
        ("-f", "bestvideo+bestaudio/best"),
        ("--prefer-free-formats",),
        ("--cookies", COOKIES),
        ("--extractor-args", "generic:impersonate"),
        ("--merge-output-format", "mp4"),
        ("-o", str(out_dir / "%(title)s.%(ext)s")),
    ]
    cmd = [YTDLP]
    for group in option_groups:
        cmd.extend(group)
    cmd.append(url)
    proc = subprocess.run(
        cmd, capture_output=True, text=True, cwd=Path(__file__).parent
    )
    combined = (proc.stdout + proc.stderr).strip()
    return proc.returncode == 0, combined
def sanitize(name, max_len=60):
    """Return a filesystem-safe version of *name*, at most *max_len* chars.

    Non-alphanumeric characters (other than space, underscore, dash, dot)
    become underscores; leading/trailing junk is stripped.
    """
    allowed_extra = " _-."
    cleaned = []
    for ch in name:
        cleaned.append(ch if ch.isalnum() or ch in allowed_extra else "_")
    safe = "".join(cleaned).strip()
    return safe[:max_len].strip("_. ")
def main(): def main():
OUTPUT_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
total = len(URLS) total = len(URLS)
# Load from JSON if present and non-empty # Load from JSON if present and non-empty
try: try:
if Path(URLS_JSON).exists(): if Path(URLS_JSON).exists():
with open(URLS_JSON, 'r', encoding='utf-8') as fh: with open(URLS_JSON, "r", encoding="utf-8") as fh:
data = json.load(fh) data = json.load(fh)
if isinstance(data, list) and data: if isinstance(data, list) and data:
URLS.clear() URLS.clear()
@@ -95,7 +113,9 @@ def main():
if info is None: if info is None:
# Likely a text post or unavailable # Likely a text post or unavailable
log(f" {YELLOW}⚠ skipped — no media ({err[:80] if err else 'no video found'}){RESET}") log(
f" {YELLOW}⚠ skipped — no media ({err[:80] if err else 'no video found'}){RESET}"
)
skipped.append((url, err)) skipped.append((url, err))
print() print()
continue continue
@@ -115,7 +135,9 @@ def main():
# Find what was downloaded # Find what was downloaded
files = list(out_dir.iterdir()) files = list(out_dir.iterdir())
sizes = [f"{f.stat().st_size / 1e6:.1f} MB" for f in files if f.is_file()] sizes = [f"{f.stat().st_size / 1e6:.1f} MB" for f in files if f.is_file()]
log(f" {GREEN}✓ done — {', '.join(sizes) if sizes else 'file saved'}{RESET}") log(
f" {GREEN}✓ done — {', '.join(sizes) if sizes else 'file saved'}{RESET}"
)
ok.append(url) ok.append(url)
else: else:
# Check if it's just no video (text post that slipped through info check) # Check if it's just no video (text post that slipped through info check)
@@ -123,14 +145,18 @@ def main():
log(f" {YELLOW}⚠ skipped — text post{RESET}") log(f" {YELLOW}⚠ skipped — text post{RESET}")
skipped.append((url, "no video content")) skipped.append((url, "no video content"))
# Remove empty dir # Remove empty dir
try: out_dir.rmdir() try:
except OSError: pass out_dir.rmdir()
except OSError:
pass
else: else:
log(f" {RED}✗ failed{RESET}") log(f" {RED}✗ failed{RESET}")
# Print last few lines of output for context # Print last few lines of output for context
for line in output.splitlines()[-3:]: for line in output.splitlines()[-3:]:
log(f" {DIM}{line}{RESET}") log(f" {DIM}{line}{RESET}")
failed.append((url, output.splitlines()[-1] if output else "unknown error")) failed.append(
(url, output.splitlines()[-1] if output else "unknown error")
)
print() print()
@@ -150,5 +176,6 @@ def main():
log(f"{''*55}\n", CYAN) log(f"{''*55}\n", CYAN)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -7,7 +7,7 @@ from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS from flask_cors import CORS
import time import time
app = Flask(__name__, static_folder='.', static_url_path='') app = Flask(__name__, static_folder=".", static_url_path="")
CORS(app) CORS(app)
# Store active jobs: {job_id: {"lines": [], "done": bool, "error": bool}} # Store active jobs: {job_id: {"lines": [], "done": bool, "error": bool}}
@@ -29,7 +29,7 @@ def run_ytdlp(job_id, cmd):
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
text=True, text=True,
bufsize=1, bufsize=1,
cwd=DOWNLOAD_DIR cwd=DOWNLOAD_DIR,
) )
for line in process.stdout: for line in process.stdout:
line = line.rstrip() line = line.rstrip()
@@ -47,73 +47,79 @@ def run_ytdlp(job_id, cmd):
jobs[job_id]["error"] = True jobs[job_id]["error"] = True
@app.route("/")
def index():
    """Serve the single-page frontend at the site root."""
    return app.send_static_file("frontend.html")
@app.route("/api/info", methods=["POST"])
def get_info():
    """Fetch video metadata for a URL via ``yt-dlp --dump-json``.

    Expects a JSON body with ``url`` and optional ``cookies`` (path to a
    cookies file).  Responds with title/thumbnail/duration/uploader plus a
    flattened list of available formats, or a JSON error payload with an
    appropriate HTTP status (400 bad request, 408 timeout, 500 otherwise).
    """
    # get_json(silent=True) returns None instead of raising on a missing or
    # malformed JSON body, so bad requests get the clean 400 below rather
    # than an AttributeError swallowed by the generic 500 handler.
    data = request.get_json(silent=True) or {}
    url = data.get("url", "").strip()
    cookies = data.get("cookies", "").strip()
    if not url:
        return jsonify({"error": "No URL provided"}), 400
    cmd = [YTDLP_PATH, "--dump-json", "--no-playlist"]
    if cookies:
        cmd += ["--cookies", cookies]
    cmd += ["--extractor-args", "generic:impersonate", url]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=30, cwd=DOWNLOAD_DIR
        )
        if result.returncode != 0:
            return jsonify({"error": result.stderr or "Failed to fetch info"}), 400
        info = json.loads(result.stdout)
        formats = []
        for f in info.get("formats", []):
            formats.append(
                {
                    "id": f.get("format_id"),
                    "ext": f.get("ext"),
                    "resolution": f.get("resolution") or f.get("format_note") or "",
                    "vcodec": f.get("vcodec", "none"),
                    "acodec": f.get("acodec", "none"),
                    "filesize": f.get("filesize") or f.get("filesize_approx"),
                    "tbr": f.get("tbr"),
                    "label": f.get("format"),
                }
            )
        return jsonify(
            {
                "title": info.get("title", "Unknown"),
                "thumbnail": info.get("thumbnail"),
                "duration": info.get("duration"),
                "uploader": info.get("uploader"),
                "formats": formats,
            }
        )
    except subprocess.TimeoutExpired:
        return jsonify({"error": "Timed out fetching video info"}), 408
    except Exception as e:
        # Route-boundary catch-all: report the failure to the client.
        return jsonify({"error": str(e)}), 500
@app.route('/api/download', methods=['POST']) @app.route("/api/download", methods=["POST"])
def start_download(): def start_download():
data = request.json data = request.json
url = data.get('url', '').strip() url = data.get("url", "").strip()
format_id = data.get('format_id', '').strip() format_id = data.get("format_id", "").strip()
cookies = data.get('cookies', '').strip() cookies = data.get("cookies", "").strip()
extra_args = data.get('extra_args', '').strip() extra_args = data.get("extra_args", "").strip()
if not url: if not url:
return jsonify({"error": "No URL provided"}), 400 return jsonify({"error": "No URL provided"}), 400
cmd = [YTDLP_PATH] cmd = [YTDLP_PATH]
if format_id: if format_id:
cmd += ['-f', format_id] cmd += ["-f", format_id]
cmd += ['--prefer-free-formats'] cmd += ["--prefer-free-formats"]
if cookies: if cookies:
cmd += ['--cookies', cookies] cmd += ["--cookies", cookies]
cmd += ['--extractor-args', 'generic:impersonate'] cmd += ["--extractor-args", "generic:impersonate"]
if extra_args: if extra_args:
cmd += extra_args.split() cmd += extra_args.split()
cmd.append(url) cmd.append(url)
@@ -125,7 +131,7 @@ def start_download():
return jsonify({"job_id": job_id}) return jsonify({"job_id": job_id})
@app.route('/api/status/<job_id>') @app.route("/api/status/<job_id>")
def job_status(job_id): def job_status(job_id):
def generate(): def generate():
sent = 0 sent = 0
@@ -135,20 +141,23 @@ def job_status(job_id):
if not job: if not job:
yield f"data: {json.dumps({'error': 'Job not found'})}\n\n" yield f"data: {json.dumps({'error': 'Job not found'})}\n\n"
break break
lines = job['lines'] lines = job["lines"]
while sent < len(lines): while sent < len(lines):
yield f"data: {json.dumps({'line': lines[sent]})}\n\n" yield f"data: {json.dumps({'line': lines[sent]})}\n\n"
sent += 1 sent += 1
if job['done']: if job["done"]:
yield f"data: {json.dumps({'done': True, 'error': job['error']})}\n\n" yield f"data: {json.dumps({'done': True, 'error': job['error']})}\n\n"
break break
time.sleep(0.2) time.sleep(0.2)
return Response(stream_with_context(generate()), mimetype='text/event-stream', return Response(
headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) stream_with_context(generate()),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
if __name__ == "__main__":
    # Startup banner; the address shown must match the app.run() call below.
    # (Plain string — the original used an f-string with no placeholders.)
    print("[yt-dlp UI] Serving on http://localhost:5000")
    print(f"[yt-dlp UI] Download directory: {DOWNLOAD_DIR}")
    # threaded=True lets the SSE status stream and new requests be served
    # concurrently by the Flask development server.
    app.run(debug=False, host="0.0.0.0", port=5000, threaded=True)