154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
import base64
|
|
import os
|
|
import json
|
|
import requests
|
|
import time
|
|
import redis
|
|
|
|
REDIRECT_URL = "https://shsf-api.reversed.dev/api/exec/15/e7f4f8a2-5bef-413d-85fe-6c47a6cfc1e2/spoti"
|
|
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
|
|
|
|
# Separate Redis DB from the control function (which uses db=12)
|
|
r = redis.Redis(host='194.15.36.205', username="default", port=12004, db=13, password=os.getenv("REDIS"))
|
|
|
|
SPOTIFY_TOKENS_KEY = "data-sources:spotify-tokens"
|
|
|
|
|
|
def _save_tokens(access_token: str, refresh_token: str, expires_in: int):
|
|
payload = {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"expires_at": int(time.time()) + expires_in,
|
|
}
|
|
r.set(SPOTIFY_TOKENS_KEY, json.dumps(payload))
|
|
|
|
|
|
def _read_tokens():
|
|
raw = r.get(SPOTIFY_TOKENS_KEY)
|
|
if raw:
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _refresh_access_token(refresh_token: str):
|
|
client_id = os.getenv("SPOTIFY_CLIENT_ID")
|
|
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
|
|
auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
|
|
resp = requests.post(
|
|
SPOTIFY_TOKEN_URL,
|
|
headers={"Authorization": f"Basic {auth}", "Content-Type": "application/x-www-form-urlencoded"},
|
|
data={"grant_type": "refresh_token", "refresh_token": refresh_token},
|
|
timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
_save_tokens(
|
|
data["access_token"],
|
|
data.get("refresh_token", refresh_token), # Spotify may or may not return a new refresh token
|
|
data.get("expires_in", 3600),
|
|
)
|
|
return data["access_token"]
|
|
|
|
|
|
def _get_valid_access_token():
|
|
"""Return a valid access token, refreshing if needed. Returns None if no tokens stored."""
|
|
tokens = _read_tokens()
|
|
if not tokens:
|
|
return None
|
|
# Refresh 60s before expiry
|
|
if time.time() >= tokens["expires_at"] - 60:
|
|
return _refresh_access_token(tokens["refresh_token"])
|
|
return tokens["access_token"]
|
|
|
|
|
|
def main(args):
|
|
route = args.get("route")
|
|
|
|
if route == "test":
|
|
return {"status": "success", "text": "Text Test Success!"}
|
|
|
|
if route == "auth_spoti":
|
|
client_id = os.getenv("SPOTIFY_CLIENT_ID")
|
|
url = (
|
|
f"https://accounts.spotify.com/authorize"
|
|
f"?client_id={client_id}"
|
|
f"&response_type=code"
|
|
f"&redirect_uri={REDIRECT_URL}"
|
|
f"&scope=user-read-playback-state%20user-modify-playback-state%20user-read-currently-playing"
|
|
)
|
|
return {"_code": 302, "_location": url, "_shsf": "v2"}
|
|
|
|
if route == "spoti":
|
|
code = args.get("queries", {}).get("code")
|
|
if not code:
|
|
return {"status": "error", "message": "Missing code parameter"}
|
|
|
|
client_id = os.getenv("SPOTIFY_CLIENT_ID")
|
|
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
|
|
if not client_id or not client_secret:
|
|
return {"status": "error", "message": "Missing Spotify credentials in environment"}
|
|
|
|
# Exchange authorization code for tokens
|
|
auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
|
|
try:
|
|
resp = requests.post(
|
|
SPOTIFY_TOKEN_URL,
|
|
headers={
|
|
"Authorization": f"Basic {auth}",
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
},
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"redirect_uri": REDIRECT_URL,
|
|
},
|
|
timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
except requests.RequestException as e:
|
|
return {"status": "error", "message": f"Token exchange failed: {e}"}
|
|
|
|
data = resp.json()
|
|
if "error" in data:
|
|
return {"status": "error", "message": data.get("error_description", data["error"])}
|
|
|
|
_save_tokens(data["access_token"], data["refresh_token"], data.get("expires_in", 3600))
|
|
return {"status": "success", "message": "Spotify authenticated and tokens saved."}
|
|
|
|
if route == "get_spoti_tokens":
|
|
tokens = _read_tokens()
|
|
if not tokens:
|
|
return {"status": "error", "message": "No Spotify tokens stored"}
|
|
return {"status": "success", "tokens": tokens}
|
|
|
|
if route == "get_spoti_now_playing":
|
|
try:
|
|
access_token = _get_valid_access_token()
|
|
except requests.RequestException as e:
|
|
return {"status": "error", "message": f"Token refresh failed: {e}"}
|
|
|
|
if not access_token:
|
|
return {"status": "error", "message": "Not authenticated with Spotify"}
|
|
|
|
try:
|
|
resp = requests.get(
|
|
"https://api.spotify.com/v1/me/player/currently-playing",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
timeout=10,
|
|
)
|
|
except requests.RequestException as e:
|
|
return {"status": "error", "message": f"Spotify API request failed: {e}"}
|
|
|
|
if resp.status_code == 204:
|
|
return {"status": "success", "playing": False, "message": "Nothing currently playing"}
|
|
if not resp.ok:
|
|
return {"status": "error", "message": f"Spotify API error {resp.status_code}"}
|
|
|
|
return {"status": "success", "playing": True, "data": resp.json()}
|
|
|
|
return {"status": "success", "message": "Data sources endpoint is up!"}
|
|
|