| import os |
| import tempfile |
| import uuid |
| import time |
| import shutil |
| from datetime import datetime |
| from typing import List, Optional, Dict |
| from pathlib import Path |
|
|
| import requests |
| from fastapi import FastAPI, HTTPException, Form, UploadFile, File |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| import torch |
| import numpy as np |
|
|
| |
# Working directories: uploaded reference voices and generated audio output.
os.makedirs("/tmp/voices", exist_ok=True)
os.makedirs("/tmp/output", exist_ok=True)


app = FastAPI(title="Multi-Language TTS API", description="API for text-to-speech with English and Chinese support")


# Allow any origin/method/header so browser front-ends can call the API.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployment environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Upload-service endpoint (overridable via environment) and compute device.
OCI_UPLOAD_API_URL = os.getenv("OCI_UPLOAD_API_URL", "https://yukee1992-oci-video-storage.hf.space")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


print(f"✅ Using device: {DEVICE}")
|
|
| |
# Registry of Coqui TTS models this service can load. Keys are the model
# identifiers used throughout the module ("current_model", voice-style
# "recommended_model"); "model_name" is the Coqui model path passed to
# TTS(...). "size_mb"/"quality" are informational only.
AVAILABLE_MODELS = {
    "tacotron2-ddc": {
        "name": "Tacotron2-DDC",
        "model_name": "tts_models/en/ljspeech/tacotron2-DDC",
        "description": "High-quality English TTS",
        "languages": ["en"],
        "voice_cloning": False,
        "size_mb": 150,
        "quality": "excellent",
        "multi_speaker": False
    },
    "fastspeech2": {
        "name": "FastSpeech2-Mandarin",
        "model_name": "tts_models/zh-CN/baker/fastspeech2",
        "description": "High-quality Chinese TTS",
        "languages": ["zh"],
        "voice_cloning": False,
        "size_mb": 120,
        "quality": "excellent",
        "multi_speaker": False
    }
}
|
|
| |
# Voice styles exposed to API clients. Each entry maps to a backing model via
# "recommended_model" (a key of AVAILABLE_MODELS) and declares its language
# so /api/voice-styles can group styles by language.
VOICE_STYLES = {
    # English styles (all backed by Tacotron2-DDC).
    "default": {
        "name": "Default English Voice",
        "description": "Clear and natural English voice",
        "gender": "neutral",
        "language": "en",
        "recommended_model": "tacotron2-ddc"
    },
    "clear": {
        "name": "Clear English Voice",
        "description": "Very clear and articulate English voice",
        "gender": "neutral",
        "language": "en",
        "recommended_model": "tacotron2-ddc"
    },
    "professional": {
        "name": "Professional English Voice",
        "description": "Professional and authoritative English voice",
        "gender": "neutral",
        "language": "en",
        "recommended_model": "tacotron2-ddc"
    },

    # Chinese styles (all backed by FastSpeech2).
    "chinese_default": {
        "name": "默认中文语音",
        "description": "清晰自然的中文语音",
        "gender": "neutral",
        "language": "zh",
        "recommended_model": "fastspeech2"
    },
    "chinese_clear": {
        "name": "清晰中文语音",
        "description": "非常清晰和标准的中文语音",
        "gender": "neutral",
        "language": "zh",
        "recommended_model": "fastspeech2"
    },
    "chinese_professional": {
        "name": "专业中文语音",
        "description": "专业和正式的中文语音",
        "gender": "neutral",
        "language": "zh",
        "recommended_model": "fastspeech2"
    }
}
|
|
| |
# Global model state shared by the endpoints (single-process assumption).
tts = None             # active TTS.api.TTS instance, or None before first load
model_loaded = False   # True once a model finished loading successfully
current_model = ""     # key into AVAILABLE_MODELS for the active model
model_loading = False  # re-entrancy guard set while a load is in progress
|
|
| |
class TTSRequest(BaseModel):
    """Request payload for a single text-to-speech generation."""

    text: str                               # text to synthesize
    project_id: str                         # project folder used for the OCI upload
    voice_style: Optional[str] = "default"  # key into VOICE_STYLES
    speed: Optional[float] = 1.0            # playback speed factor (not read by the generation code in this module)
    language: Optional[str] = "auto"        # "en", "zh", or "auto" to detect from text
|
|
class BatchTTSRequest(BaseModel):
    """Request payload for generating several voiceovers in one call."""

    texts: List[str]                        # one entry per voiceover to generate
    project_id: str                         # shared project folder for all uploads
    voice_style: Optional[str] = "default"  # key into VOICE_STYLES, applied to every text
    speed: Optional[float] = 1.0            # forwarded to each per-text TTSRequest
    language: Optional[str] = "auto"        # "en", "zh", or "auto" (detected per text)
|
|
| |
def detect_language(text: str) -> str:
    """Classify *text* as Chinese ("zh") or English ("en").

    Counts CJK unified ideographs; when they make up more than 30% of the
    stripped text the input is treated as Chinese. Empty/whitespace-only
    input defaults to English.
    """
    import re

    stripped_length = len(text.strip())
    if not stripped_length:
        return "en"

    han_count = sum(1 for _ in re.finditer(r'[\u4e00-\u9fff]', text))
    return "zh" if han_count / stripped_length > 0.3 else "en"
|
|
| |
def get_model_for_voice_style(voice_style: str, language: str = "auto") -> str:
    """Resolve the model key to use for a voice style.

    Known styles carry their own "recommended_model"; for unknown styles
    the choice falls back to the language alone (Chinese -> FastSpeech2,
    anything else -> Tacotron2-DDC).
    """
    style = VOICE_STYLES.get(voice_style)
    if style is not None:
        return style.get("recommended_model", "tacotron2-ddc")

    return "fastspeech2" if language == "zh" else "tacotron2-ddc"
|
|
| |
def cleanup_old_files():
    """Delete stale generated WAVs from /tmp and then log disk usage.

    Output files older than one hour and voice files older than one day
    are removed. All errors are logged and swallowed — cleanup is
    best-effort and must never break a request.
    """
    try:
        now = time.time()
        # (directory, glob method, max age in seconds, log label)
        sweep_plan = (
            (Path("/tmp/output"), "glob", 3600, "old file"),
            (Path("/tmp/voices"), "rglob", 86400, "old voice file"),
        )
        for directory, method, max_age, label in sweep_plan:
            if not directory.exists():
                continue
            for wav in getattr(directory, method)("*.wav"):
                if wav.stat().st_mtime < now - max_age:
                    wav.unlink()
                    print(f"🧹 Cleaned up {label}: {wav}")

        check_storage_usage()

    except Exception as e:
        print(f"⚠️ Cleanup error: {e}")
|
|
def check_storage_usage():
    """Log free/total space on /tmp and warn when it runs low.

    Returns:
        bool: False when free space is below 2 GiB, True otherwise —
        including when the check itself fails, so callers keep working.
    """
    try:
        # shutil is already imported at module level; the previous
        # function-local re-import was redundant.
        total, used, free = shutil.disk_usage("/tmp")
        print(f"💾 Storage: {free // (2**30)}GB free of {total // (2**30)}GB total")

        # Refuse-to-proceed threshold: 2 GiB free.
        if free < 2 * (2**30):
            print("🚨 WARNING: Low storage space!")
            return False
        return True

    except Exception as e:
        print(f"⚠️ Storage check error: {e}")
        return True
|
|
| |
def clean_text(text, language="auto"):
    """Normalize text for TTS synthesis.

    Removes characters the synthesizers cannot handle, collapses runs of
    whitespace, and guarantees a terminal punctuation mark on longer
    texts. Empty or non-string input falls back to a greeting.

    Args:
        text: Raw input text (non-strings are replaced with a greeting).
        language: "en", "zh", or "auto" to detect from the text.

    Returns:
        str: Cleaned, non-empty text ready for synthesis.
    """
    import re

    if not text or not isinstance(text, str):
        return "Hello" if language != "zh" else "你好"

    if language == "auto":
        language = detect_language(text)

    # Keep word characters, whitespace and common punctuation; for Chinese
    # also keep CJK ideographs plus CJK/full-width punctuation ranges.
    if language == "zh":
        text = re.sub(r'[^\w\s\.\,\!\?\-\'\"\:\;\u4e00-\u9fff\u3000-\u303f\uff00-\uffef]', '', text)
    else:
        text = re.sub(r'[^\w\s\.\,\!\?\-\'\"\:\;]', '', text)

    # BUG FIX: collapse and strip whitespace BEFORE appending terminal
    # punctuation. Previously trailing whitespace survived until after the
    # append, producing output like "some text ." with a stray space.
    text = re.sub(r'\s+', ' ', text).strip()

    # Longer texts synthesize better with a closing sentence mark.
    if len(text) > 10 and not re.search(r'[\.\!\?。!?]$', text):
        text = text + ('。' if language == "zh" else '.')

    if not text:
        text = "Hello world" if language != "zh" else "你好世界"

    return text
|
|
def upload_to_oci(file_path: str, filename: str, project_id: str, file_type="voiceover"):
    """Upload an audio file to the OCI storage service.

    Args:
        file_path: Local path of the file to upload.
        filename: Name to store the file under.
        project_id: Project the file belongs to.
        file_type: Destination subfolder. This parameter was previously
            ignored ("voiceover" was hard-coded); the default keeps the
            old behavior for existing callers.

    Returns:
        Tuple of (result_dict, None) on success or (None, error_message)
        on failure — the function never raises.
    """
    try:
        if not OCI_UPLOAD_API_URL:
            return None, "OCI upload API URL not configured"

        url = f"{OCI_UPLOAD_API_URL}/api/upload"

        with open(file_path, "rb") as f:
            files = {"file": (filename, f, "audio/wav")}
            data = {
                "project_id": project_id,
                # Honor the caller-supplied subfolder instead of a constant.
                "subfolder": file_type,
            }

            response = requests.post(url, files=files, data=data, timeout=30)

        if response.status_code == 200:
            result = response.json()
            if result.get("status") == "success":
                return result, None
            return None, result.get("message", "Upload failed")
        return None, f"Upload failed with status {response.status_code}"

    except Exception as e:
        return None, f"Upload error: {str(e)}"
|
|
| |
def load_tts_model(model_type="tacotron2-ddc"):
    """Load (or switch to) a Coqui TTS model, with basic storage hygiene.

    Args:
        model_type: Key into AVAILABLE_MODELS.

    Returns:
        bool: True when the model is loaded and ready; False on any
        failure or when another load is already in progress.
    """
    global tts, model_loaded, current_model, model_loading

    # Single-flag re-entrancy guard; not thread-safe, but prevents obvious
    # duplicate loads from overlapping requests.
    if model_loading:
        print("⏳ Model is already being loaded...")
        return False

    if model_type not in AVAILABLE_MODELS:
        print(f"❌ Model type '{model_type}' not found.")
        return False

    # Fast path: the requested model is already active.
    if model_loaded and current_model == model_type:
        print(f"✅ Model {model_type} is already loaded")
        return True

    model_loading = True

    try:
        # Free disk space before downloading/loading model files.
        cleanup_old_files()

        # Import lazily so the API can start even when TTS is unavailable.
        try:
            from TTS.api import TTS
        except ImportError as e:
            print(f"❌ TTS import failed: {e}")
            return False

        import sys
        from io import StringIO

        # Some Coqui models prompt for license acceptance on stdin; feed an
        # automatic "y" and restore the real stdin in the finally below.
        old_stdin = sys.stdin
        sys.stdin = StringIO('y\n')

        try:
            model_config = AVAILABLE_MODELS[model_type]
            print(f"🚀 Loading {model_config['name']}...")
            print(f" Languages: {', '.join(model_config['languages'])}")

            # Drop any previously loaded model and reclaim (GPU) memory.
            if tts is not None:
                print("🧹 Clearing previous model from memory...")
                del tts
                import gc
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

            # Prefer the selected device; fall back to the library's
            # default placement if that fails.
            try:
                tts = TTS(model_config["model_name"]).to(DEVICE)
            except Exception as e:
                print(f"❌ TTS initialization failed: {e}")

                try:
                    tts = TTS(model_config["model_name"])
                    print("✅ Model loaded without device specification")
                except Exception as e2:
                    print(f"❌ Alternative loading also failed: {e2}")
                    return False

            # Smoke-test synthesis; failures are logged but non-fatal.
            test_path = "/tmp/test_output.wav"
            if "zh" in model_config["languages"]:
                test_text = "你好"
            else:
                test_text = "Hello"

            try:
                tts.tts_to_file(text=test_text, file_path=test_path)

                if os.path.exists(test_path):
                    os.remove(test_path)
                print("✅ Model tested successfully!")
            except Exception as e:
                print(f"⚠️ Model test failed but continuing: {e}")

            # Only mark the model as active after everything above passed.
            model_loaded = True
            current_model = model_type

            print(f"✅ {model_config['name']} loaded successfully!")
            print(f" Size: ~{model_config['size_mb']}MB")
            print(f" Quality: {model_config['quality']}")
            print(f" Languages: {model_config['languages']}")

            return True

        except Exception as e:
            print(f"❌ Model failed to load: {e}")
            return False

        finally:
            # Always restore the real stdin, even on early return.
            sys.stdin = old_stdin

    except Exception as e:
        print(f"❌ Failed to initialize TTS: {e}")
        return False
    finally:
        # Clear the guard regardless of outcome.
        model_loading = False
|
|
| |
def ensure_correct_model(voice_style: str, text: str, language: str = "auto"):
    """Make sure the model matching *voice_style*/*language* is loaded.

    Returns True when the right model is (now) active, False when a
    required load failed.
    """
    global tts, model_loaded, current_model

    target_model = get_model_for_voice_style(voice_style, language)

    print(f"🔍 Model selection: voice_style={voice_style}, language={language}, target_model={target_model}")

    already_active = model_loaded and current_model == target_model
    if already_active:
        return True

    print(f"🔄 Switching to model: {target_model} for voice style: {voice_style}, language: {language}")
    return load_tts_model(target_model)
|
|
| |
@app.post("/api/tts")
async def generate_tts(request: TTSRequest):
    """Generate a voiceover WAV for the request text and upload it to OCI.

    Resolves "auto" language via detect_language, makes sure a model that
    supports that language is loaded, synthesizes to a temp file, uploads
    it, and removes the local copy on success.

    Returns:
        dict with status "success", "partial_success" (generated but the
        upload failed), or "error".
    """
    try:
        # Reclaim disk space before writing a new file.
        cleanup_old_files()

        # Resolve the target language.
        if request.language == "auto":
            detected_language = detect_language(request.text)
            print(f"🌐 Auto-detected language: {detected_language}")
        else:
            detected_language = request.language

        # Load/switch to a model that can speak the detected language.
        if not ensure_correct_model(request.voice_style, request.text, detected_language):
            return {
                "status": "error",
                "message": f"Failed to load appropriate TTS model for {detected_language}",
                "requires_tos_acceptance": True,
                "tos_url": "https://coqui.ai/cpml.txt"
            }

        print(f"📥 TTS request for project: {request.project_id}")
        print(f" Voice Style: {request.voice_style}")
        print(f" Language: {detected_language}")
        print(f" Text length: {len(request.text)} characters")
        print(f" Current Model: {current_model}")

        # Unique-ish output name per request (second resolution).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"voiceover_{timestamp}.wav"
        # BUG FIX: the output path previously contained a literal
        # "(unknown)" placeholder instead of the generated filename, so
        # every request wrote to the same bogus path.
        output_path = f"/tmp/output/{filename}"

        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        cleaned_text = clean_text(request.text, detected_language)
        print(f"📝 Text: '{cleaned_text}'")

        # Synthesize, correcting any language/model mismatch on the fly.
        try:
            if current_model == "fastspeech2" and detected_language == "zh":
                print("🎯 Using FastSpeech2 for Chinese text")
                tts.tts_to_file(text=cleaned_text, file_path=output_path)
            elif current_model == "tacotron2-ddc" and detected_language == "en":
                print("🎯 Using Tacotron2-DDC for English text")
                tts.tts_to_file(text=cleaned_text, file_path=output_path)
            else:
                print("🔄 Language-model mismatch detected, attempting correction...")
                correct_model = get_model_for_voice_style(request.voice_style, detected_language)
                if load_tts_model(correct_model):
                    tts.tts_to_file(text=cleaned_text, file_path=output_path)
                else:
                    raise Exception(f"Cannot process {detected_language} text with current model")

        except Exception as tts_error:
            print(f"❌ TTS generation failed: {tts_error}")
            raise tts_error

        if not os.path.exists(output_path):
            raise Exception("TTS failed to create output file")

        file_size = os.path.getsize(output_path)
        print(f"✅ TTS generated: {output_path} ({file_size} bytes)")

        # Push the WAV to OCI storage.
        upload_result, error = upload_to_oci(output_path, filename, request.project_id)

        if error:
            print(f"❌ OCI upload failed: {error}")
            return {
                "status": "partial_success",
                "message": f"TTS generated but upload failed: {error}",
                "local_file": output_path,
                "filename": filename,
                "file_size": file_size
            }

        # BUG FIX: log the real filename (was a literal "(unknown)").
        print(f"✅ Upload successful: {filename}")

        # The local copy is no longer needed once uploaded.
        try:
            os.remove(output_path)
            print(f"🧹 Cleaned up local file: {output_path}")
        except Exception as cleanup_error:
            print(f"⚠️ Could not clean up file: {cleanup_error}")

        return {
            "status": "success",
            "message": "TTS generated and uploaded successfully",
            "filename": filename,
            # BUG FIX: fallback path uses the real filename (was "(unknown)").
            "oci_path": upload_result.get("path", f"{request.project_id}/voiceover/{filename}"),
            "model_used": current_model,
            "voice_style": request.voice_style,
            "language": detected_language,
            "text_preview": cleaned_text[:100] + "..." if len(cleaned_text) > 100 else cleaned_text
        }

    except Exception as e:
        print(f"❌ TTS generation error: {str(e)}")
        return {
            "status": "error",
            "message": f"TTS generation failed: {str(e)}"
        }
|
|
| |
@app.post("/api/batch-tts")
async def batch_generate_tts(request: BatchTTSRequest):
    """Generate one voiceover per entry in request.texts.

    Each text is resolved to a language (per text when "auto"), wrapped in
    a single TTSRequest and delegated to generate_tts; per-text failures
    are recorded instead of aborting the batch.
    """
    try:
        cleanup_old_files()

        total = len(request.texts)
        print(f"📥 Batch TTS request for {total} texts")
        print(f" Project: {request.project_id}")
        print(f" Voice Style: {request.voice_style}")
        print(f" Language: {request.language}")

        outcomes = []
        for index, text in enumerate(request.texts):
            preview = text[:30] + "..." if len(text) > 30 else text
            try:
                text_language = (
                    detect_language(text) if request.language == "auto" else request.language
                )

                print(f" Processing text {index+1}/{total}: {text_language} - {text[:50]}...")

                sub_request = TTSRequest(
                    text=text,
                    project_id=request.project_id,
                    voice_style=request.voice_style,
                    speed=request.speed,
                    language=text_language,
                )

                outcome = await generate_tts(sub_request)
                outcomes.append({
                    "text_index": index,
                    "text_preview": preview,
                    "status": outcome.get("status", "error"),
                    "message": outcome.get("message", ""),
                    "filename": outcome.get("filename", ""),
                    "oci_path": outcome.get("oci_path", ""),
                    "language": outcome.get("language", "unknown"),
                })

            except Exception as e:
                print(f"❌ Failed to process text {index}: {str(e)}")
                outcomes.append({
                    "text_index": index,
                    "text_preview": preview,
                    "status": "error",
                    "message": f"Failed to generate TTS: {str(e)}",
                })

        succeeded = sum(r.get("status") == "success" for r in outcomes)
        failed = sum(r.get("status") == "error" for r in outcomes)

        print(f"📊 Batch completed: {succeeded} successful, {failed} failed")

        return {
            "status": "completed",
            "project_id": request.project_id,
            "summary": {
                "total": len(outcomes),
                "successful": succeeded,
                "failed": failed,
            },
            "results": outcomes,
            "model_used": current_model,
        }

    except Exception as e:
        print(f"❌ Batch TTS generation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Batch TTS generation failed: {str(e)}")
|
|
@app.get("/api/voice-styles")
async def get_voice_styles():
    """Return all voice styles, grouped by language."""
    by_language = {"en": {}, "zh": {}}
    for style_id, style in VOICE_STYLES.items():
        lang = style.get("language")
        if lang in by_language:
            by_language[lang][style_id] = style

    return {
        "status": "success",
        "voice_styles": VOICE_STYLES,
        "english_styles": by_language["en"],
        "chinese_styles": by_language["zh"],
        "current_model": current_model if model_loaded else None,
        "supported_languages": ["en", "zh", "auto"]
    }
|
|
| |
@app.post("/api/detect-language")
async def detect_text_language(text: str = Form(...)):
    """Detect the language of submitted form text and report confidence."""
    try:
        detected = detect_language(text)
        return {
            "status": "success",
            "language": detected,
            # Very short samples give the detector less to work with.
            "confidence": "high" if len(text) > 10 else "medium",
            "text_preview": text if len(text) <= 100 else text[:100] + "...",
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Language detection failed: {str(e)}",
        }
|
|
@app.get("/api/health")
async def health_check():
    """Report model, storage and device status."""
    storage_ok = check_storage_usage()

    languages = []
    if model_loaded:
        languages = AVAILABLE_MODELS.get(current_model, {}).get("languages", [])

    healthy = model_loaded and storage_ok
    return {
        "status": "healthy" if healthy else "warning",
        "tts_loaded": model_loaded,
        "model": current_model,
        "storage_ok": storage_ok,
        "device": DEVICE,
        "supported_languages": languages,
    }
|
|
@app.post("/api/cleanup")
async def manual_cleanup():
    """Trigger a stale-file cleanup on demand."""
    try:
        cleanup_old_files()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Cleanup failed: {str(e)}")
    return {
        "status": "success",
        "message": "Cleanup completed successfully",
    }
|
|
@app.get("/")
async def root():
    """Service banner with current model status."""
    active_model = current_model if model_loaded else "None"
    return {
        "message": "Multi-Language TTS API",
        "model_loaded": model_loaded,
        "model": active_model,
        "languages_supported": ["English", "Chinese"],
        "storage_optimized": True,
    }
|
|
if __name__ == "__main__":
    # Run the API directly under uvicorn when executed as a script.
    import uvicorn
    print("🚀 Starting Multi-Language TTS API...")
    print("💾 Storage management enabled")
    print("🌐 Supporting English and Chinese")
    print("🔊 Using Tacotron2-DDC (English) and FastSpeech2 (Chinese)")
    # Log disk availability once at startup.
    check_storage_usage()
    uvicorn.run(app, host="0.0.0.0", port=7860)