| import sys |
| import re |
| import asyncio |
| from supertonic import TTS |
| import base_model |
| import utils |
class StreamingEngine(base_model.BaseEngine):
    """Text-to-speech engine built on the ``supertonic`` ``TTS`` model.

    Friendly voice aliases (``alloy``, ``echo``, ...) are translated to the
    model's own voice ids through ``voice_mapping``; anything that cannot be
    resolved falls back to ``default_voice``.
    """

    def __init__(self, name):
        """Configure voice defaults, then run the base-class initialiser.

        Args:
            name: Engine name, forwarded to ``BaseEngine.__init__``.
        """
        self.default_voice = "F1"
        # Alias (lower-case) -> model voice id.
        self.voice_mapping = {
            "alloy": "F1",
            "echo": "M1",
            "fable": "M2",
            "onyx": "M3",
            "nova": "F2",
            "shimmer": "F3",
        }

        # Kept last so the voice attributes exist while the base initialiser
        # runs (presumably it triggers load_model -- confirm in base_model).
        super().__init__(name)

    def load_model(self):
        """Create the TTS model and cache its text processor and sample rate.

        Raises:
            RuntimeError: If anything in the setup fails; the original
                exception is chained as the cause.
        """
        try:
            self.tts = TTS(auto_download=True)
            self.text_processor = self.tts.model.text_processor
            self.sample_rate = self.tts.sample_rate
            print(f"Model Loaded. Rate: {self.sample_rate}")
        except Exception as e:
            print(f"Error initializing model {self.name}: {e}")
            raise RuntimeError(f"Failed to load model {self.name}") from e

    def get_style_safe(self, voice_name: str):
        """Resolve a voice name to a voice style, falling back to the default.

        Resolution order:
          1. a known alias from ``voice_mapping`` (case-insensitive);
          2. otherwise the name itself, upper-cased, tried as a model voice
             id (the ids in ``voice_mapping`` are all upper-case);
          3. ``default_voice`` when the name is empty/``None`` or the lookup
             in step 1/2 fails.

        Args:
            voice_name: Alias or model voice id requested by the caller.

        Returns:
            Whatever ``self.tts.get_voice_style`` returns for the resolved id.

        Raises:
            Exception: Re-raises the lookup error if even the default voice
                cannot be loaded.
        """
        requested = (voice_name or "").strip()
        alias = requested.lower()

        if alias in self.voice_mapping:
            target_name = self.voice_mapping[alias]
        elif requested:
            # Not an alias: let callers address model voices directly
            # (e.g. "m1" -> "M1") instead of silently getting the default.
            target_name = requested.upper()
        else:
            target_name = self.default_voice
        print(f"Found voice {target_name}")

        try:
            return self.tts.get_voice_style(voice_name=target_name)
        except Exception:
            print(f"WARNING: Voice '{voice_name}' (mapped to '{target_name}') not found. Using '{self.default_voice}'.")

        try:
            return self.tts.get_voice_style(voice_name=self.default_voice)
        except Exception:
            print(f"CRITICAL: Default voice '{self.default_voice}' also failed.")
            # Bare raise keeps the original traceback intact.
            raise

    def preprocess_text(self, text):
        """Strip characters the model cannot handle and split text into chunks.

        Args:
            text: Raw input text.

        Returns:
            The list produced by ``utils.split_text_into_sentences`` for the
            cleaned text, or ``[]`` when there is nothing left to speak.
        """
        if not text:
            return []

        is_valid, unsupported = self.text_processor.validate_text(text)

        if not is_valid:
            print(f" ⚠️ Contains {len(unsupported)} unsupported character(s): {unsupported[:5]}")

            # Delete every reported character in one pass. Unlike a regex
            # character class this cannot fail when `unsupported` is empty
            # and needs no escaping of ']' / '^' / '-' / '\'.
            removal_table = str.maketrans("", "", "".join(unsupported))
            preprocessed = text.translate(removal_table)

            if preprocessed != text:
                print(f" After preprocessing: {preprocessed[:50]}...")
            text = preprocessed
        else:
            print(" ✓ All characters supported")

        # Everything may have been stripped; mirror the empty-input guard.
        if not text:
            return []

        return utils.split_text_into_sentences(text, min_chunk_size=150)

    def generate(self, chunks: "str | list[str]", voice_name: str, speed: float):
        """Synthesize audio for one or more text chunks.

        Args:
            chunks: A single text string, or an iterable of text chunks such
                as the list returned by ``preprocess_text``.
            voice_name: Alias or model voice id; resolved through
                ``get_style_safe``.
            speed: Speaking-rate value forwarded to the model. ``None``
                leaves the model's own default in place.

        Yields:
            The audio returned by ``self.tts.synthesize`` for each chunk, in
            order (the second element of its return tuple is discarded). A
            single string yields exactly one item.
        """
        # Resolve once: the model expects a voice style, not a bare name.
        style = self.get_style_safe(voice_name)
        options = {} if speed is None else {"speed": speed}

        pieces = [chunks] if isinstance(chunks, str) else chunks
        for piece in pieces:
            audio, _ = self.tts.synthesize(piece, voice_style=style, **options)
            yield audio