This commit is contained in:
Mat12143 2025-08-01 22:05:01 +02:00
commit 51aafb9771
6 changed files with 85 additions and 26 deletions

View file

@ -2,7 +2,7 @@ FROM python:3.13.5-alpine
WORKDIR /app
RUN apk update && apk add git
RUN apk update && apk add git ffmpeg
RUN git clone --depth 1 'https://github.com/yt-dlp/yt-dlp.git' /yt-dlp

View file

@ -3,7 +3,9 @@ from flask import Flask, Response, jsonify, request
from flask_cors import CORS
from .room import Room
from .song import init_db
from .song import Song, init_db
# from .song_fetch import *
dotenv.load_dotenv()
@ -12,7 +14,18 @@ app = Flask(__name__)
CORS(app)
ROOMS: dict[int, Room] = {} # { room_id: room, ... }
ROOMS: dict[int, Room] = {
1234: Room(
1234,
(0.0, 1.0),
"Test Room",
None,
{"Rock", "Metal"},
{},
[],
[Song(mbid="test", title="<title placeholder>", artist="<artist placeholder>", tags=["Metal"], image_id="img-id", youtube_id="yt-id")],
)
} # { room_id: room, ... }
def error(msg: str, status: int = 400) -> Response:
@ -21,12 +34,7 @@ def error(msg: str, status: int = 400) -> Response:
return res
@app.route("/api")
def index():
return "hello from flask"
@app.route("/api/join")
@app.get("/api/join")
def join():
room_id = request.args.get("room")
code = request.args.get("code")
@ -43,6 +51,25 @@ def join():
return {"success": True, "ws": f"/ws/{room_id}"}
@app.get("/api/queue")
def queue():
if (room_id := request.args.get("room")) is None:
return error("Missing room id")
if (room := ROOMS.get(int(room_id))) is None:
return error("Invalid room")
return {"success": True, "queue": room.playing}
@app.post("/api/queue/next")
def queue_next(): ...
@app.post("/api/queue/renew")
def queue_renew(): ...
init_db()
if __name__ == "__main__":
app.run(debug=True)

View file

@ -3,4 +3,5 @@ import sqlite3
def get_connection():
conn = sqlite3.connect(".data/jukebox.db")
conn.row_factory = sqlite3.Row
return conn

View file

@ -21,8 +21,9 @@ class Room:
pin: int | None
tags: set[str]
songs: dict[str, UserScoredSong]
history: list[Song]
songs: dict[str, UserScoredSong] # canzoni + voto
history: list[Song] # canzoni riprodotte (in ordine)
playing: list[Song] # canzoni che sono i riproduzione
def rank_song(self, song: Song, user_score: int) -> float:
rank = 0.0

View file

@ -27,3 +27,32 @@ class Song:
tags: list[str]
image_id: str
youtube_id: str
def get_song_by_mbid(mbid: str) -> Song:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM songs WHERE mbid = ?", (mbid,))
row = cursor.fetchone()
conn.close()
if row is None:
raise ValueError(f"Song with MBID {mbid} not found")
song = Song(mbid=row["mbid"], title=row["title"], artist=row["artist"], tags=row["tags"].split(","), image_id=row["lastfm_image_id"], youtube_id=row["youtube_id"])
return song
def add_song(song: Song):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
INSERT OR REPLACE INTO songs (mbid, title, artist, tags, lastfm_image_id, youtube_id)
VALUES (?, ?, ?, ?, ?, ?)
""",
(song.mbid, song.title, song.artist, ",".join(song.tags), song.image_id, song.youtube_id),
) # Updates song info if it already exists
conn.commit()
conn.close()

View file

@ -21,9 +21,7 @@ def lastfm_search(query: str) -> tuple[str, str]:
return track_info["name"], track_info["artist"]
def lastfm_getinfo(
name: str, artist: str
) -> tuple[str, str, str, str, list[str]]: # ( id, image_id, tags )
def lastfm_getinfo(name: str, artist: str) -> tuple[str, str, str, str, list[str]]: # ( id, image_id, tags )
response = requests.get(
url="https://ws.audioscrobbler.com/2.0/?method=track.getInfo&format=json",
params={
@ -44,20 +42,23 @@ def lastfm_getinfo(
)
print(yt_dlp, flush=True)
def download_song_mp3(name: str, artist: str) -> tuple[str, str] | None: # ( id, audio )
ydl_opts = {
"format": "bestaudio",
"default_search": "ytsearch1",
"outtmpl": "%(title)s.%(ext)s",
"skip_download": True,
}
# # def get_yt_mp3link(name: str, artist: str) -> str: ...
# # os.popen("/yt-dlp ")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(f"{name!r} - {artist!r}", download=False)
# # /yt-dlp/yt-dlp.sh "ytsearch1:Never gonna give you up" --get-url -f "ba"
first_entry = info["entries"][0]
# import json
video_id = first_entry["id"]
# print(json.dumps(lastfm_getinfo(*lastfm_search("money")), indent=2))
# exit(1)
for fmt in first_entry["formats"]:
if "acodec" in fmt and fmt["acodec"] != "none":
return video_id, fmt["url"]
# # def
# ## query ==> lastfm ==> list of songs ==> take first ==> request song info ==> get YT link ==> save in DB ==>
return None