"""Flask HTTP routes and Socket.IO handlers for rooms, song queues and voting."""

import uuid
from dataclasses import asdict
from typing import Optional

import dotenv
from flask import Flask, Response, jsonify, request
from flask_cors import CORS
from flask_socketio import SocketIO, join_room, leave_room

from .state import State
from .connect import get_connection
from .room import Room
from .song import Song, init_db, get_song_by_title_artist, add_song_in_db, get_song_by_uuid
from .song_fetch import query_search, yt_get_audio_url, yt_search_song
from .qrcode_gen import generate_qr
from .gps import is_within_range

dotenv.load_dotenv()

app = Flask(__name__)
# NOTE(review): placeholder secret committed in source; consider loading it
# from the environment (dotenv is already loaded above).
app.config["SECRET_KEY"] = "your_secret_key"
socketio = SocketIO(app, cors_allowed_origins="*", path="/ws")
CORS(app)

db_conn = get_connection()
state = State(app, socketio, db_conn.cursor())
init_db(state.db)

# Pre-populated room with a fixed id, registered at import time.
state.rooms[1000] = Room(
    id=1000,
    coord=(1.0, 5.5),
    name="Test Room",
    pin=None,
    tags=set(),
    range_size=100,
    songs={},
    history=[],
    playing=[],
    playing_idx=0,
)


def error(msg: str, status: int = 400) -> Response:
    """Build a JSON failure response ``{"success": False, "error": msg}``.

    Args:
        msg: Human-readable error message placed under the ``error`` key.
        status: HTTP status code of the response (defaults to 400).

    Returns:
        The Flask response carrying the JSON payload and status code.
    """
    res = jsonify({"success": False, "error": msg})
    res.status_code = status
    return res


def _parse_int(raw) -> Optional[int]:
    """Return ``int(raw)``, or ``None`` when *raw* is missing or not an integer."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def _room_from_request(invalid_msg: str = "Invalid room"):
    """Resolve the ``room`` query parameter to a registered room.

    Args:
        invalid_msg: Error message used when the id is malformed or unknown.

    Returns:
        ``(room, None)`` on success, otherwise ``(None, error_response)``.
    """
    raw_id = request.args.get("room")
    if raw_id is None:
        return None, error("Missing room id")
    room_id = _parse_int(raw_id)
    found = state.rooms.get(room_id) if room_id is not None else None
    if found is None:
        return None, error(invalid_msg)
    return found, None


def _pin_matches(pin, code: Optional[str]) -> bool:
    """Tell whether the user-supplied *code* matches the room *pin*.

    Query parameters are always strings while ``room_new`` stores the pin as
    an ``int``, so a plain ``!=`` between the two can never succeed.
    """
    if code is None:
        return False
    if str(pin) == code:
        return True
    numeric = _parse_int(code)
    return numeric is not None and numeric == pin


@socketio.on("connect")
def handle_connection():
    """Log that a Socket.IO client connected."""
    print("somebody connected to socket.io", flush=True)


@socketio.on("disconnect")
def handle_disconnection():
    """Log that a Socket.IO client disconnected."""
    print("somebody disconnected from socket.io", flush=True)


@socketio.on("join_room")
def on_join(data):
    """Subscribe the calling socket to the room named by ``data["id"]``."""
    room = data["id"]
    join_room(room)
    print(f"somebody joined {room=}", flush=True)


@socketio.on("leave_room")
def on_leave(data):
    """Unsubscribe the calling socket from the room named by ``data["id"]``."""
    room = data["id"]
    leave_room(room)
    print(f"somebody left {room=}", flush=True)


@app.get("/api/join")
def join():
    """Check that the caller may join a room.

    Query parameters: ``room`` (required), ``code`` (pin, required when the
    room has one) and ``location`` (forwarded as-is to ``is_within_range``).

    Returns:
        ``{"success": True, "ws": "/ws/<room id>"}`` or a JSON error.
    """
    room, failure = _room_from_request()
    if failure is not None:
        return failure

    code = request.args.get("code")
    user_location = request.args.get("location")

    if room.pin is not None and not _pin_matches(room.pin, code):
        return error("Invalid code")

    if is_within_range(user_location, room.coord, room.range_size) is False:
        return error("You are not within the room range")

    return {"success": True, "ws": f"/ws/{room.id}"}


@app.get("/api/queue")
def queue():
    """Return the room's current play queue and the index being played."""
    room, failure = _room_from_request()
    if failure is not None:
        return failure

    return {
        "success": True,
        "queue": [asdict(s) for s in room.playing],
        "index": room.playing_idx,
    }


@app.post("/api/queue/next")
def queue_next():
    """Advance the room's queue by one song and notify the room's sockets.

    When the index runs past the end of the queue, ``room.renew_queue()`` is
    called and ``ended`` is reported as ``True``.
    """
    room, failure = _room_from_request()
    if failure is not None:
        return failure

    room.playing_idx += 1

    ended = room.playing_idx >= len(room.playing)
    if ended:
        # queue ended
        room.renew_queue()

    data = {
        "success": True,
        "ended": ended,
        "index": room.playing_idx,
        "queue": [asdict(s) for s in room.playing],
    }
    state.socketio.emit("queue_update", data, to=str(room.id))

    return data


@app.post("/api/room/new")
def room_new():
    """Create a room from query parameters and register it in ``state.rooms``.

    Query parameters: ``name``, ``coords`` (``"lat,lon"``) and ``range`` are
    required; ``pin`` (integer) and ``tags`` (comma separated) are optional.

    Returns:
        ``{"success": True, "room_id": <id>}`` or a JSON error describing the
        missing or malformed parameter.
    """
    if (room_name := request.args.get("name")) is None:
        return error("Missing room name")

    if (room_cords := request.args.get("coords")) is None:
        return error("Missing room coords")

    if (room_range := request.args.get("range")) is None:
        return error("Missing room range")

    if (range_size := _parse_int(room_range)) is None:
        return error("Invalid room range")

    # An absent or empty pin means the room is public.
    if raw_pin := request.args.get("pin"):
        if (room_pin := _parse_int(raw_pin)) is None:
            return error("Invalid room pin")
    else:
        room_pin = None

    try:
        lat, lon = (float(part) for part in room_cords.split(","))
    except ValueError:
        # Wrong number of components or a non-numeric component.
        return error("Invalid room coords")

    room = Room(
        id=max(state.rooms, default=0) + 1,
        coord=(lat, lon),
        range_size=range_size,
        name=room_name,
        pin=room_pin,
        tags={tag for tag in request.args.get("tags", "").split(",") if tag},
        songs={},
        history=[],
        playing=[],
        playing_idx=-1,
    )

    state.rooms[room.id] = room

    return {"success": True, "room_id": room.id}


@app.get("/api/room")
def room():
    """List every registered room with its public attributes."""
    return [
        {
            "id": room.id,
            "name": room.name,
            "private": room.pin is not None,
            "coords": room.coord,
            "range": room.range_size,
        }
        for room in state.rooms.values()
    ]


@app.post("/api/addsong")
def add_song():
    """Search for a song and add it to the room's suggestions.

    The ``query`` parameter is resolved through ``query_search``; when the
    resulting title/artist is not already stored, a YouTube id is looked up
    and the song is saved. Songs new to the room start with one vote and are
    announced to the room with a ``new_song`` event.
    """
    room, failure = _room_from_request()
    if failure is not None:
        return failure

    if (query := request.args.get("query")) is None:
        return error("Missing query")

    if (info := query_search(query)) is None:
        return error("Search failed")

    if (song := get_song_by_title_artist(info.title, info.artist)) is None:
        # song not found, download from YT
        yt_video_id = yt_search_song(info.title, info.artist)

        # add in DB
        song = Song(
            uuid=str(uuid.uuid4()),
            title=info.title,
            artist=info.artist,
            tags=info.tags,
            image_id=info.img_id,
            youtube_id=yt_video_id,
        )
        add_song_in_db(song)

    # add the song in the room if it does not exist yet
    if song.uuid not in room.songs:
        room.songs[song.uuid] = (song, 1)  # start with one vote
        socketio.emit("new_song", {"song": asdict(song) | {"upvote": 1}}, to=str(room.id))

    return {"success": True, "song": asdict(song)}


@app.get("/api/room/suggestions")
def get_room_suggestion():
    """Return every suggested song of the room together with its vote count."""
    room, failure = _room_from_request("Invalid room id")
    if failure is not None:
        return failure

    return {
        "success": True,
        "songs": [asdict(song) | {"upvote": score} for song, score in room.songs.values()],
    }


@app.post("/api/song/voting")
def post_song_vote():
    """Apply a vote ``increment`` to a suggested song and notify the room.

    Query parameters: ``room``, ``song`` (song uuid) and ``increment``
    (integer added to the current score).
    """
    room, failure = _room_from_request("Invalid room id")
    if failure is not None:
        return failure

    if (song_id := request.args.get("song")) is None:
        return error("Missing song id")

    if (song_info := room.songs.get(song_id)) is None:
        return error("Invalid song id")

    if (raw_increment := request.args.get("increment")) is None:
        return error("Missing increment")

    if (increment := _parse_int(raw_increment)) is None:
        return error("Invalid increment")

    # update the song
    song, score = song_info
    new_score = score + increment
    room.songs[song_id] = (song, new_score)

    # Publish the updated score, and only to the sockets of this room.
    socketio.emit("new_vote", {"song": asdict(song) | {"upvote": new_score}}, to=str(room.id))

    return {"success": True}


@app.get("/api/room/qrcode")
def room_qrcode():
    """Return a QR code image for joining a room (optionally embedding its pin)."""
    if (raw_room_id := request.args.get("room")) is None:
        return error("Missing room id")

    if (room_id := _parse_int(raw_room_id)) is None:
        return error("Invalid room id")

    pin = request.args.get("pin")
    if pin is not None:
        if (pin := _parse_int(pin)) is None:
            return error("Invalid pin")

    stream = generate_qr(
        base_uri="https://chillbox.leoinvents.com",
        room_id=room_id,
        pin=pin,
    )

    return Response(stream, content_type="image/jpeg")


@app.get("/api/song/audio")
def get_audio_url():
    """Return the audio URL of a stored song identified by its uuid."""
    if (song_uuid := request.args.get("song")) is None:
        return error("Missing song id")

    if (song := get_song_by_uuid(song_uuid)) is None:
        return error("Song not found")

    if (url := yt_get_audio_url(song.youtube_id)) is None:
        return error("Cannot get audio url")

    return {"success": True, "url": url}


if __name__ == "__main__":
    socketio.run(app, debug=True)