| from __future__ import annotations |
|
|
| import random |
| from typing import Any, Dict, List, Optional |
|
|
| try: |
| from ytmusicapi import YTMusic, OAuthCredentials |
| except ImportError: |
| YTMusic = None |
| OAuthCredentials = None |
|
|
|
|
| class YouTubeMusicError(Exception): |
| def __init__(self, message: str, status_code: int = 500): |
| self.message = message |
| self.status_code = status_code |
| super().__init__(self.message) |
|
|
|
|
| def _get_ytmusic_client(oauth_file: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None) -> Any: |
| if YTMusic is None: |
| raise YouTubeMusicError( |
| "ytmusicapi is not installed. Install it with: pip install ytmusicapi==1.11.5", |
| status_code=500 |
| ) |
| |
| try: |
| import os |
| from dotenv import load_dotenv |
| load_dotenv() |
| |
| |
| oauth_file = oauth_file or os.getenv("YTMUSIC_OAUTH_FILE", "oauth.json") |
| client_id = client_id or os.getenv("YTMUSIC_CLIENT_ID") |
| client_secret = client_secret or os.getenv("YTMUSIC_CLIENT_SECRET") |
| |
| |
| if os.path.exists(oauth_file): |
| import json |
| try: |
| with open(oauth_file, 'r') as f: |
| oauth_data = json.load(f) |
| if "oauth_credentials" in oauth_data: |
| if client_id and client_secret: |
| |
| if OAuthCredentials is None: |
| raise YouTubeMusicError("OAuthCredentials not available. Update ytmusicapi.", status_code=500) |
| return YTMusic(oauth_file, oauth_credentials=OAuthCredentials(client_id=client_id, client_secret=client_secret)) |
| else: |
| |
| return YTMusic(oauth_file) |
| except Exception: |
| pass |
| |
| |
| return YTMusic() |
| except Exception as e: |
| raise YouTubeMusicError(f"Failed to initialize YTMusic client: {str(e)}", status_code=500) |
|
|
|
|
| def search_songs(query: str, limit: int = 20, oauth_file: Optional[str] = None) -> List[Dict[str, Any]]: |
| try: |
| yt = _get_ytmusic_client(oauth_file) |
| results = yt.search(query, filter="songs", limit=limit) |
| |
| songs = [] |
| for item in results: |
| if item.get("resultType") == "song": |
| video_id = item.get("videoId") |
| if not video_id: |
| continue |
| |
| artists = [] |
| if "artists" in item: |
| for artist in item["artists"]: |
| if isinstance(artist, dict): |
| artists.append(artist.get("name", "")) |
| elif isinstance(artist, str): |
| artists.append(artist) |
| |
| album = None |
| if "album" in item and isinstance(item["album"], dict): |
| album = item["album"].get("name") |
| elif isinstance(item.get("album"), str): |
| album = item["album"] |
| |
| thumbnails = item.get("thumbnails", []) |
| image_url = None |
| if thumbnails: |
| image_url = thumbnails[-1].get("url") if isinstance(thumbnails[-1], dict) else None |
| |
| songs.append({ |
| "video_id": video_id, |
| "title": item.get("title", ""), |
| "artists": artists, |
| "album": album, |
| "duration": item.get("duration"), |
| "image_url": image_url, |
| "playlist_id": item.get("playlistId"), |
| }) |
| |
| return songs[:limit] |
| except YouTubeMusicError: |
| raise |
| except Exception as e: |
| raise YouTubeMusicError(f"Search failed: {str(e)}", status_code=500) |
|
|
|
|
| def get_song_info(video_id: str, oauth_file: Optional[str] = None) -> Dict[str, Any]: |
| try: |
| yt = _get_ytmusic_client(oauth_file) |
| song = yt.get_song(video_id) |
| |
| if not song: |
| raise YouTubeMusicError(f"Song not found: {video_id}", status_code=404) |
| |
| return song |
| except YouTubeMusicError: |
| raise |
| except Exception as e: |
| raise YouTubeMusicError(f"Failed to get song info: {str(e)}", status_code=500) |
|
|
|
|
| def search_artists(query: str, limit: int = 10, oauth_file: Optional[str] = None) -> List[Dict[str, Any]]: |
| try: |
| yt = _get_ytmusic_client(oauth_file) |
| results = yt.search(query, filter="artists", limit=limit) |
| |
| artists = [] |
| for item in results: |
| if item.get("resultType") == "artist": |
| thumbnails = item.get("thumbnails", []) |
| image_url = None |
| if thumbnails: |
| image_url = thumbnails[-1].get("url") if isinstance(thumbnails[-1], dict) else None |
| |
| artists.append({ |
| "artist_id": item.get("browseId"), |
| "name": item.get("artist", ""), |
| "image_url": image_url, |
| }) |
| |
| return artists[:limit] |
| except YouTubeMusicError: |
| raise |
| except Exception as e: |
| raise YouTubeMusicError(f"Artist search failed: {str(e)}", status_code=500) |
|
|
|
|
| def get_artist_songs(artist_id: str, limit: int = 50, oauth_file: Optional[str] = None) -> List[Dict[str, Any]]: |
| try: |
| yt = _get_ytmusic_client(oauth_file) |
| artist = yt.get_artist(artist_id) |
| |
| if not artist: |
| raise YouTubeMusicError(f"Artist not found: {artist_id}", status_code=404) |
| |
| songs = [] |
| if "songs" in artist: |
| for song in artist["songs"][:limit]: |
| video_id = song.get("videoId") |
| if not video_id: |
| continue |
| |
| artists = [] |
| if "artists" in song: |
| for artist_info in song["artists"]: |
| if isinstance(artist_info, dict): |
| artists.append(artist_info.get("name", "")) |
| elif isinstance(artist_info, str): |
| artists.append(artist_info) |
| |
| thumbnails = song.get("thumbnails", []) |
| image_url = None |
| if thumbnails: |
| image_url = thumbnails[-1].get("url") if isinstance(thumbnails[-1], dict) else None |
| |
| songs.append({ |
| "video_id": video_id, |
| "title": song.get("title", ""), |
| "artists": artists, |
| "album": song.get("album", {}).get("name") if isinstance(song.get("album"), dict) else None, |
| "duration": song.get("duration"), |
| "image_url": image_url, |
| }) |
| |
| return songs |
| except YouTubeMusicError: |
| raise |
| except Exception as e: |
| raise YouTubeMusicError(f"Failed to get artist songs: {str(e)}", status_code=500) |
|
|
|
|
| def emotion_to_ytmusic_queries(emotion: str) -> List[str]: |
| emotion = emotion.lower() |
| |
| if emotion in {"joy", "happy", "happiness"}: |
| return [ |
| "happy music", |
| "upbeat songs", |
| "feel good music", |
| "dance music", |
| "party songs", |
| "energetic music", |
| "positive vibes", |
| ] |
| elif emotion in {"sad", "sadness"}: |
| return [ |
| "sad songs", |
| "emotional music", |
| "melancholic songs", |
| "ballads", |
| "heartbreak songs", |
| "calm music", |
| "relaxing music", |
| ] |
| elif emotion in {"anger", "angry"}: |
| return [ |
| "angry music", |
| "rock music", |
| "metal songs", |
| "intense music", |
| "aggressive songs", |
| "punk rock", |
| "hard rock", |
| ] |
| elif emotion in {"fear"}: |
| return [ |
| "calm music", |
| "ambient music", |
| "peaceful songs", |
| "meditation music", |
| "soothing music", |
| "relaxing instrumental", |
| ] |
| elif emotion in {"surprise"}: |
| return [ |
| "surprising music", |
| "unexpected songs", |
| "experimental music", |
| "indie music", |
| "alternative music", |
| "unique songs", |
| ] |
| elif emotion in {"neutral"}: |
| return [ |
| "chill music", |
| "background music", |
| "easy listening", |
| "soft music", |
| "ambient playlist", |
| ] |
| elif emotion in {"disgust"}: |
| return [ |
| "alternative rock", |
| "indie music", |
| "experimental songs", |
| "unique music", |
| ] |
| |
| return ["music", "songs", "popular music"] |
|
|
|
|
| def recommend_song_for_emotion(emotion: str, source: str = "text", oauth_file: Optional[str] = None) -> Dict[str, Any]: |
| emotion_norm = (emotion or "").lower() |
| queries = emotion_to_ytmusic_queries(emotion_norm) |
| |
| items: List[Dict[str, Any]] = [] |
| for q in queries: |
| try: |
| songs = search_songs(q, limit=10, oauth_file=oauth_file) |
| if songs: |
| items.extend(songs) |
| if len(items) >= 20: |
| break |
| except Exception: |
| continue |
| |
| if not items: |
| try: |
| fallback = search_songs("popular music", limit=5, oauth_file=oauth_file) |
| if fallback: |
| items = fallback |
| except Exception: |
| pass |
| |
| if not items: |
| return {} |
| |
| song = random.choice(items) |
| return { |
| "video_id": song.get("video_id"), |
| "title": song.get("title", ""), |
| "artists": song.get("artists", []), |
| "album": song.get("album"), |
| "duration": song.get("duration"), |
| "image_url": song.get("image_url"), |
| "external_url": f"https://music.youtube.com/watch?v={song.get('video_id')}" if song.get("video_id") else None, |
| } |
|
|