# match / main.py
# Chrunos's picture
# Rename spotube_match.py to main.py
# 344c8a7 verified
import socket
import sys
import os
import base64
import time
import asyncio
import httpx
import re
import traceback
import logging
import requests
from difflib import SequenceMatcher
from urllib.parse import urlparse, parse_qs
from bs4 import BeautifulSoup
from ytmusicapi import YTMusic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
from dotenv import load_dotenv
# Load environment variables
# This runs at import time, before the os.getenv() credential lookups
# further down in this module.
load_dotenv()
# --- DNS BYPASS PATCH START ---
BYPASS_DOMAINS = [
"youtube.com", "music.youtube.com", "googlevideo.com", "youtu.be"
]
try:
import dns.resolver
_original_getaddrinfo = socket.getaddrinfo
def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if host and any(domain in host for domain in BYPASS_DOMAINS):
try:
res = dns.resolver.Resolver()
res.nameservers = ['8.8.8.8', '1.1.1.1']
answers = res.resolve(host, 'A')
ip_address = answers[0].to_text()
return [(socket.AF_INET, type, proto, '', (ip_address, port))]
except Exception:
return _original_getaddrinfo(host, port, family, type, proto, flags)
return _original_getaddrinfo(host, port, family, type, proto, flags)
socket.getaddrinfo = patched_getaddrinfo
print(f"[INIT] DNS Bypass installed for {len(BYPASS_DOMAINS)} domains.", file=sys.stderr)
except ImportError:
print("❌ CRITICAL: dnspython not installed. DNS Bypass failed.", file=sys.stderr)
# --- DNS BYPASS PATCH END ---
# Initialize FastAPI and YTMusic
# `app` is the application object the route decorators below attach to;
# `ytmusic` is one module-level client, constructed without arguments and
# shared by every function that searches YouTube Music.
app = FastAPI()
ytmusic = YTMusic()
# Configure logging
# Root logger at INFO level; `logger` is this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Pydantic Models ---
# Request body of POST /searcht.
class SearchRequest(BaseModel):
    # Search text handed to ytmusic.search() by the /searcht endpoint.
    query: str
# Request body of POST /match.
class MatchRequest(BaseModel):
    # Track URL that the /match endpoint passes to get_song_link_info().
    url: str
# Response body of POST /match.
class MatchResponse(BaseModel):
    # YouTube / YouTube Music URL of the matched track.
    url: str
    # Display name built by /match as "<title> - <artist>".
    filename: str
    # YouTube video ID of the matched track.
    track_id: str
# ==========================================
# TIDAL / QOBUZ CREDENTIALS AND TOKEN CACHE
# ==========================================
# Credentials are read once, at import time, from the process environment.
TIDAL_CLIENT_ID = os.getenv('TIDAL_CLIENT_ID')
TIDAL_CLIENT_SECRET = os.getenv('TIDAL_CLIENT_SECRET')
QOBUZ_APP_ID = os.getenv('QOBUZ_APP_ID')
QOBUZ_TOKEN = os.getenv('QOBUZ_TOKEN')

# Missing credentials are only reported here; nothing is raised at import.
if not (TIDAL_CLIENT_ID and TIDAL_CLIENT_SECRET):
    print(
        "⚠️ Warning: TIDAL_CLIENT_ID or TIDAL_CLIENT_SECRET not found in environment. Tidal search will fail.",
        file=sys.stderr,
    )
if not (QOBUZ_APP_ID and QOBUZ_TOKEN):
    print(
        "⚠️ Warning: QOBUZ_APP_ID or QOBUZ_TOKEN not found in environment. Qobuz lookup will fail.",
        file=sys.stderr,
    )

# In-memory cache for the Tidal access token and its expiry (epoch seconds).
cached_tidal_token = None
token_expiry_time = 0
def similar(a: str, b: str) -> float:
    """Return the case-insensitive SequenceMatcher ratio of *a* and *b*.

    The result is a float between 0.0 (nothing in common) and 1.0 (equal
    once both strings are lower-cased).
    """
    left, right = a.lower(), b.lower()
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
# Parenthesised annotations such as "(feat. X)" or "(Deluxe Edition)".
_PARENTHETICAL_RE = re.compile(r'\s*\([^)]*\)')
# Trailing release-type marker such as " - Single" or " - EP".
_RELEASE_SUFFIX_RE = re.compile(r'\s*-\s*(single|ep)\s*$', re.IGNORECASE)


def clean_title(title: str) -> str:
    """Normalise a track/album title for fuzzy comparison.

    Removes every parenthesised annotation and then a trailing
    "- Single" / "- EP" marker (case-insensitive).

    Parentheticals are stripped first so that a marker followed by an
    annotation, e.g. "Song - Single (Deluxe)", is still recognised as a
    suffix. If cleaning would leave nothing (e.g. "(Intro)"), the
    whitespace-stripped original is returned so callers never compare
    against an empty string.

    Args:
        title: Raw title text.

    Returns:
        The cleaned title, or the stripped original when cleaning empties it.
    """
    stripped = title.strip()
    cleaned = _PARENTHETICAL_RE.sub('', stripped)
    cleaned = _RELEASE_SUFFIX_RE.sub('', cleaned).strip()
    return cleaned or stripped
async def get_tidal_access_token():
    """Return a Tidal client-credentials access token, or None on failure.

    A token cached in the module globals is reused until 60 seconds before
    its recorded expiry. Otherwise a new one is requested from the Tidal
    OAuth2 token endpoint using HTTP Basic auth built from TIDAL_CLIENT_ID
    and TIDAL_CLIENT_SECRET.

    Returns:
        The access token string, or None when credentials are missing, the
        endpoint does not answer 200, or the request raises.
    """
    global cached_tidal_token, token_expiry_time
    if not TIDAL_CLIENT_ID or not TIDAL_CLIENT_SECRET:
        return None
    # Refresh one minute early so a token never expires mid-request.
    if cached_tidal_token and time.time() < (token_expiry_time - 60):
        return cached_tidal_token
    try:
        b64_creds = base64.b64encode(f"{TIDAL_CLIENT_ID}:{TIDAL_CLIENT_SECRET}".encode()).decode()
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://auth.tidal.com/v1/oauth2/token",
                headers={"Authorization": f"Basic {b64_creds}"},
                data={"grant_type": "client_credentials"}
            )
        if response.status_code != 200:
            # Previously this case was dropped silently, which made bad
            # credentials indistinguishable from "no Tidal match".
            logger.error("Tidal Auth Error: token endpoint returned HTTP %s", response.status_code)
            return None
        data = response.json()
        cached_tidal_token = data.get("access_token")
        token_expiry_time = time.time() + data.get("expires_in", 3600)
        return cached_tidal_token
    except Exception as e:
        # Best effort: Tidal is optional, so report and carry on without it.
        logger.error(f"Tidal Auth Error: {e}")
    return None
def search_youtube_sync(search_query: str, target_title: str, target_artist: str):
    """Search YouTube Music and return the best and second-best video IDs.

    The first five "songs" results are scored against the target: 60 %
    title similarity (on cleaned titles) plus 40 % artist similarity, with
    a 0.2 bonus when the title similarity exceeds 0.8.

    Results without a usable ``videoId`` are skipped, and a missing or null
    ``title`` / ``artists`` field is treated as empty, so that one malformed
    entry no longer discards the whole search.

    Args:
        search_query: Text sent to the YouTube Music search.
        target_title: Title the results are compared against.
        target_artist: Artist name the results are compared against.

    Returns:
        ``(best_id, alt_id)``; either element is None when unavailable.
        ``(None, None)`` is returned when the search raises.
    """
    try:
        yt_results = ytmusic.search(search_query, filter="songs")
        if not yt_results:
            return None, None
        scored_results = []
        for result in yt_results[:5]:
            video_id = result.get('videoId')
            if not video_id:
                continue  # nothing to link to
            yt_title = result.get('title') or ''
            yt_artists = " ".join((a.get('name') or '') for a in (result.get('artists') or []))
            title_score = similar(clean_title(target_title), clean_title(yt_title))
            artist_score = similar(target_artist, yt_artists)
            total_score = (title_score * 0.6) + (artist_score * 0.4)
            if title_score > 0.8:
                total_score += 0.2
            scored_results.append((total_score, video_id))
        # Stable sort: equally scored results keep their search order.
        scored_results.sort(key=lambda x: x[0], reverse=True)
        best_id = scored_results[0][1] if scored_results else None
        alt_id = scored_results[1][1] if len(scored_results) > 1 else None
        return best_id, alt_id
    except Exception as e:
        print(f"YouTube search error: {e}")
        return None, None
async def search_tidal_async(search_query: str, target_title: str, target_artist: str):
    """Look up the best-matching Tidal track and return its browse URL.

    Up to five tracks from the Tidal search API are scored (60 % title,
    40 % artist, +0.2 when the title similarity is above 0.8). Returns None
    when no token is available, the request fails, nothing is found, or
    the best score is below 0.4.
    """
    token = await get_tidal_access_token()
    if not token:
        return None
    async with httpx.AsyncClient() as http:
        try:
            reply = await http.get(
                "https://api.tidal.com/v1/search",
                headers={"Authorization": f"Bearer {token}"},
                params={"query": search_query, "types": "TRACKS", "countryCode": "US", "limit": 5},
            )
            if reply.status_code != 200:
                return None
            payload = reply.json()
            if "tracks" not in payload or not payload["tracks"]["items"]:
                return None
            top_id, top_score = None, 0.0
            for item in payload["tracks"]["items"]:
                item_title = item.get("title", "")
                item_artists = " ".join([p.get("name", "") for p in item.get("artists", [])])
                title_ratio = similar(clean_title(target_title), clean_title(item_title))
                artist_ratio = similar(target_artist, item_artists)
                score = (title_ratio * 0.6) + (artist_ratio * 0.4)
                if title_ratio > 0.8:
                    score += 0.2
                # Strictly greater: the earliest of equally scored tracks wins.
                if score > top_score:
                    top_score = score
                    top_id = item["id"]
            if top_score < 0.4:
                return None
            if top_id:
                return f"https://tidal.com/browse/track/{top_id}"
        except Exception as e:
            print(f"Tidal search error: {e}")
    return None
# --- Match Helper Functions ---
def extract_amazon_track_id(url: str) -> Optional[str]:
    """Extract the track ID (ASIN) from an Amazon Music URL.

    The ``trackAsin`` query parameter takes precedence; otherwise the path
    segment directly after ``tracks`` is used.

    Args:
        url: Candidate URL.

    Returns:
        The track ID, or None when the URL does not contain
        "music.amazon.com" or carries no non-empty track ID (for example
        a path that ends in ``/tracks/``).
    """
    if "music.amazon.com" not in url:
        return None
    parsed_url = urlparse(url)
    asin_values = parse_qs(parsed_url.query).get("trackAsin")
    if asin_values:
        return asin_values[0]
    path_parts = parsed_url.path.split('/')
    if "tracks" in path_parts:
        idx = path_parts.index("tracks") + 1
        # A trailing slash yields an empty segment; treat that as "no ID".
        if idx < len(path_parts) and path_parts[idx]:
            return path_parts[idx]
    return None
def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
    """Query the Song.link links API for *url* and return the decoded JSON.

    Amazon Music URLs with an extractable track ID are looked up by
    platform/id/type; every other URL is passed through as ``url``.
    Any request or decoding error is logged and None is returned.
    """
    endpoint = "https://api.song.link/v1-alpha.1/links"
    amazon_id = extract_amazon_track_id(url) if "music.amazon.com" in url else None

    query = {"userCountry": "US"}
    if amazon_id:
        query.update(platform="amazonMusic", id=amazon_id, type="song")
    else:
        query["url"] = url

    try:
        reply = requests.get(endpoint, params=query, timeout=10)
        reply.raise_for_status()
        return reply.json()
    except Exception as e:
        logger.error(f"Error fetching from Song.link API: {e}")
        return None
def extract_url_from_songlink(links_by_platform: dict, platform: str) -> Optional[str]:
    """Return the non-empty ``url`` stored for *platform*, else None."""
    if platform not in links_by_platform:
        return None
    link = links_by_platform[platform].get("url")
    return link or None
# --- Endpoints ---
@app.get("/")
async def root():
    # Liveness/landing endpoint: a static message naming the available routes.
    banner = "Combined Music API is running. Use /convert, /match, or /searcht."
    return {"message": banner}
@app.post("/searcht")
async def searcht(request: SearchRequest):
    """Return the first YouTube Music song result that has a video ID.

    The search call is synchronous, so it is run in a worker thread to
    keep the event loop free for other requests.

    Returns:
        The first result dict with a truthy ``videoId``, or ``{}`` when the
        search yields none.
    """
    logger.info(f"search query: {request.query}")
    search_results = await asyncio.to_thread(ytmusic.search, request.query, filter="songs")
    return next((song for song in (search_results or []) if song.get('videoId')), {})
def _youtube_video_id(youtube_url: str) -> Optional[str]:
    """Return the video ID contained in a YouTube URL, or None if absent."""
    from_query = parse_qs(urlparse(youtube_url).query).get("v")
    if from_query:
        return from_query[0]
    if "youtu.be/" in youtube_url:
        short_id = youtube_url.split("youtu.be/")[1].split("?")[0].strip("/")
        return short_id or None
    return None


@app.post("/match", response_model=MatchResponse)
async def match(request: MatchRequest):
    """Resolve a track URL to a YouTube / YouTube Music match.

    Title and artist are taken from the Song.link response. If Song.link
    supplies a YouTube link with a parseable video ID, that link is
    returned; otherwise YouTube Music is searched for "<title> <artist>"
    and the first result with a video ID is returned.

    Both Song.link and YouTube Music are queried through synchronous
    clients, so those calls run in worker threads instead of blocking the
    event loop.

    Raises:
        HTTPException: 404 when Song.link returns nothing, title/artist
            cannot be determined, or no YouTube Music result is found.
    """
    track_url = request.url
    logger.info(f"Match endpoint: Processing URL: {track_url}")
    track_info = await asyncio.to_thread(get_song_link_info, track_url)
    if not track_info:
        raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")

    entities = track_info.get("entitiesByUniqueId", {})
    entity_unique_id = track_info.get("entityUniqueId")
    title, artist = None, None
    if entity_unique_id and entity_unique_id in entities:
        ent = entities[entity_unique_id]
        title, artist = ent.get("title"), ent.get("artistName")
    else:
        # No primary entity: take the first one that carries both fields.
        for edata in entities.values():
            if edata.get("title") and edata.get("artistName"):
                title, artist = edata.get("title"), edata.get("artistName")
                break
    if not title or not artist:
        raise HTTPException(status_code=404, detail="Could not determine title and artist.")

    youtube_url = extract_url_from_songlink(track_info.get("linksByPlatform", {}), "youtube")
    if youtube_url:
        video_id = _youtube_video_id(youtube_url)
        if video_id:
            return MatchResponse(url=youtube_url, filename=f"{title} - {artist}", track_id=video_id)
        # MatchResponse.track_id must be a string; without an ID, fall back
        # to the search below rather than failing response validation.
        logger.warning(f"Match endpoint: no video id in Song.link YouTube URL: {youtube_url}")

    search_query = f'{title} {artist}'
    search_results = await asyncio.to_thread(ytmusic.search, search_query, filter="songs")
    first = next((song for song in (search_results or []) if song.get('videoId')), None)
    if first:
        v_id = first["videoId"]
        ym_url = f'https://music.youtube.com/watch?v={v_id}'
        a_name = first['artists'][0]['name'] if first.get('artists') else artist
        return MatchResponse(filename=f"{first.get('title', title)} - {a_name}", url=ym_url, track_id=v_id)
    raise HTTPException(status_code=404, detail="No matching video found on YouTube Music.")
def _is_fetchable_spotify_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL on a host this service may fetch."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False
    host = (parsed.hostname or "").lower()
    if host == "open.spotify.com":
        return True
    # NOTE(review): the original branch also accepted URLs containing
    # "http://googleusercontent.com/spotify.com"; kept for compatibility,
    # but only when that really is the URL's host and path prefix.
    return host == "googleusercontent.com" and parsed.path.startswith("/spotify.com")


async def _apple_music_metadata(url: str):
    """Resolve an Apple Music URL to (title, artist) via the iTunes lookup API."""
    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Apple Music URL")
    country = path_parts[0] if len(path_parts[0]) == 2 else "us"
    # Prefer the ?i=<track id> parameter, else the last path segment.
    lookup_id = parse_qs(parsed_url.query).get('i', [path_parts[-1]])[0]
    async with httpx.AsyncClient() as client:
        # params= lets httpx encode the user-derived values instead of
        # splicing them into the query string.
        response = await client.get(
            "https://itunes.apple.com/lookup",
            params={"id": lookup_id, "country": country},
        )
    payload = response.json() if response.status_code == 200 else {}
    if payload.get("resultCount", 0) <= 0:
        raise HTTPException(status_code=404, detail="Track not found on Apple Music")
    result = payload["results"][0]
    return result.get("trackName", result.get("collectionName", "")), result.get("artistName", "")


async def _deezer_metadata(url: str):
    """Resolve a Deezer track URL to (title, artist) via the Deezer API."""
    match_obj = re.search(r'track/(\d+)', url)
    if not match_obj:
        raise HTTPException(status_code=400, detail="Invalid Deezer URL")
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.deezer.com/track/{match_obj.group(1)}")
    data = response.json()
    if "error" in data:
        raise HTTPException(status_code=404, detail="Track not found on Deezer")
    return data.get("title", ""), data.get("artist", {}).get("name", "")


async def _spotify_metadata(url: str):
    """Read (title, artist) from the Open Graph tags of a Spotify page."""
    # The URL itself is fetched server-side, so refuse anything that only
    # mentions a Spotify host somewhere in its text.
    if not _is_fetchable_spotify_url(url):
        raise HTTPException(status_code=400, detail="Invalid Spotify URL")
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(url, headers=headers)
    if response.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to fetch Spotify page")
    soup = BeautifulSoup(response.text, 'html.parser')
    og_title = soup.find("meta", property="og:title")
    og_desc = soup.find("meta", property="og:description")
    if not og_title or not og_desc:
        raise HTTPException(status_code=404, detail="Could not extract Spotify metadata")
    track_title = og_title.get("content", "")
    # The artist is the text before the first "·" of the description.
    artist_raw = og_desc.get("content", "").split("·")[0]
    artist_clean = artist_raw.replace("Listen to ", "").replace(" on Spotify.", "").strip()
    return track_title, artist_clean


async def _qobuz_metadata(url: str):
    """Resolve a Qobuz track or album URL to (title, artist) via the Qobuz API."""
    if not QOBUZ_APP_ID or not QOBUZ_TOKEN:
        raise HTTPException(status_code=500, detail="Qobuz API credentials not configured.")
    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Qobuz URL")
    q_id, is_track = path_parts[-1], "track/" in parsed_url.path
    api_url = f"https://www.qobuz.com/api.json/0.2/{'track/get' if is_track else 'album/get'}"
    headers = {"X-App-Id": QOBUZ_APP_ID, "X-User-Auth-Token": QOBUZ_TOKEN, "User-Agent": "Mozilla/5.0"}
    async with httpx.AsyncClient(timeout=10.0, trust_env=False) as client:
        response = await client.get(api_url, params={"track_id" if is_track else "album_id": q_id}, headers=headers)
    data = response.json()
    track_title = data.get("title", "")
    # "album" may be present but null; guard before reading its artist.
    artist_data = data.get("artist") or data.get("performer") or (data.get("album") or {}).get("artist")
    if isinstance(artist_data, dict):
        artist_clean = artist_data.get("name", "")
    elif isinstance(artist_data, str):
        artist_clean = artist_data
    else:
        artist_clean = ""
    if not track_title:
        raise HTTPException(status_code=404, detail="Qobuz track not found")
    return track_title, artist_clean


@app.get("/convert")
async def convert_url_to_all(url: str):
    """Convert a streaming-service URL into YouTube and Tidal links.

    Apple Music, Deezer, Spotify and Qobuz URLs are supported. The title
    and artist are fetched from the source service, then YouTube Music and
    Tidal are searched concurrently for the best match.

    Args:
        url: Track (or, for Apple Music / Qobuz, album) URL.

    Returns:
        A dict with the resolved title, artist, search query, source URL
        and the YouTube Music / YouTube / alternative YouTube / Tidal URLs
        (each None when no match was found).

    Raises:
        HTTPException: 400 for unsupported or malformed URLs, 404 when the
            source service has no such item, 500 when Qobuz credentials
            are not configured.
    """
    if "music.apple.com" in url:
        track_title, artist_clean = await _apple_music_metadata(url)
    elif "deezer.com" in url:
        track_title, artist_clean = await _deezer_metadata(url)
    elif "http://googleusercontent.com/spotify.com" in url or "open.spotify.com" in url:
        track_title, artist_clean = await _spotify_metadata(url)
    elif "qobuz.com" in url:
        track_title, artist_clean = await _qobuz_metadata(url)
    else:
        # Previously an unrecognised URL fell through with an empty title.
        raise HTTPException(status_code=400, detail="Unsupported music URL")

    if not track_title:
        raise HTTPException(status_code=404, detail="Could not determine track title")

    clean_search_title = clean_title(track_title)
    search_query = f"{clean_search_title} {artist_clean}".strip()
    # The YouTube Music client is synchronous, so it runs in a thread while
    # the Tidal lookup proceeds on the event loop.
    yt_task = asyncio.to_thread(search_youtube_sync, search_query, track_title, artist_clean)
    tidal_task = search_tidal_async(search_query, track_title, artist_clean)
    (best_yt_id, alt_yt_id), tidal_url = await asyncio.gather(yt_task, tidal_task)
    return {
        "track_title": track_title, "artist": artist_clean, "search_query_used": search_query,
        "source_url": url, "youtube_music_url": f"https://music.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "youtube_video_url": f"https://www.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "alternative_youtube_url": f"https://www.youtube.com/watch?v={alt_yt_id}" if alt_yt_id else None,
        "tidal_url": tidal_url
    }