team-9/backend/endpoints/spotify_api.py

134 lines
4.2 KiB
Python

import os
import urllib.parse
from email import header
import requests
from auth.session import SessionData, SessionManager
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from backend.endpoints.queue import delete_item
# Router that groups every endpoint in this module under the "/music" prefix.
music_router = APIRouter(prefix="/music")
# Spotify application credentials, read from the environment at import time.
# os.getenv returns None for an unset variable, so either may be None here.
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
# OAuth redirect target: this router's "/callback" endpoint on HOST.
# NOTE(review): when HOST is unset, os.getenv returns None and this
# concatenation raises TypeError while the module is being imported.
SPOTIFY_REDIRECT_URI = os.getenv("HOST") + "/music/callback"
# Spotify Accounts service endpoints used by the authorization-code flow.
SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
# Base URL of the Spotify Web API. Despite the name it is the prefix for all
# calls made in this module (search, player and queue), not only playback.
SPOTIFY_PLAY_URL = "https://api.spotify.com/v1/"
# Step 1: Redirect user to Spotify login
@music_router.get("/login")
def login():
    """Redirect the caller to Spotify's authorization page.

    Builds the authorization-code request from the module-level client id and
    redirect URI, asking for the playback read and modify scopes.

    Returns:
        RedirectResponse: redirect to ``SPOTIFY_AUTH_URL`` with the query
        string attached.
    """
    scope = "user-modify-playback-state user-read-playback-state"
    # Percent-encode every value: the scope list contains a space and the
    # redirect URI contains ":" and "/", none of which are safe to drop raw
    # into a query string. quote_via=quote emits "%20" rather than "+".
    query = urllib.parse.urlencode(
        {
            "response_type": "code",
            "client_id": SPOTIFY_CLIENT_ID,
            "scope": scope,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
        },
        quote_via=urllib.parse.quote,
    )
    # NOTE(review): no "state" parameter is sent, so the callback cannot tie a
    # response to the login that started it; consider adding one.
    return RedirectResponse(f"{SPOTIFY_AUTH_URL}?{query}")
# Step 2: Callback to get access token
@music_router.get("/callback")
def callback(code: str):
    """Exchange the authorization code for tokens and store them in the session.

    Args:
        code: authorization code Spotify appended to the redirect.

    Returns:
        dict: ``{"changed": bool, "access_token": str}`` on success, where
        ``changed`` tells whether the stored session differs from the one that
        was current before the call; ``{"error": ...}`` when the token request
        fails or the reply lacks any of the expected fields.
    """
    failure = {"error": "Failed to obtain access token from Spotify."}
    payload = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
        "client_id": SPOTIFY_CLIENT_ID,
        "client_secret": SPOTIFY_CLIENT_SECRET,
    }
    try:
        # A timeout keeps an unresponsive token endpoint from hanging the
        # request forever.
        response = requests.post(SPOTIFY_TOKEN_URL, data=payload, timeout=10)
        token_info = response.json()
    except (requests.RequestException, ValueError) as e:
        # Network failure or a body that is not JSON: report it the same way
        # as a reply that lacks the tokens.
        print(e)
        return failure
    if not isinstance(token_info, dict):
        return failure

    access_token = token_info.get("access_token")
    refresh_token = token_info.get("refresh_token")
    expires_in = token_info.get("expires_in")
    if any(token is None for token in [access_token, refresh_token, expires_in]):
        return failure

    # Save the tokens in the session (or a database) for later use.
    manager = SessionManager.instance()
    old_session = manager.get_current_session()
    manager.set_session(SessionData(access_token, refresh_token, expires_in))
    session = manager.get_current_session()
    # NOTE(review): the access token is echoed back in the response body;
    # confirm that callers really need it there.
    return {
        "changed": session != old_session,
        "access_token": session.access_token,
    }
@music_router.get("/search")
def search(query: str):
    """Search Spotify for tracks matching ``query``.

    Args:
        query: free-text search string supplied by the client.

    Returns:
        The decoded JSON body of Spotify's search response, or ``None`` when
        anything fails (the exception is printed).
    """
    try:
        session = SessionManager.instance().get_current_session()
        # Named "headers" so it does not shadow the module-level
        # "from email import header" import.
        headers = {
            "Authorization": "Bearer " + session.access_token,
            "Content-Type": "application/json",
        }
        # Pass the query through "params" so requests percent-encodes it; when
        # interpolated raw, characters such as "&", "=" or "#" in the user's
        # text would alter or truncate the request's parameters.
        response = requests.get(
            SPOTIFY_PLAY_URL + "search",
            params={"q": query, "type": "track"},
            headers=headers,
            timeout=10,
        )
        return response.json()
    except Exception as e:
        # Best-effort endpoint: log and answer with null instead of failing.
        print(e)
        return None
@music_router.get("/play")
def play(song_id: str):
    """Start playback of a track from its beginning.

    Args:
        song_id: Spotify track id; sent as ``spotify:track:<song_id>``.

    Returns:
        ``{"success": True, "message": ...}`` when Spotify accepts the command
        with an empty body; ``{"success": False, ...}`` with the status code
        when an empty body comes with a non-2xx status; the decoded JSON body
        when Spotify sends one (e.g. an error object); ``None`` when the call
        itself raises (the exception is printed).
    """
    try:
        session = SessionManager.instance().get_current_session()
        # Named "headers" so it does not shadow the module-level
        # "from email import header" import.
        headers = {
            "Authorization": "Bearer " + session.access_token,
            "Content-Type": "application/json",
        }
        body = {
            "uris": [f"spotify:track:{song_id}"],
            "position_ms": 0,
        }
        response = requests.put(
            SPOTIFY_PLAY_URL + "me/player/play",
            json=body,
            headers=headers,
            timeout=10,
        )
        # A successful play command is answered with no content, so calling
        # .json() on it would raise and the success would be reported as a
        # failure. Inspect the status instead when the body is empty.
        if not response.content:
            if response.ok:
                return {"success": True, "message": "Playback started"}
            return {
                "success": False,
                "message": "Failed to start playback",
                "status_code": response.status_code,
            }
        return response.json()
    except Exception as e:
        # Best-effort endpoint: log and answer with null instead of failing.
        print(e)
        return None
@music_router.get("/current-song")
def current_song():
    """Fetch the track currently playing on the user's Spotify account.

    Returns:
        The decoded JSON body of Spotify's currently-playing response, or
        ``None`` when the response has no body (nothing is playing) or when
        the call fails (the exception is printed).
    """
    try:
        session = SessionManager.instance().get_current_session()
        # Named "headers" so it does not shadow the module-level
        # "from email import header" import.
        headers = {
            "Authorization": "Bearer " + session.access_token,
            "Content-Type": "application/json",
        }
        response = requests.get(
            SPOTIFY_PLAY_URL + "me/player/currently-playing",
            # Spotify's parameter is "market"; the former "marketing" key is
            # not one the endpoint documents.
            params={"market": "IT"},
            headers=headers,
            timeout=10,
        )
        # With nothing playing Spotify replies with an empty body; return
        # None directly instead of relying on a JSON decode error.
        if response.status_code == 204 or not response.content:
            return None
        return response.json()
    except Exception as e:
        # Best-effort endpoint: log and answer with null instead of failing.
        print(e)
        return None
@music_router.get("/add_queue_track")
async def add_queue_track(song_id: str):
    """Append a track to the user's Spotify playback queue.

    Args:
        song_id: Spotify track id; sent as ``spotify:track:<song_id>``.

    Returns:
        dict: ``{"success": True, "message": ...}`` when Spotify accepts the
        request, otherwise ``{"success": False, "message": ...}`` (with
        ``"status_code"`` when Spotify answered with a non-2xx status).
    """
    # Function-scope imports, as the original did for traceback.
    import traceback

    from fastapi.concurrency import run_in_threadpool

    try:
        session = SessionManager.instance().get_current_session()
        headers = {"Authorization": "Bearer " + session.access_token}
        params = {"uri": f"spotify:track:{song_id}"}
        url = f"{SPOTIFY_PLAY_URL}me/player/queue"
        # requests is blocking; calling it directly inside this coroutine
        # would stall the event loop for the whole round trip, so hand it to
        # the worker thread pool.
        response = await run_in_threadpool(
            requests.post, url, headers=headers, params=params, timeout=10
        )
        response_code = response.status_code
        # Any 2xx means the command was accepted; the previous exact match on
        # 204 reported other success codes as failures.
        if 200 <= response_code < 300:
            return {"success": True, "message": "Track added to queue"}
        return {
            "success": False,
            "message": "Failed to add track to queue",
            "status_code": response_code,
        }
    except Exception as e:
        traceback.print_exc()
        print(e)
        # Keep the success/message contract on the error path as well rather
        # than falling through and returning null.
        return {"success": False, "message": "Failed to add track to queue"}