| import socket |
| import sys |
| import os |
| import base64 |
| import time |
| import asyncio |
| import httpx |
| import re |
| import traceback |
| import logging |
| import requests |
| from difflib import SequenceMatcher |
| from urllib.parse import urlparse, parse_qs |
| from bs4 import BeautifulSoup |
| from ytmusicapi import YTMusic |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| from typing import Optional, Dict, Any |
| from dotenv import load_dotenv |
|
|
| |
# Load settings from a .env file (python-dotenv) so the os.getenv() lookups
# for the Tidal and Qobuz credentials further down can see them.
load_dotenv()
|
|
| |
# Hosts whose DNS lookups are answered by the public resolvers below instead
# of the system resolver.  Subdomains of every entry are covered as well.
BYPASS_DOMAINS = [
    "youtube.com", "music.youtube.com", "googlevideo.com", "youtu.be"
]

# Public DNS servers queried for bypassed hosts.
BYPASS_NAMESERVERS = ['8.8.8.8', '1.1.1.1']


def is_bypass_host(host) -> bool:
    """Return True if *host* is a bypass domain or a subdomain of one.

    Matching is done on whole DNS labels, so ``notyoutube.com`` and
    ``youtube.com.evil.example`` are NOT treated as bypass hosts.  ``host``
    may be ``str``, ``bytes`` or ``None`` (everything ``socket.getaddrinfo``
    accepts); anything that is not a usable host name yields False.
    """
    if isinstance(host, (bytes, bytearray)):
        try:
            host = host.decode("ascii")
        except UnicodeDecodeError:
            return False
    if not isinstance(host, str) or not host:
        return False
    name = host.rstrip(".").lower()
    return any(
        name == domain or name.endswith("." + domain)
        for domain in BYPASS_DOMAINS
    )


try:
    import dns.resolver
    _original_getaddrinfo = socket.getaddrinfo

    def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Drop-in replacement for ``socket.getaddrinfo`` with a DNS bypass.

        Hosts accepted by ``is_bypass_host`` are resolved through
        ``BYPASS_NAMESERVERS``; every other lookup, and any bypass lookup
        that fails, is delegated to the original ``socket.getaddrinfo``.
        The parameter names mirror the stdlib signature so keyword callers
        keep working.
        """
        # Only A (IPv4) records are queried, so requests that explicitly ask
        # for another address family must go through the normal resolver.
        wants_ipv4 = family in (socket.AF_UNSPEC, socket.AF_INET)
        if not wants_ipv4 or not is_bypass_host(host):
            return _original_getaddrinfo(host, port, family, type, proto, flags)

        try:
            name = host.decode("ascii") if isinstance(host, (bytes, bytearray)) else host
            # configure=False: do not depend on the system resolver config,
            # which is exactly what this bypass is meant to avoid.
            res = dns.resolver.Resolver(configure=False)
            res.nameservers = list(BYPASS_NAMESERVERS)
            answers = res.resolve(name, 'A')

            results = []
            for record in answers:
                # Let the real getaddrinfo build the result tuples from the
                # literal IP: it resolves service names / None ports and
                # fills in socktype and proto correctly.  AI_NUMERICHOST
                # guarantees no further DNS lookup happens here.
                results.extend(_original_getaddrinfo(
                    record.to_text(), port, socket.AF_INET, type, proto,
                    flags | socket.AI_NUMERICHOST,
                ))
            if results:
                return results
        except Exception:
            # Deliberate best effort: any bypass failure falls back to the
            # system resolver below instead of breaking the connection.
            pass
        return _original_getaddrinfo(host, port, family, type, proto, flags)

    socket.getaddrinfo = patched_getaddrinfo
    print(f"[INIT] DNS Bypass installed for {len(BYPASS_DOMAINS)} domains.", file=sys.stderr)
except ImportError:
    print("❌ CRITICAL: dnspython not installed. DNS Bypass failed.", file=sys.stderr)
| |
|
|
| |
# Application object that the route decorators in this module attach to.
app = FastAPI()

# Shared YouTube Music client, created with default arguments and reused by
# the search helpers and endpoints.
ytmusic = YTMusic()
|
|
| |
# Root logging at INFO, plus a module-level logger for this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
| |
|
|
# Request body for POST /searcht.
# (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class SearchRequest(BaseModel):
    # Free-text query forwarded to the YouTube Music song search.
    query: str
|
|
# Request body for POST /match.
# (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class MatchRequest(BaseModel):
    # Track URL that is handed to the Song.link lookup.
    url: str
|
|
# Response body for POST /match.
# (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class MatchResponse(BaseModel):
    # YouTube / YouTube Music URL of the matched track.
    url: str
    # Display name built by the endpoint as "<title> - <artist>".
    filename: str
    # YouTube video id of the matched track.
    track_id: str
|
|
| |
| |
| |
# Third-party API credentials, read from the environment once at import time.
TIDAL_CLIENT_ID = os.environ.get('TIDAL_CLIENT_ID')
TIDAL_CLIENT_SECRET = os.environ.get('TIDAL_CLIENT_SECRET')
QOBUZ_APP_ID = os.environ.get('QOBUZ_APP_ID')
QOBUZ_TOKEN = os.environ.get('QOBUZ_TOKEN')

# Missing credentials are reported on stderr but do not stop the import.
if not (TIDAL_CLIENT_ID and TIDAL_CLIENT_SECRET):
    print("⚠️ Warning: TIDAL_CLIENT_ID or TIDAL_CLIENT_SECRET not found in environment. Tidal search will fail.", file=sys.stderr)
if not (QOBUZ_APP_ID and QOBUZ_TOKEN):
    print("⚠️ Warning: QOBUZ_APP_ID or QOBUZ_TOKEN not found in environment. Qobuz lookup will fail.", file=sys.stderr)

# Module-level cache for the Tidal OAuth token and its expiry time
# (seconds since the epoch); both are rewritten by get_tidal_access_token().
cached_tidal_token = None
token_expiry_time = 0
|
|
def similar(a: str, b: str) -> float:
    """Return the case-insensitive similarity of *a* and *b* in [0.0, 1.0].

    The value is ``difflib.SequenceMatcher.ratio()`` computed on the
    lower-cased strings; 1.0 means the two strings are identical.
    """
    left = a.lower()
    right = b.lower()
    matcher = SequenceMatcher(isjunk=None, a=left, b=right)
    return matcher.ratio()
|
|
# Compiled once at import time; both are used on every title comparison.
_PARENTHETICAL_RE = re.compile(r'\s*\([^)]*\)')
_RELEASE_SUFFIX_RE = re.compile(r'(?i)\s*-\s*(single|ep)$')


def clean_title(title: str) -> str:
    """Strip decorations from a track title before searching or comparing.

    Removes every parenthesised segment (``(feat. X)``, ``(Remastered)``)
    and a trailing ``- Single`` / ``- EP`` release marker, then trims
    surrounding whitespace.

    Parentheticals are removed first so that a marker followed by one
    (``"Song - Single (Deluxe)"``) is still recognised as trailing.  If
    cleaning would leave nothing (e.g. ``"(Intro)"``), the original title is
    returned trimmed, so callers never search for an empty string.
    """
    cleaned = _PARENTHETICAL_RE.sub('', title).strip()
    cleaned = _RELEASE_SUFFIX_RE.sub('', cleaned).strip()
    return cleaned or title.strip()
|
|
async def get_tidal_access_token():
    """Return a Tidal OAuth access token, or None when one cannot be obtained.

    Uses the client-credentials grant with TIDAL_CLIENT_ID and
    TIDAL_CLIENT_SECRET.  The token is cached in the module globals
    ``cached_tidal_token`` / ``token_expiry_time`` and reused until 60
    seconds before it expires.

    Returns None (and logs why) when credentials are missing, the auth
    endpoint answers with a non-200 status, the response carries no
    ``access_token``, or the request raises.
    """
    global cached_tidal_token, token_expiry_time
    if not TIDAL_CLIENT_ID or not TIDAL_CLIENT_SECRET:
        return None

    # Reuse the cached token while it has more than a minute left.
    if cached_tidal_token and time.time() < (token_expiry_time - 60):
        return cached_tidal_token

    try:
        b64_creds = base64.b64encode(f"{TIDAL_CLIENT_ID}:{TIDAL_CLIENT_SECRET}".encode()).decode()
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://auth.tidal.com/v1/oauth2/token",
                headers={"Authorization": f"Basic {b64_creds}"},
                data={"grant_type": "client_credentials"},
            )
        if response.status_code != 200:
            logger.warning("Tidal Auth Error: HTTP %s", response.status_code)
            return None

        data = response.json()
        token = data.get("access_token")
        if not token:
            # Do not overwrite the cache with an empty value.
            logger.warning("Tidal Auth Error: response contained no access_token")
            return None

        cached_tidal_token = token
        token_expiry_time = time.time() + data.get("expires_in", 3600)
        return token
    except Exception as e:
        # Best effort: callers treat None as "Tidal unavailable".
        logger.error(f"Tidal Auth Error: {e}")
    return None
|
|
def search_youtube_sync(search_query: str, target_title: str, target_artist: str):
    """Search YouTube Music and return ``(best_id, alternative_id)``.

    The first five song results are scored against the target: 60 % title
    similarity plus 40 % artist similarity, with a 0.2 bonus when the title
    similarity exceeds 0.8.  Results without a usable ``videoId`` are
    skipped, and the alternative is always a different video than the best
    match.

    Either element is None when no such result exists.  The function never
    raises: any search or scoring error is logged and ``(None, None)`` is
    returned.  It is blocking and intended to run in a worker thread.
    """
    try:
        yt_results = ytmusic.search(search_query, filter="songs")
        if not yt_results:
            return None, None

        wanted_title = clean_title(target_title)
        scored_results = []
        for result in yt_results[:5]:
            video_id = result.get('videoId')
            if not video_id:
                # Entries without a video id cannot be linked to.
                continue
            yt_title = result.get('title') or ''
            # 'artists' may be present but None; treat that as no artists.
            yt_artists = " ".join(a.get('name', '') for a in (result.get('artists') or []))
            title_score = similar(wanted_title, clean_title(yt_title))
            artist_score = similar(target_artist, yt_artists)
            total_score = (title_score * 0.6) + (artist_score * 0.4)
            if title_score > 0.8:
                total_score += 0.2
            scored_results.append((total_score, video_id))

        # Stable sort: equal scores keep the order YouTube returned them in.
        scored_results.sort(key=lambda x: x[0], reverse=True)
        best_id = scored_results[0][1] if scored_results else None
        alt_id = next((vid for _, vid in scored_results[1:] if vid != best_id), None)
        return best_id, alt_id
    except Exception as e:
        logger.error(f"YouTube search error: {e}")
        return None, None
|
|
async def search_tidal_async(search_query: str, target_title: str, target_artist: str):
    """Search Tidal for the target track and return its browse URL, or None.

    Up to five track results are scored (60 % title similarity, 40 % artist
    similarity, +0.2 when the title similarity exceeds 0.8).  The best one is
    returned only when its score is at least 0.4.  Missing credentials, a
    non-200 response, an empty result list or any exception yield None.
    """
    token = await get_tidal_access_token()
    if not token:
        return None

    request_headers = {"Authorization": f"Bearer {token}"}
    request_params = {"query": search_query, "types": "TRACKS", "countryCode": "US", "limit": 5}

    async with httpx.AsyncClient() as client:
        try:
            reply = await client.get(
                "https://api.tidal.com/v1/search",
                headers=request_headers,
                params=request_params,
            )
            if reply.status_code != 200:
                return None

            payload = reply.json()
            if "tracks" not in payload or not payload["tracks"]["items"]:
                return None

            top_id = None
            top_score = 0.0
            for candidate in payload["tracks"]["items"]:
                candidate_title = candidate.get("title", "")
                performers = " ".join(p.get("name", "") for p in candidate.get("artists", []))
                title_ratio = similar(clean_title(target_title), clean_title(candidate_title))
                artist_ratio = similar(target_artist, performers)
                combined = (title_ratio * 0.6) + (artist_ratio * 0.4)
                if title_ratio > 0.8:
                    combined += 0.2
                # Strictly greater: the earliest of several equal scores wins.
                if combined > top_score:
                    top_score = combined
                    top_id = candidate["id"]

            if top_score < 0.4:
                return None
            if top_id:
                return f"https://tidal.com/browse/track/{top_id}"
        except Exception as e:
            print(f"Tidal search error: {e}")
    return None
|
|
| |
|
|
def extract_amazon_track_id(url: str) -> Optional[str]:
    """Extract the track id from a music.amazon.com URL.

    The ``trackAsin`` query parameter wins when present; otherwise the path
    segment that follows the first ``tracks`` segment is returned.  Returns
    None for non-Amazon URLs and for URLs carrying neither form.
    """
    if "music.amazon.com" not in url:
        return None

    pieces = urlparse(url)
    asin_values = parse_qs(pieces.query).get("trackAsin")
    if asin_values:
        return asin_values[0]

    segments = pieces.path.split('/')
    # Skip the last segment: a trailing "tracks" has nothing after it.
    for position, segment in enumerate(segments[:-1]):
        if segment == "tracks":
            return segments[position + 1]
    return None
|
|
def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the Song.link record for *url* and return the decoded JSON.

    Amazon Music URLs with an extractable track id are looked up by
    platform/id/type; everything else is looked up by URL.  The request is
    blocking with a 10 second timeout.  Any failure (network error, HTTP
    error status, invalid JSON) is logged and reported as None.
    """
    endpoint = "https://api.song.link/v1-alpha.1/links"

    amazon_id = extract_amazon_track_id(url) if "music.amazon.com" in url else None
    if amazon_id:
        query = {
            "userCountry": "US",
            "platform": "amazonMusic",
            "id": amazon_id,
            "type": "song",
        }
    else:
        query = {"userCountry": "US", "url": url}

    try:
        reply = requests.get(endpoint, params=query, timeout=10)
        reply.raise_for_status()
        return reply.json()
    except Exception as e:
        logger.error(f"Error fetching from Song.link API: {e}")
        return None
|
|
def extract_url_from_songlink(links_by_platform: dict, platform: str) -> Optional[str]:
    """Return the URL stored for *platform* in a Song.link ``linksByPlatform`` map.

    Returns None when the platform is absent or its entry has no non-empty
    ``url`` value.
    """
    if platform not in links_by_platform:
        return None
    link = links_by_platform[platform].get("url")
    return link if link else None
|
|
| |
|
|
# Health/landing endpoint: returns a fixed status message naming the
# available routes.  (Documented with comments rather than a docstring so
# the generated OpenAPI description stays unchanged.)
@app.get("/")
async def root():
    status_text = "Combined Music API is running. Use /convert, /match, or /searcht."
    return {"message": status_text}
|
|
# POST /searcht: run a YouTube Music song search for request.query and
# return the first result that carries a video id, or {} when there is none.
# (Documented with comments rather than a docstring so the generated OpenAPI
# description stays unchanged.)
@app.post("/searcht")
async def searcht(request: SearchRequest):
    logger.info(f"search query: {request.query}")
    # ytmusic.search is a blocking call; run it in a worker thread so the
    # event loop keeps serving other requests (same approach as /convert).
    search_results = await asyncio.to_thread(ytmusic.search, request.query, filter="songs")
    return next((song for song in (search_results or []) if song.get('videoId')), {})
|
|
def _youtube_video_id(youtube_url: str) -> Optional[str]:
    """Return the video id of a YouTube URL, or None if it has none.

    Handles ``youtu.be/<id>`` short links and URLs carrying a ``v`` query
    parameter (``watch?v=<id>``).
    """
    parsed = urlparse(youtube_url)
    host = (parsed.hostname or "").lower()
    if host == "youtu.be" or host.endswith(".youtu.be"):
        candidate = parsed.path.lstrip("/").split("/")[0]
    else:
        candidate = parse_qs(parsed.query).get("v", [""])[0]
    return candidate or None


# POST /match: resolve request.url through Song.link and answer with a
# YouTube match (url, "<title> - <artist>" filename, video id).
#
# The Song.link YouTube link is used when it exists and a video id can be
# read from it; otherwise YouTube Music is searched for "<title> <artist>".
# Responds 404 when Song.link has no data, when title/artist cannot be
# determined, or when no video is found.
# (Documented with comments rather than a docstring so the generated OpenAPI
# description stays unchanged.)
@app.post("/match", response_model=MatchResponse)
async def match(request: MatchRequest):
    track_url = request.url
    logger.info(f"Match endpoint: Processing URL: {track_url}")

    # get_song_link_info uses the blocking `requests` client; keep it off
    # the event loop.
    track_info = await asyncio.to_thread(get_song_link_info, track_url)
    if not track_info:
        raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")

    # Prefer the entity Song.link marks as primary, then any other entity
    # that carries both a title and an artist.
    entities = track_info.get("entitiesByUniqueId") or {}
    entity_unique_id = track_info.get("entityUniqueId")
    candidates = []
    if entity_unique_id and entity_unique_id in entities:
        candidates.append(entities[entity_unique_id])
    candidates.extend(entities.values())

    title, artist = None, None
    for edata in candidates:
        if edata.get("title") and edata.get("artistName"):
            title, artist = edata.get("title"), edata.get("artistName")
            break

    if not title or not artist:
        raise HTTPException(status_code=404, detail="Could not determine title and artist.")

    youtube_url = extract_url_from_songlink(track_info.get("linksByPlatform", {}), "youtube")
    if youtube_url:
        video_id = _youtube_video_id(youtube_url)
        # MatchResponse.track_id is a required string: without a video id
        # fall through to the search below instead of failing validation.
        if video_id:
            return MatchResponse(url=youtube_url, filename=f"{title} - {artist}", track_id=video_id)

    search_query = f'{title} {artist}'
    search_results = await asyncio.to_thread(ytmusic.search, search_query, filter="songs")
    first = next((song for song in (search_results or []) if song.get('videoId')), None)
    if first:
        v_id = first["videoId"]
        ym_url = f'https://music.youtube.com/watch?v={v_id}'
        a_name = first['artists'][0]['name'] if first.get('artists') else artist
        return MatchResponse(filename=f"{first.get('title', title)} - {a_name}", url=ym_url, track_id=v_id)
    raise HTTPException(status_code=404, detail="No matching video found on YouTube Music.")
|
|
async def _apple_music_metadata(url: str) -> tuple[str, str]:
    """Look up (title, artist) for an Apple Music URL via the iTunes lookup API."""
    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Apple Music URL")
    # A two-letter first path segment is taken as the storefront country.
    country = path_parts[0] if len(path_parts[0]) == 2 else "us"
    # ?i=<id> identifies a track inside an album link; otherwise use the
    # last path segment.
    lookup_id = parse_qs(parsed_url.query).get('i', [path_parts[-1]])[0]
    async with httpx.AsyncClient() as client:
        # params= lets httpx escape the values taken from the user's URL.
        response = await client.get(
            "https://itunes.apple.com/lookup",
            params={"id": lookup_id, "country": country},
        )
    payload = response.json() if response.status_code == 200 else {}
    if payload.get("resultCount", 0) <= 0:
        raise HTTPException(status_code=404, detail="Track not found on Apple Music")
    result = payload["results"][0]
    return result.get("trackName", result.get("collectionName", "")), result.get("artistName", "")


async def _deezer_metadata(url: str) -> tuple[str, str]:
    """Look up (title, artist) for a Deezer track URL via the Deezer API."""
    match_obj = re.search(r'track/(\d+)', url)
    if not match_obj:
        raise HTTPException(status_code=400, detail="Invalid Deezer URL")
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.deezer.com/track/{match_obj.group(1)}")
    data = response.json()
    if "error" in data:
        raise HTTPException(status_code=404, detail="Track not found on Deezer")
    return data.get("title", ""), (data.get("artist") or {}).get("name", "")


async def _spotify_metadata(url: str) -> tuple[str, str]:
    """Read (title, artist) from the Open Graph tags of a Spotify page."""
    # This branch downloads the caller-supplied URL itself, so the host must
    # really be Spotify: a substring test alone would let a request such as
    # "http://internal-host/?x=open.spotify.com" reach arbitrary servers.
    parsed_url = urlparse(url)
    host = (parsed_url.hostname or "").lower()
    is_legacy_form = url.startswith("http://googleusercontent.com/spotify.com")
    if parsed_url.scheme not in ("http", "https") or (host != "open.spotify.com" and not is_legacy_form):
        raise HTTPException(status_code=400, detail="Invalid Spotify URL")

    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(url, headers=headers)
    if response.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to fetch Spotify page")

    soup = BeautifulSoup(response.text, 'html.parser')
    og_title = soup.find("meta", property="og:title")
    og_desc = soup.find("meta", property="og:description")
    title_text = og_title.get("content") if og_title else None
    desc_text = og_desc.get("content") if og_desc else None
    if not title_text or not desc_text:
        raise HTTPException(status_code=404, detail="Could not extract Spotify metadata")

    # The artist is taken from the text before the first "·" of the
    # description, minus Spotify's "Listen to … on Spotify." wrapper.
    artist_raw = desc_text.split("·")[0]
    artist = artist_raw.replace("Listen to ", "").replace(" on Spotify.", "").strip()
    return title_text, artist


async def _qobuz_metadata(url: str) -> tuple[str, str]:
    """Look up (title, artist) for a Qobuz track or album URL via the Qobuz API."""
    if not QOBUZ_APP_ID or not QOBUZ_TOKEN:
        raise HTTPException(status_code=500, detail="Qobuz API credentials not configured.")
    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Qobuz URL")
    q_id = path_parts[-1]
    is_track = "track/" in parsed_url.path
    api_url = f"https://www.qobuz.com/api.json/0.2/{'track/get' if is_track else 'album/get'}"
    headers = {"X-App-Id": QOBUZ_APP_ID, "X-User-Auth-Token": QOBUZ_TOKEN, "User-Agent": "Mozilla/5.0"}
    async with httpx.AsyncClient(timeout=10.0, trust_env=False) as client:
        response = await client.get(
            api_url,
            params={"track_id" if is_track else "album_id": q_id},
            headers=headers,
        )
    data = response.json()
    track_title = data.get("title", "")
    if not track_title:
        raise HTTPException(status_code=404, detail="Qobuz track not found")
    # The artist may sit under several keys and be a dict or a plain string.
    artist_data = data.get("artist") or data.get("performer") or (data.get("album") or {}).get("artist")
    if isinstance(artist_data, dict):
        artist = artist_data.get("name", "")
    else:
        artist = artist_data if isinstance(artist_data, str) else ""
    return track_title, artist


# GET /convert?url=...: read title and artist from an Apple Music, Deezer,
# Spotify or Qobuz link, then search YouTube Music and Tidal for the same
# track (both searches run concurrently).
#
# Responds 400 for unsupported or malformed URLs, 404 when the source
# service has no such track or no title can be determined, and 500 when
# Qobuz credentials are missing.
# (Documented with comments rather than a docstring so the generated OpenAPI
# description stays unchanged.)
@app.get("/convert")
async def convert_url_to_all(url: str):
    if "music.apple.com" in url:
        track_title, artist_clean = await _apple_music_metadata(url)
    elif "deezer.com" in url:
        track_title, artist_clean = await _deezer_metadata(url)
    elif "http://googleusercontent.com/spotify.com" in url or "open.spotify.com" in url:
        track_title, artist_clean = await _spotify_metadata(url)
    elif "qobuz.com" in url:
        track_title, artist_clean = await _qobuz_metadata(url)
    else:
        # Previously fell through and searched for an empty query.
        raise HTTPException(
            status_code=400,
            detail="Unsupported URL. Supported sources: Apple Music, Deezer, Spotify, Qobuz.",
        )

    if not track_title:
        raise HTTPException(status_code=404, detail="Could not determine track title")

    clean_search_title = clean_title(track_title)
    search_query = f"{clean_search_title} {artist_clean}".strip()
    # The YouTube client is blocking, so it runs in a worker thread while
    # the Tidal search awaits on the event loop.
    yt_task = asyncio.to_thread(search_youtube_sync, search_query, track_title, artist_clean)
    tidal_task = search_tidal_async(search_query, track_title, artist_clean)
    (best_yt_id, alt_yt_id), tidal_url = await asyncio.gather(yt_task, tidal_task)

    return {
        "track_title": track_title,
        "artist": artist_clean,
        "search_query_used": search_query,
        "source_url": url,
        "youtube_music_url": f"https://music.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "youtube_video_url": f"https://www.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "alternative_youtube_url": f"https://www.youtube.com/watch?v={alt_yt_id}" if alt_yt_id else None,
        "tidal_url": tidal_url,
    }
|
|