148 lines
5.5 KiB
Python
148 lines
5.5 KiB
Python
import base64
|
|
import io
|
|
import json
|
|
import time
|
|
from minio import Minio
|
|
import redis
|
|
import os
|
|
|
|
DEFAULT_STATE = {
|
|
"text": {
|
|
"showing": False,
|
|
"title": "Wassup"
|
|
},
|
|
"image_popup": {
|
|
"showing": False,
|
|
"image_url": "",
|
|
"caption": ""
|
|
},
|
|
"data_cards": [],
|
|
"settings": {
|
|
"background_url": ""
|
|
}
|
|
}
|
|
|
|
MINIO_CLIENT = Minio(
|
|
"content2.reversed.dev",
|
|
access_key="tv-control",
|
|
secret_key="0gEjOlnVAECrqJx5Nd77y7Hhouc5faVf0WttjcRH",
|
|
secure=True,
|
|
)
|
|
BUCKET = "tv-control"
|
|
r = redis.Redis(host='194.15.36.205',username="default", port=12004, db=12, password=os.getenv("REDIS"))
|
|
|
|
def _read_state():
|
|
state_json = r.get("tv-control-state")
|
|
if state_json:
|
|
try:
|
|
return json.loads(state_json)
|
|
except json.JSONDecodeError:
|
|
return dict(DEFAULT_STATE)
|
|
else:
|
|
return dict(DEFAULT_STATE)
|
|
|
|
def _write_state(state):
|
|
r.set("tv-control-state", json.dumps(state))
|
|
|
|
def main(args):
|
|
route = args.get("route")
|
|
|
|
body = args.get("body")
|
|
body = json.loads(body) if body else {}
|
|
|
|
if route.startswith("push"):
|
|
if route == "push_text":
|
|
current = _read_state()
|
|
current["text"]["title"] = body.get("title", "")
|
|
current["text"]["showing"] = True
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_dismiss_text":
|
|
current = _read_state()
|
|
current["text"]["showing"] = False
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_image":
|
|
image_b64 = body.get("image_b64", "")
|
|
caption = body.get("caption", "")
|
|
image_bytes = base64.b64decode(image_b64)
|
|
file_name = f"tv-image-{int(time.time() * 1000)}.jpg"
|
|
MINIO_CLIENT.put_object(
|
|
BUCKET, file_name, io.BytesIO(image_bytes), len(image_bytes), content_type="image/jpeg"
|
|
)
|
|
public_url = f"https://content2.reversed.dev/{BUCKET}/{file_name}"
|
|
current = _read_state()
|
|
current["image_popup"]["image_url"] = public_url
|
|
current["image_popup"]["caption"] = caption
|
|
current["image_popup"]["showing"] = True
|
|
_write_state(current)
|
|
return {"status": "success", "image_url": public_url}
|
|
elif route == "push_dismiss_image":
|
|
current = _read_state()
|
|
current["image_popup"]["showing"] = False
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_data_card":
|
|
# Upsert a data card by id
|
|
card = body.get("card")
|
|
if not card or not card.get("id"):
|
|
return {"status": "error", "message": "Missing card or card id"}
|
|
current = _read_state()
|
|
cards = current.get("data_cards", [])
|
|
existing = next((i for i, c in enumerate(cards) if c["id"] == card["id"]), None)
|
|
if existing is not None:
|
|
cards[existing] = card
|
|
else:
|
|
cards.append(card)
|
|
current["data_cards"] = cards
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_delete_data_card":
|
|
card_id = body.get("id")
|
|
if not card_id:
|
|
return {"status": "error", "message": "Missing card id"}
|
|
current = _read_state()
|
|
current["data_cards"] = [c for c in current.get("data_cards", []) if c["id"] != card_id]
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_settings":
|
|
bg_url = body.get("background_url", "")
|
|
current = _read_state()
|
|
current.setdefault("settings", {})["background_url"] = bg_url
|
|
_write_state(current)
|
|
return {"status": "success"}
|
|
elif route == "push_upload_images":
|
|
# Upload multiple images to MinIO; returns their public URLs (no state change)
|
|
images = body.get("images", [])
|
|
if not images:
|
|
return {"status": "error", "message": "No images provided"}
|
|
urls = []
|
|
ts = int(time.time() * 1000)
|
|
for idx, img in enumerate(images):
|
|
image_b64 = img.get("image_b64", "")
|
|
ext = img.get("ext", "jpg").lstrip(".")
|
|
if not image_b64:
|
|
continue
|
|
image_bytes = base64.b64decode(image_b64)
|
|
file_name = f"tv-rotator-{ts}-{idx}.{ext}"
|
|
MINIO_CLIENT.put_object(
|
|
BUCKET, file_name, io.BytesIO(image_bytes), len(image_bytes),
|
|
content_type=f"image/{ext}",
|
|
)
|
|
public_url = f"https://content2.reversed.dev/{BUCKET}/{file_name}"
|
|
urls.append(public_url)
|
|
return {"status": "success", "urls": urls}
|
|
else:
|
|
return {"status": "error", "message": "Unknown push route"}
|
|
elif route.startswith("pull"):
|
|
current = _read_state()
|
|
if route == "pull_text":
|
|
return {"status": "success", "text": current.get("text", {})}
|
|
elif route == "pull_image":
|
|
return {"status": "success", "image_popup": current.get("image_popup", {})}
|
|
elif route == "pull_full":
|
|
return {"status": "success", "state": current}
|
|
elif route == "pull_data_cards":
|
|
return {"status": "success", "data_cards": current.get("data_cards", [])}
|
|
else:
|
|
return {"status": "error", "message": "Unknown pull route"} |