feat: Multiple updates - HTML optimization, dual API search, text story tuning, Messenger UI
d4f61bc | """ | |
| TTS Handler for Text Story module. | |
| Handles voice generation and audio processing using Kokoro TTS. | |
| """ | |
| import os | |
| import logging | |
| import aiohttp | |
| from pydub import AudioSegment | |
| logger = logging.getLogger(__name__) | |
class TTSHandler:
    """
    Handles Text-to-Speech generation using Kokoro TTS.

    Audio is requested from the HTTP endpoint configured through the
    ``HF_TTS`` environment variable, written to disk, stripped of leading
    and trailing silence, and measured.
    """

    # Speaking rate sent to the TTS API unless the caller overrides it
    # (faster voice for engaging content).
    DEFAULT_SPEED = 1.4
    # Total time budget, in seconds, for a single TTS HTTP request.
    REQUEST_TIMEOUT_S = 120
    # Padding, in milliseconds, kept before/after the detected speech so the
    # first and last syllables are not clipped.
    LEAD_PADDING_MS = 50
    TAIL_PADDING_MS = 100
    # Duration, in seconds, reported by get_duration() for unreadable files.
    FALLBACK_DURATION_S = 2.0

    def __init__(self):
        """Read the TTS endpoint from ``HF_TTS`` and normalise it."""
        # Strip the trailing slash *before* the emptiness check so a value
        # such as "/" is reported as "not configured" instead of silently
        # producing a relative endpoint.
        self.tts_url = os.getenv("HF_TTS", "").rstrip('/')
        if not self.tts_url:
            logger.warning("TTSHandler: HF_TTS not configured, TTS will fail")
        else:
            logger.info(f"TTSHandler: Using TTS endpoint {self.tts_url}")

    async def generate_tts(self, text: str, voice: str, output_path: str,
                           speed: float = DEFAULT_SPEED) -> float:
        """
        Generate TTS audio for text.

        Args:
            text: Text to speak
            voice: Kokoro voice ID (e.g., 'af_heart', 'am_fenrir')
            output_path: Path to save WAV file
            speed: Speaking rate forwarded to the TTS API

        Returns:
            Duration in seconds

        Raises:
            ValueError: If ``HF_TTS`` is not configured or ``text`` is empty.
            Exception: If the request fails, the API answers with a non-200
                status, or the API returns no audio data.
        """
        if not self.tts_url:
            raise ValueError("HF_TTS environment variable not set")
        # Reject empty input up front instead of paying for a network
        # round-trip that cannot produce usable audio.
        if not text or not text.strip():
            raise ValueError("TTS text must not be empty")

        # Correct endpoint format (same as video_creator)
        endpoint = f"{self.tts_url}/v1/audio/speech"
        # Correct payload format for Kokoro TTS
        payload = {
            "model": "kokoro",
            "input": text,
            "voice": voice,
            "speed": speed,
        }
        temp_path = output_path + ".temp.wav"

        logger.info(f"TTS: Generating voice '{voice}' for: {text[:50]}...")
        try:
            audio_data = await self._request_audio(endpoint, payload)
            logger.info(f"TTS: Received {len(audio_data)} bytes")
            if not audio_data:
                raise Exception("TTS API returned no audio data")

            # Save raw audio
            with open(temp_path, "wb") as f:
                f.write(audio_data)

            # Trim silence and get duration.
            # NOTE: trim_silence() is a regular (synchronous) method, so it
            # runs to completion before this coroutine yields again.
            duration = self.trim_silence(temp_path, output_path)
            logger.info(f"TTS: Generated {len(text)} chars, {duration:.2f}s")
            return duration
        except aiohttp.ClientError as e:
            logger.error(f"TTS network error: {type(e).__name__}: {e}")
            # Chain the original error so the traceback keeps the root cause.
            raise Exception(f"TTS network error: {e}") from e
        except Exception as e:
            logger.error(f"TTS generation failed: {type(e).__name__}: {e}")
            raise
        finally:
            # Always remove the temp file, including on failure paths.
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning(f"TTS: Could not remove temp file {temp_path}: {e}")

    async def _request_audio(self, endpoint: str, payload: dict) -> bytes:
        """POST ``payload`` to ``endpoint`` and return the raw response body."""
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_S)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                endpoint,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"TTS API error ({response.status}): {error_text}")
                return await response.read()

    def trim_silence(self, input_path: str, output_path: str,
                     silence_thresh: int = -40, min_silence_len: int = 100) -> float:
        """
        Trim leading and trailing silence from audio.

        If trimming fails for any reason, the input file is copied to
        ``output_path`` unchanged and its duration is returned instead.

        Args:
            input_path: Input audio file
            output_path: Output audio file
            silence_thresh: Silence threshold in dB
            min_silence_len: Minimum silence length in ms

        Returns:
            Duration of trimmed audio in seconds
        """
        try:
            audio = AudioSegment.from_file(input_path)

            # Detect non-silent parts
            from pydub.silence import detect_nonsilent
            nonsilent_ranges = detect_nonsilent(
                audio,
                min_silence_len=min_silence_len,
                silence_thresh=silence_thresh
            )

            if nonsilent_ranges:
                # Keep everything from the first to the last non-silent
                # range, plus a little padding clamped to the clip bounds.
                start_ms = max(0, nonsilent_ranges[0][0] - self.LEAD_PADDING_MS)
                end_ms = min(len(audio), nonsilent_ranges[-1][1] + self.TAIL_PADDING_MS)
                trimmed = audio[start_ms:end_ms]
            else:
                # No speech detected, use original
                trimmed = audio

            # Export trimmed audio
            trimmed.export(output_path, format="wav")
            return len(trimmed) / 1000.0  # Convert ms to seconds
        except Exception as e:
            logger.error(f"Silence trim failed: {e}")
            # Fallback: just copy the file. Skip the copy when both paths
            # name the same file, because shutil.copy2 raises SameFileError.
            import shutil
            if os.path.abspath(input_path) != os.path.abspath(output_path):
                shutil.copy2(input_path, output_path)
            audio = AudioSegment.from_file(output_path)
            return len(audio) / 1000.0

    def get_duration(self, audio_path: str) -> float:
        """
        Get duration of audio file in seconds.

        Returns ``FALLBACK_DURATION_S`` (2.0) when the file cannot be read.
        """
        try:
            audio = AudioSegment.from_file(audio_path)
            return len(audio) / 1000.0
        except Exception as e:
            logger.error(f"Failed to get audio duration: {e}")
            return self.FALLBACK_DURATION_S  # Default fallback