| """ |
| Music Service |
| |
| High-level service for music generation operations. |
| Abstracts all API complexity from the UI layer. |
| """ |
|
|
| from typing import Callable, Optional, List |
| from dataclasses import dataclass, field |
|
|
| from ..api.client import StackNetClient, MediaAction |
|
|
|
|
@dataclass
class MusicClip:
    """One generated music clip together with its metadata.

    Attributes:
        title: Display title of the clip.
        audio_url: Remote URL of the clip's audio.
        audio_path: Local path of the audio file, or None when no local
            copy has been recorded on this object.
        duration: Duration value as supplied by the producer, if any.
        image_url: URL of an associated image, if any.
        video_url: URL of an associated video, if any.
        tags: Tags attached to the clip; every instance gets its own
            fresh empty list by default.
    """

    # Required fields.
    title: str
    audio_url: str

    # Optional metadata; absent values stay None.
    audio_path: Optional[str] = None
    duration: Optional[str] = None
    image_url: Optional[str] = None
    video_url: Optional[str] = None

    # default_factory avoids sharing one list between instances.
    tags: List[str] = field(default_factory=list)
|
|
|
|
@dataclass
class StemResult:
    """Local file paths of the stems extracted from one audio source.

    Each attribute holds the path of the corresponding stem file, or None
    when that stem is not available.

    Attributes:
        vocals_path: Path of the vocals stem.
        drums_path: Path of the drums stem.
        bass_path: Path of the bass stem.
        other_path: Path of the remaining ("other") stem.
    """

    vocals_path: Optional[str] = None
    drums_path: Optional[str] = None
    bass_path: Optional[str] = None
    other_path: Optional[str] = None
|
|
|
|
class MusicService:
    """
    Service for music generation and manipulation.

    Provides clean interfaces for:
    - Text-to-music generation
    - Cover song creation
    - Stem extraction

    All remote work is delegated to the injected client
    (``submit_media_task``, ``download_file`` and ``cleanup``).
    """

    # Stem names looked up in the extraction response, in download order.
    # Each name maps to the ``<name>_path`` attribute of StemResult and to
    # the ``<name>.mp3`` download filename.
    _STEM_NAMES = ("vocals", "drums", "bass", "other")

    # Characters replaced by "_" when a clip title becomes a filename:
    # the space (as before), path separators, characters reserved on
    # common filesystems, and ASCII control characters.
    _UNSAFE_FILENAME_CHARS = str.maketrans(
        {ch: "_" for ch in ' <>:"/\\|?*' + "".join(map(chr, range(32)))}
    )

    def __init__(self, client: Optional[StackNetClient] = None):
        """
        Create the service.

        Args:
            client: API client to use. A default ``StackNetClient()`` is
                constructed only when no client is supplied.
        """
        # Compare against None so a supplied client is never discarded
        # merely because it evaluates as falsy.
        self.client = client if client is not None else StackNetClient()

    async def generate_music(
        self,
        prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        lyrics: Optional[str] = None,
        instrumental: bool = False,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> List[MusicClip]:
        """
        Generate original music from a text prompt.

        Args:
            prompt: Description of desired music
            title: Optional song title
            tags: Optional genre/style tags (comma-separated)
            lyrics: Optional lyrics (ignored if instrumental=True)
            instrumental: Generate instrumental only
            on_progress: Callback for progress updates

        Returns:
            List of generated MusicClip objects

        Raises:
            Exception: If the task reports failure; the message is the
                task's error text or "Music generation failed".
        """
        options = {}
        if tags:
            options["tags"] = tags
        if title:
            options["title"] = title
        if instrumental:
            options["make_instrumental"] = True
        elif lyrics:
            # Lyrics are only forwarded for non-instrumental requests.
            options["lyrics"] = lyrics

        result = await self.client.submit_media_task(
            action=MediaAction.GENERATE_MUSIC,
            prompt=prompt,
            # An empty options dict is sent as None.
            options=options or None,
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Music generation failed")

        return self._parse_music_result(result.data)

    async def create_cover(
        self,
        audio_url: str,
        style_prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> List[MusicClip]:
        """
        Create a cover version of audio.

        Args:
            audio_url: URL to source audio
            style_prompt: Style/voice direction for the cover
            title: Optional title for the cover
            tags: Optional genre/style tags
            on_progress: Progress callback

        Returns:
            List of generated cover clips

        Raises:
            Exception: If the task reports failure; the message is the
                task's error text or "Cover creation failed".
        """
        options = {}
        if tags:
            options["tags"] = tags
        if title:
            options["title"] = title

        result = await self.client.submit_media_task(
            action=MediaAction.CREATE_COVER,
            audio_url=audio_url,
            prompt=style_prompt,
            # An empty options dict is sent as None.
            options=options or None,
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Cover creation failed")

        return self._parse_music_result(result.data)

    async def extract_stems(
        self,
        audio_url: str,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> StemResult:
        """
        Extract stems (vocals, drums, bass, other) from audio.

        Stems are downloaded one after another, in the order vocals,
        drums, bass, other. A stem missing from the response leaves the
        matching StemResult attribute as None.

        Args:
            audio_url: URL to source audio
            on_progress: Progress callback

        Returns:
            StemResult with paths to each stem

        Raises:
            Exception: If the task reports failure; the message is the
                task's error text or "Stem extraction failed".
        """
        result = await self.client.submit_media_task(
            action=MediaAction.EXTRACT_STEMS,
            audio_url=audio_url,
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Stem extraction failed")

        # Tolerate a missing payload and a "stems" key that is present
        # but empty/None: in both cases fall back to the top-level dict.
        payload = result.data or {}
        stems_data = payload.get("stems") or payload

        stem_result = StemResult()
        # NOTE(review): filenames are fixed per stem, so successive calls
        # may overwrite each other depending on how download_file places
        # files -- confirm against the client.
        for stem in self._STEM_NAMES:
            stem_url = stems_data.get(stem)
            if stem_url:
                local_path = await self.client.download_file(
                    stem_url, f"{stem}.mp3"
                )
                setattr(stem_result, f"{stem}_path", local_path)

        return stem_result

    def _parse_music_result(self, data: dict) -> List[MusicClip]:
        """
        Parse API response into MusicClip objects.

        Accepted shapes:
        - ``{"clips": [...]}`` with one dict per clip
        - a single clip dict carrying ``audio_url`` / ``audioUrl``
        - a dict carrying only ``url`` (and optionally ``title``)

        Clip entries without any audio URL are skipped.

        Args:
            data: Response payload; None or an empty dict yields no clips.

        Returns:
            List of MusicClip objects, possibly empty.
        """
        if not data:
            return []

        # ``or []`` also covers a "clips" key that is present but None.
        raw_clips = data.get("clips") or []

        if not raw_clips:
            if data.get("audio_url") or data.get("audioUrl"):
                # The payload itself is a single clip.
                raw_clips = [data]
            elif data.get("url"):
                raw_clips = [{
                    "audio_url": data["url"],
                    "title": data.get("title") or "Generated",
                }]

        clips = []
        for clip_data in raw_clips:
            audio_url = (
                clip_data.get("audio_url")
                or clip_data.get("audioUrl")
                or clip_data.get("url")
            )
            if not audio_url:
                continue
            clips.append(MusicClip(
                # ``or`` rather than a .get default: the key may exist
                # with a None/empty value, and download_clip needs a str.
                title=clip_data.get("title") or "Generated Music",
                audio_url=audio_url,
                duration=clip_data.get("duration"),
                image_url=clip_data.get("image_url") or clip_data.get("imageUrl"),
                video_url=clip_data.get("video_url") or clip_data.get("videoUrl"),
                tags=self._normalize_tags(clip_data.get("tags")),
            ))

        return clips

    @staticmethod
    def _normalize_tags(raw) -> List[str]:
        """Coerce a clip's raw ``tags`` value into a list for MusicClip.tags."""
        if not raw:
            return []
        if isinstance(raw, str):
            # Requests send tags as a comma-separated string; presumably a
            # response may echo that form -- split it instead of storing a
            # bare str in a List[str] field.
            return [part.strip() for part in raw.split(",") if part.strip()]
        return list(raw)

    @classmethod
    def _clip_filename(cls, title: Optional[str]) -> str:
        """Build a safe ``.mp3`` filename (stem capped at 30 chars) from a title."""
        stem = (title or "").translate(cls._UNSAFE_FILENAME_CHARS)[:30]
        if not stem.strip("._"):
            # Nothing usable is left (empty title, or only dots/underscores).
            stem = "clip"
        return f"{stem}.mp3"

    async def download_clip(self, clip: MusicClip) -> str:
        """
        Download a clip's audio to local file.

        If ``clip.audio_path`` is already set it is returned without
        downloading again. Otherwise the audio is fetched through the
        client and the resulting path is stored on the clip.

        Args:
            clip: Clip whose audio should be available locally.

        Returns:
            Local path of the clip's audio file.
        """
        if clip.audio_path:
            return clip.audio_path

        # NOTE(review): clips sharing a title map to the same filename;
        # whether that overwrites depends on download_file -- confirm.
        filename = self._clip_filename(clip.title)
        clip.audio_path = await self.client.download_file(clip.audio_url, filename)
        return clip.audio_path

    def cleanup(self):
        """Clean up temporary files by delegating to the client's cleanup()."""
        self.client.cleanup()
|
|