from __future__ import annotations

import json
import logging
import os
import random
from typing import Any, Dict, List, Optional

# ytmusicapi is an optional dependency: the module stays importable without
# it, and _get_ytmusic_client() reports the missing package at call time.
try:
    from ytmusicapi import YTMusic, OAuthCredentials
except ImportError:
    YTMusic = None
    OAuthCredentials = None

_logger = logging.getLogger(__name__)


class YouTubeMusicError(Exception):
    """Error raised by the helpers in this module.

    Attributes:
        message: Human-readable description of the failure.
        status_code: Numeric code attached to the error; this module uses
            500 for general failures and 404 for "not found".
    """

    def __init__(self, message: str, status_code: int = 500):
        self.message = message
        self.status_code = status_code
        super().__init__(self.message)


def _get_ytmusic_client(
    oauth_file: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> Any:
    """Build a ``YTMusic`` client, authenticated when possible.

    Explicit arguments win over the ``YTMUSIC_OAUTH_FILE`` (default
    ``"oauth.json"``), ``YTMUSIC_CLIENT_ID`` and ``YTMUSIC_CLIENT_SECRET``
    environment variables, which are read after ``load_dotenv()``.

    Resolution order:
      1. OAuth file exists, contains ``"oauth_credentials"`` and a client
         id/secret pair is available -> OAuth client with credentials.
      2. OAuth file exists and contains ``"oauth_credentials"`` but no
         id/secret -> client built from the file alone.
      3. Anything else, or a failure while trying 1/2 -> unauthenticated
         ``YTMusic()``.

    Returns:
        A ``YTMusic`` instance.

    Raises:
        YouTubeMusicError: ytmusicapi is not installed, ``OAuthCredentials``
            is unavailable, or client construction failed.
    """
    if YTMusic is None:
        raise YouTubeMusicError(
            "ytmusicapi is not installed. Install it with: pip install ytmusicapi==1.11.5",
            status_code=500,
        )

    try:
        from dotenv import load_dotenv

        load_dotenv()

        # Get OAuth credentials from environment or parameters
        oauth_file = oauth_file or os.getenv("YTMUSIC_OAUTH_FILE", "oauth.json")
        client_id = client_id or os.getenv("YTMUSIC_CLIENT_ID")
        client_secret = client_secret or os.getenv("YTMUSIC_CLIENT_SECRET")

        # If OAuth file exists and has credentials, use it
        if os.path.exists(oauth_file):
            try:
                with open(oauth_file, "r", encoding="utf-8") as handle:
                    oauth_data = json.load(handle)

                if "oauth_credentials" in oauth_data:
                    if client_id and client_secret:
                        # Use OAuth with Client ID/Secret (required as of Nov 2024)
                        if OAuthCredentials is None:
                            raise YouTubeMusicError(
                                "OAuthCredentials not available. Update ytmusicapi.",
                                status_code=500,
                            )
                        return YTMusic(
                            oauth_file,
                            oauth_credentials=OAuthCredentials(
                                client_id=client_id,
                                client_secret=client_secret,
                            ),
                        )
                    # Try without credentials (may work for some operations)
                    return YTMusic(oauth_file)
            except YouTubeMusicError:
                # Raised on purpose just above; must not be swallowed by the
                # best-effort fallback below.
                raise
            except Exception:
                # Best effort: an unreadable/invalid OAuth file must not
                # block public access, but the failure should be visible.
                _logger.warning(
                    "Could not use OAuth file %r; falling back to public access",
                    oauth_file,
                    exc_info=True,
                )

        # Fall back to public access (works for search and recommendations)
        return YTMusic()
    except YouTubeMusicError:
        raise
    except Exception as exc:
        raise YouTubeMusicError(
            f"Failed to initialize YTMusic client: {str(exc)}", status_code=500
        ) from exc


def _best_thumbnail_url(thumbnails: Any) -> Optional[str]:
    """Return the ``url`` of the last thumbnail entry, or ``None``."""
    if not isinstance(thumbnails, (list, tuple)) or not thumbnails:
        return None
    last = thumbnails[-1]
    return last.get("url") if isinstance(last, dict) else None


def _artist_names(raw_artists: Any) -> List[str]:
    """Collect artist names from a list of ``{"name": ...}`` dicts or strings."""
    if not isinstance(raw_artists, (list, tuple)):
        return []
    names: List[str] = []
    for entry in raw_artists:
        if isinstance(entry, dict):
            names.append(entry.get("name", ""))
        elif isinstance(entry, str):
            names.append(entry)
    return names


def _album_name(raw_album: Any) -> Optional[str]:
    """Return the album name from either a ``{"name": ...}`` dict or a string."""
    if isinstance(raw_album, dict):
        return raw_album.get("name")
    if isinstance(raw_album, str):
        return raw_album
    return None


def _normalize_song(item: Any) -> Optional[Dict[str, Any]]:
    """Map a raw song entry to this module's song dict; ``None`` if unusable.

    Entries that are not dicts or have no ``videoId`` are rejected.
    """
    if not isinstance(item, dict):
        return None
    video_id = item.get("videoId")
    if not video_id:
        return None
    return {
        "video_id": video_id,
        "title": item.get("title", ""),
        "artists": _artist_names(item.get("artists")),
        "album": _album_name(item.get("album")),
        "duration": item.get("duration"),
        "image_url": _best_thumbnail_url(item.get("thumbnails")),
    }


def search_songs(
    query: str, limit: int = 20, oauth_file: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Search songs and return at most ``limit`` normalized entries.

    Only results whose ``resultType`` is ``"song"`` and that carry a
    ``videoId`` are kept.

    Returns:
        Dicts with keys ``video_id``, ``title``, ``artists``, ``album``,
        ``duration``, ``image_url`` and ``playlist_id``.

    Raises:
        YouTubeMusicError: client creation or the search itself failed.
    """
    try:
        yt = _get_ytmusic_client(oauth_file)
        results = yt.search(query, filter="songs", limit=limit)

        songs: List[Dict[str, Any]] = []
        for item in results or []:
            if not isinstance(item, dict) or item.get("resultType") != "song":
                continue
            song = _normalize_song(item)
            if song is None:
                continue
            song["playlist_id"] = item.get("playlistId")
            songs.append(song)

        return songs[:limit]
    except YouTubeMusicError:
        raise
    except Exception as exc:
        raise YouTubeMusicError(f"Search failed: {str(exc)}", status_code=500) from exc


def get_song_info(video_id: str, oauth_file: Optional[str] = None) -> Dict[str, Any]:
    """Return the raw ``get_song`` payload for ``video_id``.

    Raises:
        YouTubeMusicError: status 404 when the client returns an empty
            result, status 500 for any other failure.
    """
    try:
        yt = _get_ytmusic_client(oauth_file)
        song = yt.get_song(video_id)
        if not song:
            raise YouTubeMusicError(f"Song not found: {video_id}", status_code=404)
        return song
    except YouTubeMusicError:
        raise
    except Exception as exc:
        raise YouTubeMusicError(
            f"Failed to get song info: {str(exc)}", status_code=500
        ) from exc


def search_artists(
    query: str, limit: int = 10, oauth_file: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Search artists and return at most ``limit`` normalized entries.

    Returns:
        Dicts with keys ``artist_id`` (the result's ``browseId``), ``name``
        and ``image_url``.

    Raises:
        YouTubeMusicError: client creation or the search itself failed.
    """
    try:
        yt = _get_ytmusic_client(oauth_file)
        results = yt.search(query, filter="artists", limit=limit)

        artists = [
            {
                "artist_id": item.get("browseId"),
                "name": item.get("artist", ""),
                "image_url": _best_thumbnail_url(item.get("thumbnails")),
            }
            for item in results or []
            if isinstance(item, dict) and item.get("resultType") == "artist"
        ]
        return artists[:limit]
    except YouTubeMusicError:
        raise
    except Exception as exc:
        raise YouTubeMusicError(
            f"Artist search failed: {str(exc)}", status_code=500
        ) from exc


def get_artist_songs(
    artist_id: str, limit: int = 50, oauth_file: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Return up to ``limit`` songs listed on an artist page.

    Returns:
        Dicts with keys ``video_id``, ``title``, ``artists``, ``album``,
        ``duration`` and ``image_url``. Entries without a ``videoId`` are
        skipped after the ``limit`` slice, so fewer than ``limit`` items may
        come back.

    Raises:
        YouTubeMusicError: status 404 when the artist lookup is empty,
            status 500 for any other failure.
    """
    try:
        yt = _get_ytmusic_client(oauth_file)
        artist = yt.get_artist(artist_id)
        if not artist:
            raise YouTubeMusicError(f"Artist not found: {artist_id}", status_code=404)

        # ytmusicapi documents ``songs`` as {"browseId": ..., "results": [...]};
        # slicing that dict directly raises. A bare list is still accepted.
        raw_songs = artist.get("songs")
        if isinstance(raw_songs, dict):
            raw_songs = raw_songs.get("results")
        if not isinstance(raw_songs, list):
            raw_songs = []

        songs: List[Dict[str, Any]] = []
        for item in raw_songs[:limit]:
            song = _normalize_song(item)
            if song is not None:
                songs.append(song)
        return songs
    except YouTubeMusicError:
        raise
    except Exception as exc:
        raise YouTubeMusicError(
            f"Failed to get artist songs: {str(exc)}", status_code=500
        ) from exc


# Search phrases per emotion; tuples so the shared table cannot be mutated
# through a returned value.
_HAPPY_QUERIES = (
    "happy music",
    "upbeat songs",
    "feel good music",
    "dance music",
    "party songs",
    "energetic music",
    "positive vibes",
)
_SAD_QUERIES = (
    "sad songs",
    "emotional music",
    "melancholic songs",
    "ballads",
    "heartbreak songs",
    "calm music",
    "relaxing music",
)
_ANGRY_QUERIES = (
    "angry music",
    "rock music",
    "metal songs",
    "intense music",
    "aggressive songs",
    "punk rock",
    "hard rock",
)
_EMOTION_QUERIES = {
    "joy": _HAPPY_QUERIES,
    "happy": _HAPPY_QUERIES,
    "happiness": _HAPPY_QUERIES,
    "sad": _SAD_QUERIES,
    "sadness": _SAD_QUERIES,
    "anger": _ANGRY_QUERIES,
    "angry": _ANGRY_QUERIES,
    "fear": (
        "calm music",
        "ambient music",
        "peaceful songs",
        "meditation music",
        "soothing music",
        "relaxing instrumental",
    ),
    "surprise": (
        "surprising music",
        "unexpected songs",
        "experimental music",
        "indie music",
        "alternative music",
        "unique songs",
    ),
    "neutral": (
        "chill music",
        "background music",
        "easy listening",
        "soft music",
        "ambient playlist",
    ),
    "disgust": (
        "alternative rock",
        "indie music",
        "experimental songs",
        "unique music",
    ),
}
_DEFAULT_QUERIES = ("music", "songs", "popular music")


def emotion_to_ytmusic_queries(emotion: str) -> List[str]:
    """Return search phrases for an emotion label.

    Matching is case-insensitive and ignores surrounding whitespace.
    ``None``, non-string and unknown labels yield the generic default list.
    A new list is returned on every call.
    """
    key = emotion.strip().lower() if isinstance(emotion, str) else ""
    return list(_EMOTION_QUERIES.get(key, _DEFAULT_QUERIES))


def recommend_song_for_emotion(
    emotion: str, source: str = "text", oauth_file: Optional[str] = None
) -> Dict[str, Any]:
    """Pick a random song matching ``emotion``.

    Runs the emotion's queries in order (10 results each) until at least 20
    candidates are collected, falls back to a ``"popular music"`` search when
    nothing was found, then chooses one candidate with ``random.choice``.
    Search failures are logged and skipped; this function does not raise for
    them.

    Args:
        emotion: Emotion label; ``None``/empty selects the default queries.
        source: Accepted for interface compatibility; not used here.
        oauth_file: Forwarded to ``search_songs``.

    Returns:
        A dict with ``video_id``, ``title``, ``artists``, ``album``,
        ``duration``, ``image_url`` and ``external_url``, or ``{}`` when no
        song could be found.
    """
    emotion_norm = (emotion or "").lower()
    queries = emotion_to_ytmusic_queries(emotion_norm)

    candidates: List[Dict[str, Any]] = []
    for phrase in queries:
        try:
            found = search_songs(phrase, limit=10, oauth_file=oauth_file)
        except Exception:
            # Best effort: one failing query should not abort the others.
            _logger.debug("Song search failed for %r", phrase, exc_info=True)
            continue
        if found:
            candidates.extend(found)
            if len(candidates) >= 20:
                break

    if not candidates:
        try:
            fallback = search_songs("popular music", limit=5, oauth_file=oauth_file)
        except Exception:
            _logger.debug("Fallback song search failed", exc_info=True)
        else:
            if fallback:
                candidates = fallback

    if not candidates:
        return {}

    song = random.choice(candidates)
    video_id = song.get("video_id")
    return {
        "video_id": video_id,
        "title": song.get("title", ""),
        "artists": song.get("artists", []),
        "album": song.get("album"),
        "duration": song.get("duration"),
        "image_url": song.get("image_url"),
        "external_url": f"https://music.youtube.com/watch?v={video_id}" if video_id else None,
    }