Classroom-Ai-Assistant / backend /TextToVoice.py
0xarchit's picture
added driver
45e9602
import asyncio
import edge_tts
import pygame
import os
import logging
import shutil
import subprocess
from typing import List, Dict, Optional
# Logging setup: INFO-level records formatted as "<time> - <level> - <message>",
# emitted through a module logger named "EdgeTTS".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("EdgeTTS")
class EdgeTextToSpeech:
    """Text-to-speech helper built on Microsoft Edge TTS (``edge_tts``).

    Speech is synthesised to MP3 through Edge TTS. If a pygame mixer can be
    initialised, audio can also be played locally; otherwise (for example in
    a container without an audio device) only file generation is available.
    Saving falls back to pyttsx3 (WAV, converted to MP3 when ffmpeg is on
    PATH) if Edge TTS fails.

    Attributes:
        current_voice: Voice short name used when a call passes no voice.
        mixer_available: True when ``pygame.mixer.init`` succeeded.
    """

    def __init__(self):
        """Set the default voice and try to initialise the pygame mixer."""
        # Always set a default voice regardless of audio device availability
        self.current_voice = "en-US-AriaNeural"
        self.mixer_available = False
        try:
            pygame.mixer.init(frequency=22050, size=-16, channels=2, buffer=512)
        except Exception as e:
            # In containers there is no audio device; this is expected. We can still save audio files.
            logger.warning(f"Pygame mixer not available (no audio device). File generation will still work. Details: {e}")
        else:
            self.mixer_available = True
            logger.info("Edge TTS engine initialized successfully (pygame mixer ready)")

    async def speak_async(self, text: str, voice: Optional[str] = None) -> bool:
        """Synthesise ``text`` with Edge TTS and play it if a mixer is available.

        The audio is written to a uniquely named temporary MP3 which is
        always removed afterwards, including when synthesis or playback
        fails.

        Args:
            text: Text to speak.
            voice: Voice short name; defaults to ``self.current_voice``.

        Returns:
            True when synthesis (and playback, if attempted) completed,
            False if any step raised.
        """
        import tempfile

        voice_to_use = voice or self.current_voice
        temp_filename = None
        try:
            logger.info(f"Speaking with {voice_to_use}: {text}")
            communicate = edge_tts.Communicate(text, voice_to_use)

            # mkstemp guarantees a unique name, so concurrent calls (even
            # with identical text) never share or delete each other's file.
            fd, temp_filename = tempfile.mkstemp(prefix="temp_edge_audio_", suffix=".mp3")
            os.close(fd)
            await communicate.save(temp_filename)

            # Play the audio file if a mixer is available (local/dev only)
            if self.mixer_available and pygame.mixer.get_init():
                pygame.mixer.music.load(temp_filename)
                pygame.mixer.music.play()
                # Poll without blocking the event loop until playback ends
                while pygame.mixer.music.get_busy():
                    await asyncio.sleep(0.1)
            return True
        except Exception as e:
            logger.error(f"Error in Edge TTS: {e}")
            # In server/container contexts, skip local playback fallback.
            return False
        finally:
            if temp_filename is not None:
                self._remove_temp_audio(temp_filename)

    def _remove_temp_audio(self, path: str) -> None:
        """Release the mixer's hold on the music file (if any) and delete ``path``."""
        if self.mixer_available and pygame.mixer.get_init():
            # pygame 2.0+ exposes unload() to close the loaded music file;
            # without it the delete below can fail on some platforms.
            unload = getattr(pygame.mixer.music, "unload", None)
            if unload is not None:
                try:
                    unload()
                except Exception as e:
                    logger.debug(f"Could not unload music before cleanup: {e}")
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not remove temporary audio file {path}: {e}")

    def speak(self, text: str, voice: Optional[str] = None) -> bool:
        """Synchronous wrapper for speak_async.

        Uses ``asyncio.run``, so it must not be called from code that is
        already running inside an event loop.
        """
        return asyncio.run(self.speak_async(text, voice))

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the directory part of ``path`` if it has one."""
        parent = os.path.dirname(path)
        # A bare file name has no directory part; os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    async def save_audio_async(self, text: str, filename: str, voice: Optional[str] = None) -> Optional[str]:
        """Save text-to-speech audio to a file and return the actual saved path.

        Primary: Edge TTS (mp3). Fallback: pyttsx3 -> wav, then convert to
        mp3 if ffmpeg is available.

        Args:
            text: Text to synthesise.
            filename: Requested output path. The extension is replaced with
                ``.mp3`` (or ``.wav`` when only the WAV fallback succeeds).
            voice: Voice short name; defaults to ``self.current_voice``.

        Returns:
            The path of the file that was written (``filename`` with the
            final extension applied), or None on failure.
        """
        voice_to_use = voice or self.current_voice
        try:
            self._ensure_parent_dir(filename)

            # Prefer mp3 when using Edge TTS
            target_path = filename
            if not target_path.lower().endswith('.mp3'):
                target_path = f"{os.path.splitext(filename)[0]}.mp3"

            communicate = edge_tts.Communicate(text, voice_to_use)
            await communicate.save(target_path)

            # The polling helper sleeps, so keep it off the event loop thread.
            if not await asyncio.to_thread(self._wait_for_file, target_path):
                raise RuntimeError(f"no audio data was written to {target_path}")
            logger.info(f"Audio saved to: {target_path}")
            return target_path
        except Exception as e:
            logger.error(f"Error saving audio with Edge TTS: {e}")

        return await self._save_audio_fallback(text, filename)

    async def _save_audio_fallback(self, text: str, filename: str) -> Optional[str]:
        """Save ``text`` via pyttsx3 as WAV, converting to MP3 when ffmpeg works.

        Returns the MP3 path if conversion succeeded, the WAV path if it did
        not (or ffmpeg is missing), or None when no audio could be produced.
        """
        base = os.path.splitext(filename)[0]
        wav_path = f"{base}.wav"
        try:
            await self._fallback_save_wav_async(text, wav_path)
            if not await asyncio.to_thread(self._wait_for_file, wav_path):
                raise RuntimeError(f"no audio data was written to {wav_path}")

            if self._has_ffmpeg():
                mp3_path = f"{base}.mp3"
                # ffmpeg is a blocking subprocess call; run it in a worker thread.
                converted = await asyncio.to_thread(self._ffmpeg_wav_to_mp3, wav_path, mp3_path)
                if converted and await asyncio.to_thread(self._wait_for_file, mp3_path):
                    # Only drop the WAV once the MP3 is known to exist.
                    try:
                        os.remove(wav_path)
                    except OSError as cleanup_error:
                        logger.debug(f"Could not remove intermediate WAV {wav_path}: {cleanup_error}")
                    logger.info(f"Audio saved to: {mp3_path} (fallback via ffmpeg)")
                    return mp3_path
                logger.warning(f"Audio saved to: {wav_path} (fallback WAV; ffmpeg conversion failed)")
                return wav_path

            # If no ffmpeg, keep WAV
            logger.info(f"Audio saved to: {wav_path} (fallback WAV; ffmpeg not found)")
            return wav_path
        except Exception as fe:
            logger.error(f"Fallback TTS save failed: {fe}")
            return None

    def save_audio(self, text: str, filename: str, voice: Optional[str] = None) -> Optional[str]:
        """Synchronous wrapper for save_audio_async.

        Returns the actual path to the saved file or None. Uses
        ``asyncio.run``, so it must not be called from a running event loop.
        """
        return asyncio.run(self.save_audio_async(text, filename, voice))

    async def get_available_voices(self) -> List[Dict]:
        """Return the voices reported by ``edge_tts.list_voices``, or [] on error."""
        try:
            return await edge_tts.list_voices()
        except Exception as e:
            logger.error(f"Error getting voices: {e}")
            return []

    def list_voices(self):
        """Print a short per-language voice overview and return all voices.

        Only the locales in the priority list are printed, three voices each;
        the returned list is the complete result of get_available_voices().
        """
        voices = asyncio.run(self.get_available_voices())
        print("\nAvailable voices:")
        print("-" * 50)

        # Group by language
        lang_groups = {}
        for voice in voices:
            lang_groups.setdefault(voice['Locale'], []).append(voice)

        # Show popular languages first
        priority_langs = ['en-US', 'en-GB', 'en-AU', 'es-ES', 'fr-FR', 'de-DE', 'it-IT', 'ja-JP']
        for lang in priority_langs:
            if lang in lang_groups:
                print(f"\n{lang}:")
                for voice in lang_groups[lang][:3]:  # Show first 3 voices per language
                    gender = voice.get('Gender', 'Unknown')
                    print(f" {voice['ShortName']} - {voice['FriendlyName']} ({gender})")
        print(f"\n... and {len(voices)} total voices available")
        return voices

    async def _fallback_save_wav_async(self, text: str, wav_path: str):
        """Save speech to a WAV file using pyttsx3 in a background thread."""
        def _save():
            # Imported lazily so pyttsx3 is only required when the fallback runs.
            import pyttsx3
            engine = pyttsx3.init()
            # Rate/voice could be adjusted here via engine.setProperty if needed.
            engine.save_to_file(text, wav_path)
            engine.runAndWait()

        await asyncio.to_thread(_save)

    async def _fallback_play_async(self, text: str):
        """Placeholder for local pyttsx3 playback; intentionally does nothing."""
        # In container/server context, do not attempt local playback
        return

    def _has_ffmpeg(self) -> bool:
        """Return True when an ``ffmpeg`` executable is found on PATH."""
        return shutil.which("ffmpeg") is not None

    def _ffmpeg_wav_to_mp3(self, wav_path: str, mp3_path: str) -> bool:
        """Convert ``wav_path`` to ``mp3_path`` with ffmpeg.

        Output is 22050 Hz, 2 channels, 128k bitrate, overwriting any
        existing file. Failures are logged rather than raised.

        Returns:
            True if ffmpeg exited successfully, False otherwise.
        """
        cmd = [
            "ffmpeg", "-y",
            "-i", wav_path,
            "-vn",
            "-ar", "22050",
            "-ac", "2",
            "-b:a", "128k",
            mp3_path,
        ]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            logger.error(f"ffmpeg conversion failed: {e}")
            return False
        return True

    def _wait_for_file(self, path: str, timeout: float = 3.0) -> bool:
        """Wait until a file exists and has non-zero size or timeout reached.

        The file is checked at least once, even with ``timeout=0``.

        Returns:
            True if the file was seen with non-zero size, False on timeout.
        """
        import time

        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            try:
                if os.path.getsize(path) > 0:
                    return True
            except OSError:
                # Not there (or not readable) yet; keep polling.
                pass
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.05)

    def run_interactive_mode(self):
        """Run interactive Edge TTS mode.

        Reads commands from stdin until 'quit', Ctrl-C, or end of input.
        """
        print("\n=== Microsoft Edge Text-to-Speech ===")
        print("Commands:")
        print(" Type text to speak it")
        print(" 'voice <name>' - Change voice (e.g., 'voice en-US-JennyNeural')")
        print(" 'voices' - List available voices")
        print(" 'save <filename>' - Save last text to file")
        print(" 'current' - Show current voice")
        print(" 'quit' - Exit program")
        print("=" * 38)

        last_text = ""
        while True:
            try:
                user_input = input(f"\n[{self.current_voice}] Enter text: ").strip()
                if not user_input:
                    continue
                command = user_input.lower()

                if command == 'quit':
                    print("Goodbye!")
                    break
                elif command == 'voices':
                    self.list_voices()
                elif command == 'current':
                    print(f"Current voice: {self.current_voice}")
                # The input is stripped, so a bare "voice" must be matched
                # explicitly for the "missing name" message to be reachable.
                elif command == 'voice' or command.startswith('voice '):
                    new_voice = user_input[6:].strip()
                    if new_voice:
                        self.current_voice = new_voice
                        print(f"Voice changed to: {new_voice}")
                        # Test the new voice
                        self.speak("Voice changed successfully", new_voice)
                    else:
                        print("Please specify a voice name")
                elif command == 'save' or command.startswith('save '):
                    filename = user_input[5:].strip()
                    if last_text and filename:
                        self.save_audio(last_text, filename)
                    else:
                        print("No text to save or filename not provided")
                else:
                    # Speak the entered text
                    last_text = user_input
                    self.speak(user_input)
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed; retrying input() would loop forever.
                print("\nGoodbye!")
                break
            except Exception as e:
                logger.error(f"Error in interactive mode: {e}")
if __name__ == "__main__":
    # Remind the user which third-party packages the script relies on.
    for notice_line in (
        "Make sure to install required packages:",
        "pip install edge-tts pygame",
        "",
    ):
        print(notice_line)

    # Speak one sentence as a smoke test, then hand over to the prompt loop.
    engine = EdgeTextToSpeech()
    engine.speak("Hello! Microsoft Edge text to speech is working perfectly.")
    engine.run_interactive_mode()