63 lines
2.1 KiB
Python
63 lines
2.1 KiB
Python
import base64
import io
import json
import os
import time

from minio import Minio
|
|
|
|
# State returned by _read_state() when the state file is missing or unreadable.
DEFAULT_STATE = {"text": "", "image_url": ""}

# SECURITY: the literal credentials below are committed to source and should
# be treated as compromised -- rotate them and supply the replacements through
# TV_CONTROL_MINIO_ACCESS_KEY / TV_CONTROL_MINIO_SECRET_KEY. The literals are
# kept only as a fallback so existing deployments keep working unchanged.
MINIO_CLIENT = Minio(
    "content2.reversed.dev",
    access_key=os.environ.get("TV_CONTROL_MINIO_ACCESS_KEY", "tv-control"),
    secret_key=os.environ.get(
        "TV_CONTROL_MINIO_SECRET_KEY",
        "0gEjOlnVAECrqJx5Nd77y7Hhouc5faVf0WttjcRH",
    ),
    secure=True,
)
# Bucket that main() uploads pushed images into.
BUCKET = "tv-control"
|
|
|
|
def _read_state():
    """Load the persisted state from ``/app/state.json``.

    Returns:
        The decoded state dict. A fresh copy of ``DEFAULT_STATE`` is returned
        when the file is missing, does not contain valid JSON, or contains a
        JSON value that is not an object.
    """
    try:
        with open("/app/state.json", "r") as state_file:
            state = json.load(state_file)
    except (FileNotFoundError, json.JSONDecodeError):
        return dict(DEFAULT_STATE)
    # Callers assign keys on the result and call .get() on it, so a valid but
    # non-object payload (null, list, number) must not leak through.
    if not isinstance(state, dict):
        return dict(DEFAULT_STATE)
    return state
|
|
|
|
def _write_state(state):
    """Persist *state* as JSON to ``/app/state.json``.

    Args:
        state: JSON-serializable object to store.

    Raises:
        TypeError, ValueError: if *state* cannot be serialized. These are
            raised before the file is opened, so the previous contents are
            left untouched.
        OSError: if the file cannot be opened or written.
    """
    # Serialize first: opening with "w" truncates the file immediately, so a
    # serialization failure after the open would wipe the stored state.
    payload = json.dumps(state)
    with open("/app/state.json", "w") as state_file:
        state_file.write(payload)
|
|
|
|
def _parse_body(raw):
    """Decode a request body into a dict; raise ValueError if that is impossible."""
    if not raw:
        return {}
    if isinstance(raw, (str, bytes, bytearray)):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        raw = json.loads(raw)
    if not isinstance(raw, dict):
        raise ValueError("request body must be a JSON object")
    return raw


def main(args):
    """Dispatch a push/pull request against the persisted state.

    Args:
        args: Mapping with a ``"route"`` string and, for push routes, an
            optional ``"body"`` holding a JSON object (as a JSON string, or
            already decoded into a dict).

    Routes:
        push_text: store ``body["text"]`` (default ``""``) in the state.
        push_image: base64-decode ``body["image_b64"]``, upload it to the
            MinIO bucket as a JPEG and store the resulting URL in the state.
        pull_text / pull_image_url: return the corresponding state field.
        pull_full: return the whole state dict.

    Returns:
        A dict whose ``"status"`` is ``"success"`` or ``"error"``. Error
        results carry a ``"message"``; malformed requests are reported this
        way instead of raising.
    """
    route = args.get("route")
    if not isinstance(route, str):
        return {"status": "error", "message": "Missing route"}

    if route.startswith("push"):
        try:
            body = _parse_body(args.get("body"))
        except ValueError:
            return {"status": "error", "message": "Invalid JSON body"}

        if route == "push_text":
            current = _read_state()
            current["text"] = body.get("text", "")
            _write_state(current)
            return {"status": "success"}

        if route == "push_image":
            try:
                # binascii.Error (bad padding) is a ValueError subclass;
                # TypeError covers non-string payloads such as null or a number.
                image_bytes = base64.b64decode(body.get("image_b64", ""))
            except (ValueError, TypeError):
                return {"status": "error", "message": "Invalid image_b64"}
            if not image_bytes:
                # Avoid uploading a 0-byte object and pointing the state at it.
                return {"status": "error", "message": "Missing image_b64"}

            # Millisecond timestamp keeps successive uploads from colliding.
            file_name = f"tv-image-{int(time.time() * 1000)}.jpg"
            MINIO_CLIENT.put_object(
                BUCKET,
                file_name,
                io.BytesIO(image_bytes),
                len(image_bytes),
                content_type="image/jpeg",
            )
            public_url = f"https://content2.reversed.dev/{BUCKET}/{file_name}"

            current = _read_state()
            current["image_url"] = public_url
            _write_state(current)
            return {"status": "success", "image_url": public_url}

        return {"status": "error", "message": "Unknown push route"}

    if route.startswith("pull"):
        if route not in ("pull_text", "pull_image_url", "pull_full"):
            # Reject before touching the state file.
            return {"status": "error", "message": "Unknown pull route"}

        current = _read_state()
        if route == "pull_text":
            return {"status": "success", "text": current.get("text", "")}
        if route == "pull_image_url":
            return {"status": "success", "image_url": current.get("image_url", "")}
        return {"status": "success", "state": current}

    # Previously fell off the end and returned None for any other route.
    return {"status": "error", "message": "Unknown route"}