Spaces:
Running
Running
| import asyncio | |
| import edge_tts | |
| import pygame | |
| import os | |
| import logging | |
| import shutil | |
| import subprocess | |
| from typing import List, Dict, Optional | |
# Logging setup: INFO and above, formatted as "time - LEVEL - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger shared by the code in this module.
logger = logging.getLogger("EdgeTTS")
class EdgeTextToSpeech:
    """Text-to-speech helper built on Microsoft Edge TTS (``edge_tts``).

    Speech is synthesized to MP3 through ``edge_tts.Communicate``. Playback
    uses ``pygame.mixer`` when it can be initialized; when it cannot (for
    example, no audio device in a container) the instance can still write
    audio files. ``save_audio_async`` falls back to ``pyttsx3`` (WAV) and
    converts to MP3 when an ``ffmpeg`` executable is found on PATH.
    """

    def __init__(self):
        """Set the default voice and try to initialize the pygame mixer."""
        # Always set a default voice regardless of audio device availability.
        self.current_voice = "en-US-AriaNeural"
        self.mixer_available = False
        try:
            pygame.mixer.init(frequency=22050, size=-16, channels=2, buffer=512)
            self.mixer_available = True
            logger.info("Edge TTS engine initialized successfully (pygame mixer ready)")
        except Exception as e:
            # In containers there is no audio device; this is expected.
            # Audio files can still be generated without a mixer.
            logger.warning(f"Pygame mixer not available (no audio device). File generation will still work. Details: {e}")

    async def speak_async(self, text: str, voice: Optional[str] = None):
        """Synthesize ``text`` with Edge TTS and play it if a mixer is available.

        Args:
            text: The text to speak.
            voice: Edge TTS voice name; defaults to ``self.current_voice``.

        Returns:
            bool: True when synthesis (and playback, if attempted) finished
            without an exception, False otherwise. Errors are logged, not
            raised.
        """
        import tempfile

        temp_filename = None
        try:
            voice_to_use = voice or self.current_voice
            logger.info(f"Speaking with {voice_to_use}: {text}")

            communicate = edge_tts.Communicate(text, voice_to_use)

            # Unique temporary file in the current directory. mkstemp avoids
            # the name collisions a hash-derived filename could produce.
            fd, temp_filename = tempfile.mkstemp(
                prefix="temp_edge_audio_", suffix=".mp3", dir=os.getcwd()
            )
            os.close(fd)
            await communicate.save(temp_filename)

            # Play the audio file if a mixer is available (local/dev only).
            if self.mixer_available and pygame.mixer.get_init():
                pygame.mixer.music.load(temp_filename)
                pygame.mixer.music.play()
                # Poll without blocking the event loop until playback ends.
                while pygame.mixer.music.get_busy():
                    await asyncio.sleep(0.1)
            return True
        except Exception as e:
            logger.error(f"Error in Edge TTS: {e}")
            # In server/container contexts, skip local playback fallback.
            return False
        finally:
            # Clean up on every exit path, including failures mid-playback.
            if temp_filename is not None:
                self._remove_temp_audio(temp_filename)

    def _remove_temp_audio(self, path: str) -> None:
        """Best-effort removal of a temporary audio file created for playback."""
        try:
            if self.mixer_available and pygame.mixer.get_init():
                # pygame >= 2.0 exposes music.unload(); releasing the file
                # first lets the removal succeed on platforms that lock
                # open files. Older versions simply skip this step.
                unload = getattr(pygame.mixer.music, "unload", None)
                if unload is not None:
                    unload()
        except Exception as e:
            logger.debug(f"Could not unload music before cleanup: {e}")
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not remove temporary audio file {path}: {e}")

    def speak(self, text: str, voice: Optional[str] = None):
        """Synchronous wrapper for ``speak_async``.

        Uses ``asyncio.run``, so it must not be called from code that is
        already running inside an event loop.
        """
        return asyncio.run(self.speak_async(text, voice))

    @staticmethod
    def _with_extension(filename: str, extension: str) -> str:
        """Return ``filename`` with its extension replaced by ``extension``."""
        return f"{os.path.splitext(filename)[0]}{extension}"

    @staticmethod
    def _ensure_parent_dir(filename: str) -> None:
        """Create the parent directory of ``filename`` if it has one.

        A bare filename has an empty dirname, and ``os.makedirs("")`` raises,
        so the directory is only created when a parent component exists.
        """
        parent = os.path.dirname(filename)
        if parent:
            os.makedirs(parent, exist_ok=True)

    async def save_audio_async(self, text: str, filename: str, voice: Optional[str] = None) -> Optional[str]:
        """Save text-to-speech audio to a file and return the actual saved path.

        Primary: Edge TTS (mp3). Fallback: pyttsx3 -> wav, then convert to
        mp3 if ffmpeg is available.

        Args:
            text: The text to synthesize.
            filename: Requested output path. The extension is replaced with
                ``.mp3`` (or ``.wav`` when the fallback cannot convert).
            voice: Edge TTS voice name; defaults to ``self.current_voice``.

        Returns:
            str: Path to the saved file (mp3 or wav) on success. The path is
                derived from ``filename`` as given; it is not made absolute.
            None: on failure.
        """
        try:
            voice_to_use = voice or self.current_voice

            # Ensure target directory exists (no-op for bare filenames).
            self._ensure_parent_dir(filename)

            # Prefer mp3 when using Edge TTS.
            target_path = filename
            if not target_path.lower().endswith('.mp3'):
                target_path = self._with_extension(filename, ".mp3")

            # Try Edge TTS first.
            communicate = edge_tts.Communicate(text, voice_to_use)
            await communicate.save(target_path)

            # Ensure file is fully written.
            self._wait_for_file(target_path)
            logger.info(f"Audio saved to: {target_path}")
            return target_path
        except Exception as e:
            logger.error(f"Error saving audio with Edge TTS: {e}")
            # Fallback to local TTS: save WAV via pyttsx3.
            try:
                wav_path = self._with_extension(filename, ".wav")
                await self._fallback_save_wav_async(text, wav_path)
                if not self._wait_for_file(wav_path):
                    raise RuntimeError(f"pyttsx3 produced no audio at {wav_path}")

                # Convert to mp3 if ffmpeg exists.
                if self._has_ffmpeg():
                    mp3_path = self._with_extension(filename, ".mp3")
                    converted = self._ffmpeg_wav_to_mp3(wav_path, mp3_path)
                    if converted and self._wait_for_file(mp3_path):
                        # Remove the intermediate wav only once the mp3
                        # is confirmed to exist.
                        try:
                            os.remove(wav_path)
                        except OSError as remove_error:
                            logger.warning(f"Could not remove intermediate WAV {wav_path}: {remove_error}")
                        logger.info(f"Audio saved to: {mp3_path} (fallback via ffmpeg)")
                        return mp3_path
                    # Conversion failed: keep the WAV instead of returning
                    # a path to an mp3 that was never written.
                    logger.warning(f"Audio saved to: {wav_path} (fallback WAV; ffmpeg conversion failed)")
                    return wav_path

                # If no ffmpeg, keep WAV.
                logger.info(f"Audio saved to: {wav_path} (fallback WAV; ffmpeg not found)")
                return wav_path
            except Exception as fe:
                logger.error(f"Fallback TTS save failed: {fe}")
                return None

    def save_audio(self, text: str, filename: str, voice: Optional[str] = None):
        """Synchronous wrapper for ``save_audio_async``.

        Returns the actual path to the saved file or None. Uses
        ``asyncio.run``, so it must not be called from a running event loop.
        """
        return asyncio.run(self.save_audio_async(text, filename, voice))

    async def get_available_voices(self) -> List[Dict]:
        """Return the voices reported by ``edge_tts.list_voices()``.

        Returns an empty list (and logs the error) if the lookup fails.
        """
        try:
            return await edge_tts.list_voices()
        except Exception as e:
            logger.error(f"Error getting voices: {e}")
            return []

    def list_voices(self):
        """Print a short per-language voice summary and return all voices.

        Only a fixed set of popular locales is printed, with at most three
        voices each; the returned list is complete.
        """
        voices = asyncio.run(self.get_available_voices())
        print("\nAvailable voices:")
        print("-" * 50)

        # Group by language.
        lang_groups = {}
        for voice in voices:
            lang_groups.setdefault(voice['Locale'], []).append(voice)

        # Show popular languages first.
        priority_langs = ['en-US', 'en-GB', 'en-AU', 'es-ES', 'fr-FR', 'de-DE', 'it-IT', 'ja-JP']
        for lang in priority_langs:
            if lang in lang_groups:
                print(f"\n{lang}:")
                for voice in lang_groups[lang][:3]:  # First 3 voices per language
                    gender = voice.get('Gender', 'Unknown')
                    print(f"  {voice['ShortName']} - {voice['FriendlyName']} ({gender})")
        print(f"\n... and {len(voices)} total voices available")
        return voices

    async def _fallback_save_wav_async(self, text: str, wav_path: str):
        """Save speech to a WAV file using pyttsx3 in a background thread."""
        def _save():
            # Imported lazily so pyttsx3 is only required when the fallback
            # path is actually taken.
            import pyttsx3
            engine = pyttsx3.init()
            engine.save_to_file(text, wav_path)
            engine.runAndWait()

        await asyncio.to_thread(_save)

    async def _fallback_play_async(self, text: str):
        """Placeholder for local pyttsx3 playback; intentionally does nothing."""
        # In container/server context, do not attempt local playback.
        return

    def _has_ffmpeg(self) -> bool:
        """Return True when an ``ffmpeg`` executable is found on PATH."""
        return shutil.which("ffmpeg") is not None

    def _ffmpeg_wav_to_mp3(self, wav_path: str, mp3_path: str) -> bool:
        """Convert ``wav_path`` to ``mp3_path`` with ffmpeg.

        Returns:
            bool: True when ffmpeg exited successfully, False otherwise
            (the failure is logged, not raised).
        """
        cmd = [
            "ffmpeg", "-y",
            "-i", wav_path,
            "-vn",
            "-ar", "22050",
            "-ac", "2",
            "-b:a", "128k",
            mp3_path,
        ]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            return True
        except Exception as e:
            logger.error(f"ffmpeg conversion failed: {e}")
            return False

    def _wait_for_file(self, path: str, timeout: float = 3.0) -> bool:
        """Wait until a file exists and has non-zero size or timeout reached.

        Returns:
            bool: True if the file was seen non-empty, False on timeout.
        """
        import time

        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            try:
                if os.path.exists(path) and os.path.getsize(path) > 0:
                    return True
            except OSError:
                # The file may vanish between the two checks; retry.
                pass
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.05)

    def run_interactive_mode(self):
        """Run a console loop that speaks typed text and handles commands.

        The loop ends on 'quit', Ctrl+C, or end-of-input on stdin.
        """
        print("\n=== Microsoft Edge Text-to-Speech ===")
        print("Commands:")
        print("  Type text to speak it")
        print("  'voice <name>' - Change voice (e.g., 'voice en-US-JennyNeural')")
        print("  'voices' - List available voices")
        print("  'save <filename>' - Save last text to file")
        print("  'current' - Show current voice")
        print("  'quit' - Exit program")
        print("=" * 38)

        last_text = ""
        while True:
            try:
                user_input = input(f"\n[{self.current_voice}] Enter text: ").strip()
                if not user_input:
                    continue

                command = user_input.lower()
                if command == 'quit':
                    print("Goodbye!")
                    break
                elif command == 'voices':
                    self.list_voices()
                elif command == 'current':
                    print(f"Current voice: {self.current_voice}")
                elif command.startswith('voice '):
                    new_voice = user_input[6:].strip()
                    if new_voice:
                        self.current_voice = new_voice
                        print(f"Voice changed to: {new_voice}")
                        # Test the new voice.
                        self.speak("Voice changed successfully", new_voice)
                    else:
                        print("Please specify a voice name")
                elif command.startswith('save '):
                    filename = user_input[5:].strip()
                    if last_text and filename:
                        self.save_audio(last_text, filename)
                    else:
                        print("No text to save or filename not provided")
                else:
                    # Speak the entered text.
                    last_text = user_input
                    self.speak(user_input)
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin closed or not interactive. Without this
                # exit the generic handler below would loop forever.
                print("\nGoodbye!")
                break
            except Exception as e:
                logger.error(f"Error in interactive mode: {e}")
if __name__ == "__main__":
    # Print the install reminder (two lines followed by a blank line).
    for hint in (
        "Make sure to install required packages:",
        "pip install edge-tts pygame",
        "",
    ):
        print(hint)

    # Build the engine, run a one-off smoke test, then hand control to the
    # interactive console loop.
    engine = EdgeTextToSpeech()
    engine.speak("Hello! Microsoft Edge text to speech is working perfectly.")
    engine.run_interactive_mode()