| """Models module for Kokoro TTS Local""" |
| from typing import Optional, Tuple, List |
| import torch |
| from kokoro import KPipeline |
| import os |
| import json |
| import codecs |
| from pathlib import Path |
| import numpy as np |
| import shutil |
|
|
| |
# Force UTF-8 for Python's text I/O so voice/config files with non-ASCII
# content decode consistently across platforms.
os.environ["PYTHONIOENCODING"] = "utf-8"
# Silence the Hugging Face Hub warning emitted on platforms where symlinks
# are unavailable (e.g. Windows without developer mode).
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
|
|
| |
# All voice checkpoints fetched from the hexgrad/Kokoro-82M repository.
# Filenames follow "<prefix>_<name>.pt"; the prefix appears to encode
# language/accent and gender (e.g. "af" American female, "bm" British male)
# — TODO confirm against the upstream repo.
VOICE_FILES = [
    # "af_" voices
    "af_alloy.pt", "af_aoede.pt", "af_bella.pt", "af_jessica.pt",
    "af_kore.pt", "af_nicole.pt", "af_nova.pt", "af_river.pt",
    "af_sarah.pt", "af_sky.pt",
    # "am_" voices
    "am_adam.pt", "am_echo.pt", "am_eric.pt", "am_fenrir.pt",
    "am_liam.pt", "am_michael.pt", "am_onyx.pt", "am_puck.pt",
    "am_santa.pt",
    # "bf_" voices
    "bf_alice.pt", "bf_emma.pt", "bf_isabella.pt", "bf_lily.pt",
    # "bm_" voices
    "bm_daniel.pt", "bm_fable.pt", "bm_george.pt", "bm_lewis.pt",
    # Remaining language/accent groups (el/em, ff, hf/hm, jf/jm, pf/pm, zf)
    "el_dora.pt", "em_alex.pt", "em_santa.pt",
    "ff_siwis.pt",
    "hf_alpha.pt", "hf_beta.pt",
    "hm_omega.pt", "hm_psi.pt",
    "jf_sara.pt", "jm_nicola.pt",
    "jf_alpha.pt", "jf_gongtsuene.pt", "jf_nezumi.pt", "jf_tebukuro.pt",
    "jm_kumo.pt",
    "pf_dora.pt", "pm_alex.pt", "pm_santa.pt",
    "zf_xiaobei.pt", "zf_xiaoni.pt", "zf_xiaoqiao.pt", "zf_xiaoyi.pt"
]
|
|
| |
# Keep a handle on the unpatched implementation so it can be restored or
# delegated to if ever needed.
original_load_voice = KPipeline.load_voice


def patched_load_voice(self, voice_path):
    """Load a voice checkpoint from *voice_path* and cache it on the pipeline.

    Installed in place of KPipeline.load_voice so that voice files are loaded
    with ``weights_only=False`` for compatibility with older checkpoints.

    Args:
        self: The KPipeline instance (or any object used as a pipeline).
        voice_path: Filesystem path to a ``.pt`` voice file.

    Returns:
        The loaded voice model, moved to the pipeline's device and cached
        in ``self.voices`` under the file's stem name.

    Raises:
        FileNotFoundError: If *voice_path* does not exist.
        ValueError: If torch.load returns None.
    """
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")
    voice_name = Path(voice_path).stem
    # NOTE(review): weights_only=False deserializes arbitrary pickled objects;
    # only load voice files from trusted sources.
    # map_location='cpu' ensures checkpoints saved on CUDA load on CPU-only
    # machines; the explicit .to(self.device) below handles final placement.
    voice_model = torch.load(voice_path, map_location='cpu', weights_only=False)
    if voice_model is None:
        raise ValueError(f"Failed to load voice model from {voice_path}")

    # Defensive: ensure the attributes this patch relies on exist even when
    # the pipeline object was constructed without them.
    if not hasattr(self, 'device'):
        self.device = 'cpu'
    if not hasattr(self, 'voices'):
        self.voices = {}

    self.voices[voice_name] = voice_model.to(self.device)
    return self.voices[voice_name]


KPipeline.load_voice = patched_load_voice
|
|
def patch_json_load():
    """Monkey-patch json.load to tolerate UTF-8 files with special characters.

    The replacement decodes the underlying stream as UTF-8 (falling back to
    BOM-aware 'utf-8-sig' with replacement of undecodable bytes) before
    parsing.  Extra positional and keyword arguments (``object_hook``,
    ``parse_float``, ...) are forwarded to ``json.loads``, matching the
    stdlib ``json.load`` contract.
    """
    original_load = json.load  # kept for reference / possible restoration

    def custom_load(fp, *args, **kwargs):
        try:
            # Text streams wrapping a binary buffer: read the raw bytes and
            # decode explicitly so a mis-configured stream encoding cannot
            # interfere.
            if hasattr(fp, 'buffer'):
                content = fp.buffer.read().decode('utf-8')
            else:
                content = fp.read()
            # Forward caller-supplied hooks; the original patch dropped them.
            return json.loads(content, *args, **kwargs)
        except UnicodeDecodeError:
            # Retry from the start of the stream, stripping a UTF-8 BOM and
            # replacing undecodable bytes rather than failing outright.
            fp.seek(0)
            content = fp.read()
            if isinstance(content, bytes):
                content = content.decode('utf-8-sig', errors='replace')
            return json.loads(content, *args, **kwargs)

    json.load = custom_load
|
|
def load_config(config_path: str) -> dict:
    """Load a JSON configuration file with tolerant encoding handling.

    Tries plain UTF-8 first; if decoding fails, retries with the BOM-aware
    'utf-8-sig' codec.  A failure under both encodings re-raises the last
    UnicodeDecodeError.
    """
    last_error = None
    for encoding in ('utf-8', 'utf-8-sig'):
        try:
            with codecs.open(config_path, 'r', encoding=encoding) as handle:
                return json.load(handle)
        except UnicodeDecodeError as exc:
            last_error = exc
    raise last_error
|
|
| |
# Optional phoneme support: TTS works without it, but phoneme visualization
# requires a working espeak-ng backed phonemizer.
phonemizer_available = False
try:
    from phonemizer.backend.espeak.wrapper import EspeakWrapper
    from phonemizer import phonemize
    import espeakng_loader

    # Point the phonemizer at the espeak-ng library/data bundled by
    # espeakng_loader instead of relying on a system-wide install.
    library_path = espeakng_loader.get_library_path()
    data_path = espeakng_loader.get_data_path()
    espeakng_loader.make_library_available()

    EspeakWrapper.library_path = library_path
    EspeakWrapper.data_path = data_path

    # Smoke-test the backend with a trivial phonemization.
    try:
        test_phonemes = phonemize('test', language='en-us')
        if test_phonemes:
            phonemizer_available = True
            print("Phonemizer successfully initialized")
        else:
            print("Note: Phonemization returned empty result")
            print("TTS will work, but phoneme visualization will be disabled")
    except Exception as e:
        # Any backend failure is non-fatal; only phoneme output is lost.
        print(f"Note: Phonemizer not available: {e}")
        print("TTS will work, but phoneme visualization will be disabled")


except ImportError as e:
    # Packages missing: attempt a one-shot install, then redo the setup.
    # NOTE(review): invoking bare "pip" may target a different interpreter
    # than the running one; [sys.executable, "-m", "pip", ...] would be
    # safer — confirm intent.
    print(f"Installing required phonemizer packages...")
    import subprocess
    try:
        subprocess.check_call(["pip", "install", "espeakng-loader", "phonemizer-fork"])

        # Retry the imports now that the packages should be installed.
        from phonemizer.backend.espeak.wrapper import EspeakWrapper
        from phonemizer import phonemize
        import espeakng_loader

        library_path = espeakng_loader.get_library_path()
        data_path = espeakng_loader.get_data_path()
        espeakng_loader.make_library_available()
        EspeakWrapper.library_path = library_path
        EspeakWrapper.data_path = data_path

        # Re-run the smoke test after installation.
        try:
            test_phonemes = phonemize('test', language='en-us')
            if test_phonemes:
                phonemizer_available = True
                print("Phonemizer successfully initialized")
            else:
                print("Note: Phonemization returned empty result")
                print("TTS will work, but phoneme visualization will be disabled")
        except Exception as e:
            print(f"Note: Phonemizer still not functional: {e}")
            print("TTS will work, but phoneme visualization will be disabled")
    except Exception as e:
        print(f"Note: Could not install or initialize phonemizer: {e}")
        print("TTS will work, but phoneme visualization will be disabled")
|
|
| |
# Module-level singleton: build_model() initializes this once and returns the
# cached instance on subsequent calls.
_pipeline = None
|
|
def download_voice_files():
    """Download voice files from Hugging Face.

    Ensures ./voices exists, fetches every file in VOICE_FILES from the
    hexgrad/Kokoro-82M repository (skipping files already present), and
    returns the list of voice filenames now available locally.  Individual
    download failures are logged and skipped (best-effort).
    """
    voices_dir = Path("voices")
    voices_dir.mkdir(exist_ok=True)

    # Imported lazily so the module can be used without huggingface_hub
    # until a download is actually needed.
    from huggingface_hub import hf_hub_download
    downloaded_voices = []

    print("\nDownloading voice files...")
    for voice_file in VOICE_FILES:
        try:
            voice_path = voices_dir / voice_file

            if not voice_path.exists():
                print(f"Downloading {voice_file}...")
                # Stage the download in temp_voices/ first, then move into
                # place, so a partial download never lands in ./voices.
                temp_path = hf_hub_download(
                    repo_id="hexgrad/Kokoro-82M",
                    filename=f"voices/{voice_file}",
                    local_dir="temp_voices",
                    force_download=True
                )

                os.makedirs(os.path.dirname(voice_path), exist_ok=True)
                shutil.move(temp_path, voice_path)
                downloaded_voices.append(voice_file)
                print(f"Successfully downloaded {voice_file}")
            else:
                print(f"Voice file {voice_file} already exists")
                downloaded_voices.append(voice_file)
        except Exception as e:
            # Best-effort: a single failed voice shouldn't abort the rest.
            print(f"Warning: Failed to download {voice_file}: {e}")
            continue

    # Remove the staging directory left behind by hf_hub_download.
    if os.path.exists("temp_voices"):
        shutil.rmtree("temp_voices")

    if not downloaded_voices:
        print("Warning: No voice files could be downloaded. Please check your internet connection.")
    else:
        print(f"Successfully processed {len(downloaded_voices)} voice files")

    return downloaded_voices
|
|
def build_model(model_path: str, device: str) -> KPipeline:
    """Build and return the Kokoro pipeline with proper encoding configuration.

    Downloads the model checkpoint, config, and voice files if they are not
    present, then constructs a KPipeline.  The pipeline is a module-level
    singleton: subsequent calls return the cached instance.

    Args:
        model_path: Path to the model checkpoint; None selects the default
            'kokoro-v1_0.pth' (downloaded if absent).
        device: Device identifier (e.g. 'cpu' or 'cuda') stored on the pipeline.

    Returns:
        The initialized KPipeline instance.

    Raises:
        ValueError: If no voice files are available or pipeline creation fails.
    """
    global _pipeline
    if _pipeline is None:
        try:
            # Make json.load tolerant of UTF-8/BOM files before the pipeline
            # reads any config.
            patch_json_load()

            if model_path is None:
                model_path = 'kokoro-v1_0.pth'

            # Imported here (lazily) but BEFORE both download branches below.
            # Previously this import lived inside the model-download branch,
            # so an existing model file + missing config.json raised NameError.
            from huggingface_hub import hf_hub_download

            if not os.path.exists(model_path):
                print(f"Downloading model file {model_path}...")
                model_path = hf_hub_download(
                    repo_id="hexgrad/Kokoro-82M",
                    filename="kokoro-v1_0.pth",
                    local_dir=".",
                    force_download=True
                )
                print(f"Model downloaded to {model_path}")

            config_path = "config.json"
            if not os.path.exists(config_path):
                print("Downloading config file...")
                config_path = hf_hub_download(
                    repo_id="hexgrad/Kokoro-82M",
                    filename="config.json",
                    local_dir=".",
                    force_download=True
                )
                print(f"Config downloaded to {config_path}")

            # Make sure at least one voice file is available locally.
            downloaded_voices = download_voice_files()

            if not downloaded_voices:
                print("Error: No voice files available. Cannot proceed.")
                raise ValueError("No voice files available")

            # lang_code 'a' selects American English (see generate_speech docs).
            _pipeline = KPipeline(lang_code='a')
            if _pipeline is None:
                raise ValueError("Failed to initialize KPipeline - pipeline is None")

            _pipeline.device = device

            # The patched load_voice caches loaded voices in this dict.
            if not hasattr(_pipeline, 'voices'):
                _pipeline.voices = {}

            # Eagerly load a single voice to validate the setup; the break
            # after the first success is intentional — remaining voices are
            # loaded on demand by generate_speech/load_voice.
            for voice_file in downloaded_voices:
                voice_path = f"voices/{voice_file}"
                if os.path.exists(voice_path):
                    try:
                        _pipeline.load_voice(voice_path)
                        print(f"Successfully loaded voice: {voice_file}")
                        break
                    except Exception as e:
                        print(f"Warning: Failed to load voice {voice_file}: {e}")
                        continue

        except Exception as e:
            print(f"Error initializing pipeline: {e}")
            raise
    return _pipeline
|
|
def list_available_voices() -> List[str]:
    """List all available voice models.

    Returns the stem names (filename without '.pt') of every voice file in
    ./voices.  Creates the directory if it is missing and returns an empty
    list when no voices are installed.
    """
    voices_dir = Path("voices")

    # First run: create the directory and report that nothing is installed.
    if not voices_dir.exists():
        print(f"Creating voices directory at {voices_dir.absolute()}")
        voices_dir.mkdir(exist_ok=True)
        return []

    voice_files = list(voices_dir.glob("*.pt"))

    if not voice_files:
        # The previous "recover from root voices directory" step globbed
        # Path(".")/"voices/*.pt" — the exact same location as voices_dir —
        # so it could never find anything new; it was dead code and has
        # been removed.
        print(f"No voice files found in {voices_dir.absolute()}")
        print("No voice files found. Please run the application again to download voices.")
        return []

    return [f.stem for f in voice_files]
|
|
def load_voice(voice_name: str, device: str) -> torch.Tensor:
    """Load a voice model by name through the shared pipeline.

    Args:
        voice_name: Voice identifier, with or without the '.pt' suffix
            (e.g. 'af_bella' or 'af_bella.pt').
        device: Device identifier forwarded to build_model.

    Returns:
        The loaded voice model, as returned by the pipeline's load_voice.

    Raises:
        ValueError: If the corresponding file does not exist under ./voices.
    """
    pipeline = build_model(None, device)

    # Strip only a trailing '.pt'; the previous str.replace also removed the
    # substring from the middle of a name.
    if voice_name.endswith('.pt'):
        voice_name = voice_name[:-3]
    voice_path = f"voices/{voice_name}.pt"
    if not os.path.exists(voice_path):
        raise ValueError(f"Voice file not found: {voice_path}")
    return pipeline.load_voice(voice_path)
|
|
def generate_speech(
    model: KPipeline,
    text: str,
    voice: str,
    lang: str = 'a',
    device: str = 'cpu',
    speed: float = 1.0
) -> Tuple[Optional[torch.Tensor], Optional[str]]:
    """Generate speech using the Kokoro pipeline

    Args:
        model: KPipeline instance
        text: Text to synthesize
        voice: Voice name (e.g. 'af_bella')
        lang: Language code ('a' for American English, 'b' for British English)
        device: Device to use ('cuda' or 'cpu')
        speed: Speech speed multiplier (default: 1.0)

    Returns:
        Tuple of (audio tensor, phonemes string) or (None, None) on error
    """
    # NOTE(review): 'lang' is accepted but not used in this body; 'device' is
    # only used as a fallback when the pipeline lacks a device attribute.
    try:
        if model is None:
            raise ValueError("Model is None - pipeline not properly initialized")

        # Defensive: ensure the attributes the patched load_voice relies on
        # exist, even if the pipeline was built elsewhere.
        if not hasattr(model, 'voices'):
            model.voices = {}

        if not hasattr(model, 'device'):
            model.device = device

        # Normalize the voice name and resolve its on-disk path.
        # NOTE(review): replace() strips '.pt' anywhere in the name rather
        # than only as a suffix — harmless for the current voice names.
        voice_name = voice.replace('.pt', '')
        voice_path = f"voices/{voice_name}.pt"
        if not os.path.exists(voice_path):
            raise ValueError(f"Voice file not found: {voice_path}")

        # Load (and cache) the voice if not already resident on the pipeline.
        if voice_name not in model.voices:
            print(f"Loading voice {voice_name}...")
            model.load_voice(voice_path)

        if voice_name not in model.voices:
            raise ValueError(f"Failed to load voice {voice_name}")

        # Invoke the pipeline; text is split into segments on blank lines.
        print(f"Generating speech with device: {model.device}")
        generator = model(
            text,
            voice=voice_path,
            speed=speed,
            split_pattern=r'\n+'
        )

        # Return the first segment that produced audio (with its phonemes);
        # remaining segments, if any, are not consumed here.  NumPy audio is
        # converted to a float torch tensor for a consistent return type.
        for gs, ps, audio in generator:
            if audio is not None:
                if isinstance(audio, np.ndarray):
                    audio = torch.from_numpy(audio).float()
                return audio, ps

        return None, None
    except Exception as e:
        # Best-effort API: callers receive (None, None) instead of exceptions.
        print(f"Error generating speech: {e}")
        return None, None