| import io |
| import re |
| import time |
| import asyncio |
| from typing import List, Optional |
| from gtts import gTTS |
| import edge_tts |
| from config.settings import settings |
| from models.schemas import TTSRequest |
|
|
class EnhancedTTSService:
    """Text-to-speech service backed by gTTS with Edge-TTS as a second provider.

    The service cleans the input text, guesses its language when none is
    given, synthesises audio through one of the providers and can persist
    the resulting bytes under a local ``temp_audio`` directory.
    """

    # Lowercase Vietnamese letters that carry diacritics; used as the
    # Vietnamese language signal.
    _VIETNAMESE_CHARS = frozenset(
        'àáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ'
    )
    # Script ranges used for language detection.
    _JAPANESE_RE = re.compile(r'[\u3040-\u30ff]')  # Hiragana + Katakana
    _CHINESE_RE = re.compile(r'[\u3400-\u4dbf\u4e00-\u9fff]')  # CJK ideographs
    _KOREAN_RE = re.compile(r'[\u1100-\u11ff\u3130-\u318f\uac00-\ud7af]')  # Hangul

    _SENTENCE_END_RE = re.compile(r'[.!?]+')
    _CLAUSE_SEP_RE = re.compile(r'[,;:]')

    _URL_RE = re.compile(r'http\S+')
    _DISALLOWED_RE = re.compile(
        r'[^\w\sàáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ.,!?;:()-]'
    )
    _WHITESPACE_RE = re.compile(r'\s+')

    # Edge-TTS voice used for each language code.
    _EDGE_VOICES = {
        'vi': 'vi-VN-NamMinhNeural',
        'en': 'en-US-AriaNeural',
        'fr': 'fr-FR-DeniseNeural',
        'es': 'es-ES-ElviraNeural',
        'de': 'de-DE-KatjaNeural',
        'ja': 'ja-JP-NanamiNeural',
        'ko': 'ko-KR-SunHiNeural',
        'zh': 'zh-CN-XiaoxiaoNeural',
    }
    _DEFAULT_EDGE_VOICE = 'vi-VN-NamMinhNeural'

    _TEMP_AUDIO_DIR = "temp_audio"

    def __init__(self):
        """Read the supported languages and the chunk size limit from settings."""
        self.supported_languages = settings.SUPPORTED_LANGUAGES
        self.max_chunk_length = settings.MAX_CHUNK_LENGTH

    @staticmethod
    def _to_nfc(text: str) -> str:
        """Return *text* in NFC form so base letter + combining mark pairs
        become the single precomposed characters the tables above contain."""
        import unicodedata

        return unicodedata.normalize('NFC', text)

    def detect_language(self, text: str) -> str:
        """Guess the language of *text* from the characters it contains.

        Checks are applied in a fixed order: Vietnamese diacritics, then
        Japanese kana, then CJK ideographs, then Hangul.

        Returns:
            One of ``'vi'``, ``'ja'``, ``'zh'``, ``'ko'``; ``'en'`` when no
            check matches.
        """
        text = self._to_nfc(text)
        if any(char in self._VIETNAMESE_CHARS for char in text.lower()):
            return 'vi'
        # Kana is checked before ideographs because Japanese text mixes both.
        if self._JAPANESE_RE.search(text):
            return 'ja'
        if self._CHINESE_RE.search(text):
            return 'zh'
        if self._KOREAN_RE.search(text):
            return 'ko'
        return 'en'

    @staticmethod
    def _wrap_piece(piece: str, max_length: int) -> List[str]:
        """Break *piece* into fragments no longer than *max_length*."""
        if max_length < 1 or len(piece) <= max_length:
            # textwrap rejects widths below 1; keep the piece whole then.
            return [piece]

        import textwrap

        return textwrap.wrap(
            piece,
            width=max_length,
            break_long_words=True,
            break_on_hyphens=False,
        )

    def split_text_into_chunks(self, text: str, max_length: Optional[int] = None) -> List[str]:
        """Split *text* into chunks of at most *max_length* characters.

        The text is split on sentence terminators (``.``, ``!``, ``?``). A
        sentence longer than the limit is split again on ``,``, ``;`` and
        ``:``, and any clause still too long is wrapped on whitespace (or cut
        mid-word as a last resort). Consecutive pieces are then packed
        together, joined by ``". "``, while they fit.

        Args:
            text: Text to split.
            max_length: Chunk size limit; defaults to ``self.max_chunk_length``.

        Returns:
            The non-empty chunks, in order.
        """
        if max_length is None:
            max_length = self.max_chunk_length
        if not text:
            return []

        # Pass 1: reduce the text to pieces that each fit within the limit.
        pieces: List[str] = []
        for sentence in self._SENTENCE_END_RE.split(text):
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) <= max_length:
                pieces.append(sentence)
                continue
            for part in self._CLAUSE_SEP_RE.split(sentence):
                part = part.strip()
                if part:
                    pieces.extend(self._wrap_piece(part, max_length))

        # Pass 2: greedily pack pieces into chunks. The "+ 2" accounts for
        # the ". " separator placed between packed pieces.
        chunks: List[str] = []
        current = ""
        for piece in pieces:
            if len(current) + len(piece) + 2 <= max_length:
                current = f"{current}. {piece}" if current else piece
            else:
                if current:
                    chunks.append(current)
                current = piece

        if current:
            chunks.append(current)
        return chunks

    def text_to_speech_gtts(self, text: str, language: str = 'vi') -> Optional[bytes]:
        """Synthesise *text* with the gTTS (Google Text-to-Speech) library.

        The text is split into chunks, each chunk is synthesised separately
        and the resulting audio bytes are concatenated.

        Returns:
            The audio bytes, or ``None`` when nothing was produced or any
            error occurred (the error is printed).
        """
        try:
            audio_chunks = []
            for chunk in self.split_text_into_chunks(text):
                if not chunk.strip():
                    continue

                audio_buffer = io.BytesIO()
                gTTS(text=chunk, lang=language, slow=False).write_to_fp(audio_buffer)
                audio_chunks.append(audio_buffer.getvalue())

                # Short pause between consecutive requests.
                time.sleep(0.1)

            if audio_chunks:
                return b''.join(audio_chunks)
            return None

        except Exception as e:
            print(f"❌ Lỗi gTTS: {e}")
            return None

    async def text_to_speech_edgetts(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Synthesise *text* with Edge-TTS (async).

        Collects every ``"audio"`` message of the Edge-TTS stream.

        Returns:
            The audio bytes, or ``None`` when an error occurred (the error is
            printed).
        """
        try:
            communicate = edge_tts.Communicate(text, voice)
            audio_buffer = io.BytesIO()

            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_buffer.write(chunk["data"])

            return audio_buffer.getvalue()

        except Exception as e:
            print(f"❌ Lỗi Edge-TTS: {e}")
            return None

    def text_to_speech_edgetts_sync(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Blocking wrapper around :meth:`text_to_speech_edgetts`.

        ``asyncio.run`` cannot be used from a thread that already runs an
        event loop, so in that situation the coroutine is executed on a
        short-lived worker thread with its own loop.

        Returns:
            The audio bytes, or ``None`` when an error occurred (the error is
            printed).
        """
        try:
            try:
                asyncio.get_running_loop()
                loop_running = True
            except RuntimeError:
                loop_running = False

            if not loop_running:
                return asyncio.run(self.text_to_speech_edgetts(text, voice))

            from concurrent.futures import ThreadPoolExecutor

            with ThreadPoolExecutor(max_workers=1) as pool:
                future = pool.submit(
                    asyncio.run, self.text_to_speech_edgetts(text, voice)
                )
                return future.result()
        except Exception as e:
            print(f"❌ Lỗi Edge-TTS sync: {e}")
            return None

    def text_to_speech(self, text: str, language: Optional[str] = None, provider: str = "auto") -> Optional[bytes]:
        """Convert *text* to speech using the requested provider.

        Args:
            text: Text to speak. Empty or whitespace-only text yields ``None``.
            language: Language code; detected from *text* when ``None``.
            provider: ``"auto"`` tries gTTS then Edge-TTS; ``"gtts"`` uses
                gTTS only; ``"edgetts"`` tries Edge-TTS and falls back to
                gTTS. Any other value uses gTTS.

        Returns:
            The audio bytes, or ``None`` when every attempted provider failed
            or the text is empty after cleaning.
        """
        if not text or len(text.strip()) == 0:
            return None

        if language is None:
            language = self.detect_language(text)

        text = self.clean_text(text)
        if not text:
            # Nothing speakable survived cleaning (e.g. only a URL or emoji).
            return None

        try:
            gtts_attempted = False

            if provider == "auto" or provider == "gtts":
                gtts_attempted = True
                print(f"🔊 Đang sử dụng gTTS cho văn bản {len(text)} ký tự...")
                audio_bytes = self.text_to_speech_gtts(text, language)
                if audio_bytes:
                    return audio_bytes

            if provider == "auto" or provider == "edgetts":
                print(f"🔊 Đang thử Edge-TTS cho văn bản {len(text)} ký tự...")
                voice = self._EDGE_VOICES.get(language, self._DEFAULT_EDGE_VOICE)
                audio_bytes = self.text_to_speech_edgetts_sync(text, voice)
                if audio_bytes:
                    return audio_bytes

            if gtts_attempted:
                # gTTS already failed above; repeating the same requests
                # would only add latency.
                return None
            return self.text_to_speech_gtts(text, language)

        except Exception as e:
            print(f"❌ Lỗi TTS tổng hợp: {e}")
            return None

    def clean_text(self, text: str) -> str:
        """Prepare *text* for speech synthesis.

        Normalises to NFC (so decomposed diacritics are not stripped),
        removes ``http...`` URLs, drops every character that is not a word
        character, whitespace or one of ``.,!?;:()-``, and collapses runs of
        whitespace into single spaces.
        """
        text = self._to_nfc(text)
        text = self._URL_RE.sub('', text)
        text = self._DISALLOWED_RE.sub('', text)
        text = self._WHITESPACE_RE.sub(' ', text)
        return text.strip()

    def save_audio_to_file(self, audio_bytes: Optional[bytes], filename: Optional[str] = None) -> Optional[str]:
        """Write *audio_bytes* to a file inside the ``temp_audio`` directory.

        Args:
            audio_bytes: Audio data; ``None`` makes the call a no-op.
            filename: File name to use. When omitted, a
                ``tts_output_<nanosecond timestamp>.mp3`` name is generated so
                that files saved within the same second do not overwrite each
                other.

        Returns:
            The path of the written file, or ``None`` when *audio_bytes* is
            ``None``.
        """
        if audio_bytes is None:
            return None

        import os

        if filename is None:
            filename = f"tts_output_{time.time_ns()}.mp3"

        os.makedirs(self._TEMP_AUDIO_DIR, exist_ok=True)

        filepath = os.path.join(self._TEMP_AUDIO_DIR, filename)
        with open(filepath, 'wb') as f:
            f.write(audio_bytes)

        return filepath

    def save_tts_audio(self, audio_bytes: Optional[bytes], filename: Optional[str] = None) -> Optional[str]:
        """Alias of :meth:`save_audio_to_file`, kept for the chat service."""
        return self.save_audio_to_file(audio_bytes, filename)