Files
tv-control/functions/control/main.py

113 lines
4.0 KiB
Python

import base64
import copy
import io
import json
import os
import time

from minio import Minio
# Baseline state used when no persisted state can be loaded.
DEFAULT_STATE = {
    # Text overlay: visibility flag plus the title string.
    "text": {"showing": False, "title": "Wassup"},
    # Image popup: visibility flag, image URL and caption.
    "image_popup": {"showing": False, "image_url": "", "caption": ""},
    # Data cards; empty by default.
    "data_cards": [],
}
# Object-storage client used by the image upload route.
# NOTE(security): the credentials were previously available only as literals
# in this file. They can now be supplied through the MINIO_ACCESS_KEY and
# MINIO_SECRET_KEY environment variables; the literals remain solely as a
# backward-compatible fallback. The embedded secret should be rotated and the
# fallback removed once the deployment provides the variables.
MINIO_CLIENT = Minio(
    "content2.reversed.dev",
    access_key=os.environ.get("MINIO_ACCESS_KEY", "tv-control"),
    secret_key=os.environ.get(
        "MINIO_SECRET_KEY", "0gEjOlnVAECrqJx5Nd77y7Hhouc5faVf0WttjcRH"
    ),
    secure=True,
)
# Bucket that receives uploaded images.
BUCKET = "tv-control"
def _read_state():
    """Load the persisted state from ``/app/state.json``.

    Returns:
        The parsed JSON content of the state file. If the file does not
        exist or does not contain valid JSON, a fresh deep copy of
        ``DEFAULT_STATE`` is returned instead.
    """
    try:
        with open("/app/state.json", "r") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # A shallow ``dict(DEFAULT_STATE)`` would share the nested "text" and
        # "image_popup" dicts and the "data_cards" list with the module-level
        # default, so a caller mutating the result would silently corrupt the
        # default for every later call. Deep-copy to keep it pristine.
        return copy.deepcopy(DEFAULT_STATE)
def _write_state(state):
    """Serialize ``state`` as JSON into ``/app/state.json``, replacing its content."""
    with open("/app/state.json", "w") as handle:
        handle.write(json.dumps(state))
def main(args):
    """Handle a single TV-control request.

    Args:
        args: Mapping with a ``"route"`` string and an optional ``"body"``.
            The body is a JSON-encoded object; an already-decoded ``dict``
            is accepted as well.

    Returns:
        A dict whose ``"status"`` is ``"success"`` or ``"error"``. Error
        responses carry a ``"message"``. Successful ``pull_*`` responses
        carry the requested part of the state, and ``push_image`` carries
        the ``"image_url"`` of the uploaded object.
    """
    route = args.get("route")
    if not isinstance(route, str):
        # Without this guard a missing route crashes on ``.startswith``.
        return {"status": "error", "message": "Missing route"}

    raw_body = args.get("body")
    if isinstance(raw_body, dict):
        body = raw_body
    elif raw_body:
        try:
            body = json.loads(raw_body)
        except (TypeError, ValueError):
            return {"status": "error", "message": "Invalid JSON body"}
    else:
        body = {}
    if not isinstance(body, dict):
        # Every route reads the body with ``.get``; reject arrays/scalars.
        return {"status": "error", "message": "Body must be a JSON object"}

    if route.startswith("push"):
        if route == "push_text":
            current = _read_state()
            # setdefault tolerates a state file that lacks the "text" section.
            text = current.setdefault("text", {})
            text["title"] = body.get("title", "")
            text["showing"] = True
            _write_state(current)
            return {"status": "success"}

        if route == "push_dismiss_text":
            current = _read_state()
            current.setdefault("text", {})["showing"] = False
            _write_state(current)
            return {"status": "success"}

        if route == "push_image":
            image_b64 = body.get("image_b64", "")
            caption = body.get("caption", "")
            if not image_b64:
                # An empty payload would otherwise upload a zero-byte object.
                return {"status": "error", "message": "Missing image_b64"}
            try:
                image_bytes = base64.b64decode(image_b64)
            except (TypeError, ValueError):
                # binascii.Error (bad padding/characters) is a ValueError.
                return {"status": "error", "message": "Invalid base64 image data"}
            # Millisecond timestamp keeps successive uploads distinct.
            file_name = f"tv-image-{int(time.time() * 1000)}.jpg"
            MINIO_CLIENT.put_object(
                BUCKET,
                file_name,
                io.BytesIO(image_bytes),
                len(image_bytes),
                content_type="image/jpeg",
            )
            public_url = f"https://content2.reversed.dev/{BUCKET}/{file_name}"
            current = _read_state()
            popup = current.setdefault("image_popup", {})
            popup["image_url"] = public_url
            popup["caption"] = caption
            popup["showing"] = True
            _write_state(current)
            return {"status": "success", "image_url": public_url}

        if route == "push_dismiss_image":
            current = _read_state()
            current.setdefault("image_popup", {})["showing"] = False
            _write_state(current)
            return {"status": "success"}

        if route == "push_data_card":
            # Upsert a data card by id.
            card = body.get("card")
            if not isinstance(card, dict) or not card.get("id"):
                return {"status": "error", "message": "Missing card or card id"}
            current = _read_state()
            cards = current.get("data_cards", [])
            # ``.get`` so a stored card without an id cannot raise KeyError.
            existing = next(
                (i for i, c in enumerate(cards) if c.get("id") == card["id"]),
                None,
            )
            if existing is not None:
                cards[existing] = card
            else:
                cards.append(card)
            current["data_cards"] = cards
            _write_state(current)
            return {"status": "success"}

        if route == "push_delete_data_card":
            card_id = body.get("id")
            if not card_id:
                return {"status": "error", "message": "Missing card id"}
            current = _read_state()
            current["data_cards"] = [
                c for c in current.get("data_cards", []) if c.get("id") != card_id
            ]
            _write_state(current)
            return {"status": "success"}

        return {"status": "error", "message": "Unknown push route"}

    if route.startswith("pull"):
        current = _read_state()
        if route == "pull_text":
            return {"status": "success", "text": current.get("text", {})}
        if route == "pull_image":
            return {"status": "success", "image_popup": current.get("image_popup", {})}
        if route == "pull_full":
            return {"status": "success", "state": current}
        if route == "pull_data_cards":
            return {"status": "success", "data_cards": current.get("data_cards", [])}
        return {"status": "error", "message": "Unknown pull route"}

    # Previously a route with neither prefix fell through and returned None.
    return {"status": "error", "message": "Unknown route"}