| from text_preprocess_for_inference import TTSDurAlignPreprocessor, CharTextPreprocessor, TTSPreprocessor |
| from espnet2.bin.tts_inference import Text2Speech |
| from scipy.io.wavfile import write |
| import json |
| import torch |
| import yaml |
| import sys |
| from utilities import SAMPLING_RATE, WARMUP_PARAGRAPHS |
| from datetime import datetime |
| import os |
| import time |
| import numpy as np |
| import openvino as ov |
|
|
| sys.path.append(os.getenv("HIFIGAN_PATH", f"hifigan")) |
| from hifigan.env import AttrDict |
| from hifigan.models import Generator |
| from hifigan.meldataset import MAX_WAV_VALUE |
| import torch.nn.functional as F |
|
|
| import nltk |
| nltk.download('averaged_perceptron_tagger_eng') |
|
|
# Prefer GPU when available; passed as a plain string to the model loaders.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Fixed mel-frame width used both as the OpenVINO vocoder's static example
# input size and as the padding target in generate_audio_bytes().
MAX_DEFAULT_VALUE = 600
|
|
|
|
def load_hifigan_vocoder(language: str, gender: str, device: str, dtype: str = "float32"):
    """Load a HiFi-GAN vocoder generator for the given language/gender pair.

    Args:
        language: Language key; models live under ``vocoder/<gender>/<language>/``.
        gender: Voice gender ("male" / "female"); selects the vocoder directory.
        device: Torch device string (e.g. "cpu" or "cuda") to load the model on.
        dtype: "float32" (default) or "bfloat16" for reduced-precision weights.

    Returns:
        A ``Generator`` in eval mode with weight norm removed.

    Raises:
        FileNotFoundError: If the config JSON or the generator checkpoint is missing.
    """
    vocoder_config = f"vocoder/{gender}/{language}/config.json"
    vocoder_generator = f"vocoder/{gender}/{language}/generator"

    if not os.path.exists(vocoder_config) or not os.path.exists(vocoder_generator):
        raise FileNotFoundError(
            f"Vocoder files not found. Expected config: {vocoder_config}, generator: {vocoder_generator}")

    # json.load reads straight from the file object (no intermediate string).
    with open(vocoder_config, 'r') as f:
        h = AttrDict(json.load(f))

    torch.manual_seed(h.seed)

    # Use a distinct name instead of shadowing the `device` str parameter.
    torch_device = torch.device(device)
    generator = Generator(h).to(torch_device)

    state_dict_g = torch.load(vocoder_generator, map_location=torch_device)
    generator.load_state_dict(state_dict_g['generator'])
    generator.eval()
    # Weight norm is a training-time reparameterization; fold it for inference.
    generator.remove_weight_norm()

    if dtype == "bfloat16":
        generator = generator.to(torch.bfloat16)

    return generator
|
|
|
|
def load_fastspeech2_model(language: str, gender: str, device: str, dtype: str = "float32"):
    """Load a FastSpeech2 ``Text2Speech`` model for the given language/gender.

    Side effect: rewrites ``config.yaml`` in place so that the normalization
    stats files are referenced by absolute path (``Text2Speech`` re-reads the
    config from disk, so the rewrite must be persisted before construction).

    Args:
        language: Language key; models live under ``<language>/<gender>/model/``.
        gender: Voice gender ("male" / "female").
        device: Torch device string passed through to ``Text2Speech``.
        dtype: "float32" (default) or "bfloat16" for reduced-precision weights.

    Returns:
        A ``Text2Speech`` instance with its built-in vocoder disabled
        (vocoding is handled by the external HiFi-GAN model).

    Raises:
        FileNotFoundError: If the config YAML or the model checkpoint is missing.
    """
    config_path = f"{language}/{gender}/model/config.yaml"
    tts_model_path = f"{language}/{gender}/model/model.pth"

    if not os.path.exists(config_path) or not os.path.exists(tts_model_path):
        raise FileNotFoundError(
            f"FastSpeech2 model files not found. Expected config: {config_path}, model: {tts_model_path}")

    with open(config_path, "r") as file:
        config = yaml.safe_load(file)

    # Point each normalization stats file at an absolute path; one loop
    # replaces three near-identical blocks of path-building code.
    cwd = os.getcwd()
    for conf_key, stats_name in (
            ("normalize_conf", "feats_stats.npz"),
            ("pitch_normalize_conf", "pitch_stats.npz"),
            ("energy_normalize_conf", "energy_stats.npz")):
        config[conf_key]["stats_file"] = os.path.join(
            cwd, language, gender, "model", stats_name)

    # Persist the updated paths: Text2Speech loads the config from disk.
    with open(config_path, "w") as file:
        yaml.dump(config, file)

    model = Text2Speech(train_config=config_path, model_file=tts_model_path,
                        device=device, vocoder_config=None, vocoder_file=None)
    # Vocoding is done by the external HiFi-GAN model, not espnet's vocoder.
    model.vocoder = None

    if dtype == "bfloat16":
        model.model = model.model.to(torch.bfloat16)

    return model
|
|
|
|
def split_into_chunks(text: str, words_per_chunk: int = 100):
    """Split *text* into whitespace-delimited chunks of at most
    *words_per_chunk* words each, returned as a list of strings."""
    tokens = text.split()
    return [
        " ".join(tokens[start:start + words_per_chunk])
        for start in range(0, len(tokens), words_per_chunk)
    ]
|
|
|
|
class Text2SpeechApp:
    """End-to-end TTS pipeline for one language: text is preprocessed, run
    through FastSpeech2 to produce a mel spectrogram, then vocoded to a
    waveform by a HiFi-GAN generator compiled with OpenVINO (CPU).

    Models for both genders are loaded eagerly in __init__, followed by a
    warmup pass.
    """

    def __init__(self, language: str, batch_size: int = 1, alpha: float = 1, dtype: str = "bfloat16"):
        """Load FastSpeech2 + vocoder models for both genders, then warm up.

        Args:
            language: Language name (e.g. "hindi"); model directories are
                looked up under "<language>_latest".
            batch_size: Stored on the instance; not otherwise used here.
            alpha: FastSpeech2 duration scaling factor (speech-speed control).
            dtype: Precision for the torch models ("float32" / "bfloat16").
        """
        self.alpha = alpha
        self.lang = language
        self.batch_size = batch_size
        self.dtype = dtype
        self.vocoder_model = {}      # gender -> compiled OpenVINO vocoder
        self.fastspeech2_model = {}  # gender -> espnet Text2Speech model
        self.supported_genders = []  # genders whose models loaded successfully

        self.preprocessor = TTSDurAlignPreprocessor()

        genders = ["male", "female"]
        for gender in genders:
            try:
                self.vocoder_model[gender] = load_hifigan_vocoder(
                    f"{language}_latest", gender, device, self.dtype)
                # Convert the torch vocoder to OpenVINO using a fixed-size
                # example input [1, 160, MAX_DEFAULT_VALUE]; inputs are later
                # padded to this frame width (see generate_audio_bytes).
                with torch.no_grad():
                    self.vocoder_model[gender] = ov.convert_model(self.vocoder_model[gender], example_input=torch.ones([1, 160, MAX_DEFAULT_VALUE]))

                self.vocoder_model[gender] = ov.compile_model(self.vocoder_model[gender], device_name="CPU")
                print(
                    f"Loaded HiFi-GAN vocoder for {language}-{gender}")

                self.fastspeech2_model[gender] = load_fastspeech2_model(
                    f"{language}_latest", gender, device, self.dtype)

                print(
                    f"Loaded FastSpeech2 model for {language}-{gender}")
                self.supported_genders.append(gender)
            except FileNotFoundError as e:
                # Missing model files: skip this gender, keep the app usable.
                print(
                    f"Error loading model for {language}-{gender}: {e}. This model key will not be available.")
            except Exception as e:
                print(
                    f"An unexpected error occurred while loading model for {language}-{gender}: {e}. This model key will not be available.")
        self.warmup()

    def pre_print(self, print_str: str):
        """Print *print_str* framed by separator lines."""
        print("=================================================")
        print(print_str)
        print("=================================================")

    def warmup(self):
        """Run two full synthesis passes per loaded gender to prime the models."""
        self.pre_print("TTS Warming up!")

        lang = self.lang.lower()
        text = WARMUP_PARAGRAPHS.get(lang)

        if not text:
            print(f"No warmup paragraph available for language: {lang}")
            return

        output_dir = "./warmup_outputs"
        os.makedirs(output_dir, exist_ok=True)

        print(f"Running warmup for language: {lang}")
        print(f"Warmup text length: {len(text.split())} words")

        total_start_time = time.time()

        for gender in ["male", "female"]:
            if gender not in self.fastspeech2_model:
                print(f"Skipping warmup for {gender} - model not loaded.")
                continue

            print(f"Starting warmup for {lang}-{gender}")
            try:
                gender_start_time = time.time()
                for i in range(2):
                    print(f"Warmup iteration {i + 1} for {gender}")
                    time_taken, _ = self.convert_and_save(
                        text=text,
                        speaker_gender=gender,
                        output_file_dir=output_dir
                    )
                    print(f"Iteration {i + 1} for {gender} completed in {time_taken:.2f} seconds")
                gender_total_time = time.time() - gender_start_time
                print(f"Total warmup time for {gender}: {gender_total_time:.2f} seconds")
            except Exception as e:
                # Best-effort: a warmup failure must not abort app startup.
                print(f"Warmup failed for {lang}-{gender}: {e}")

        total_time = time.time() - total_start_time
        print(f"Total TTS warmup completed in {total_time:.2f} seconds")
        self.pre_print("TTS Warming finished!")

    def save_to_file(self, audio_arr, file_path):
        """Write *audio_arr* (int16 samples) to *file_path* as a WAV file."""
        write(file_path, SAMPLING_RATE, audio_arr)
        print(f"Audio saved to {file_path}")

    def convert_and_save(self, text: str, speaker_gender="male", output_file_dir: str = "./outputs"):
        """Synthesize *text* chunk-by-chunk and save one concatenated WAV.

        Returns:
            (time_taken_seconds, output_file_path)
        """
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        output_file = f"{output_file_dir}/{self.lang}_{speaker_gender}_{timestamp}.wav"

        start = time.time()
        audio_arr = []
        result_chunks = split_into_chunks(text)

        for chunk_text in result_chunks:
            preprocessed_text, _ = self.preprocessor.preprocess(
                chunk_text, self.lang, speaker_gender)
            preprocessed_text = " ".join(preprocessed_text)

            with torch.no_grad():
                out = self.fastspeech2_model[speaker_gender](preprocessed_text,
                                                             decode_conf={"alpha": self.alpha})

            # (frames, mels) -> (1, mels, frames); 2.3262 is presumably a
            # fixed gain/normalization factor -- TODO confirm its origin.
            x = out["feat_gen_denorm"].T.unsqueeze(0) * 2.3262

            # NOTE(review): unlike generate_audio_bytes(), x is NOT padded to
            # MAX_DEFAULT_VALUE here even though the vocoder was compiled with
            # a fixed-size example input, and .squeeze()/.numpy() are called
            # on the compiled model's result rather than indexing it as
            # generate_audio_bytes() does -- verify this path still works
            # after the OpenVINO conversion.
            y_g_hat = self.vocoder_model[speaker_gender](x)
            audio = y_g_hat.squeeze()

            # Scale to the int16 range expected by the WAV writer.
            audio = audio * MAX_WAV_VALUE
            audio = audio.numpy().astype('int16')
            audio_arr.append(audio)

        result_array = np.concatenate(audio_arr, axis=0)
        self.save_to_file(audio_arr=result_array, file_path=output_file)
        time_taken = time.time() - start

        return time_taken, output_file

    def generate_audio_bytes(self, text: str, speaker_gender="male", save_file: bool = False):
        """Synthesize one utterance and return its audio samples.

        The returned audio is scaled by MAX_WAV_VALUE but NOT cast to int16;
        callers do that themselves. NOTE(review): *save_file* is accepted but
        never used in this method.
        """
        preprocessed_text, _ = self.preprocessor.preprocess(
            text, self.lang, speaker_gender)
        preprocessed_text = " ".join(preprocessed_text)

        with torch.no_grad():
            st = time.perf_counter()  # NOTE(review): assigned but never read
            out = self.fastspeech2_model[speaker_gender](preprocessed_text,
                                                         decode_conf={"alpha": self.alpha})

            # (frames, mels) -> (1, mels, frames); same 2.3262 gain factor as
            # convert_and_save -- TODO confirm its origin.
            x = out["feat_gen_denorm"].T.unsqueeze(0) * 2.3262

            # 8*8*8*2 = 1024 output samples per mel frame -- presumably the
            # vocoder's total upsampling factor; TODO confirm against the
            # HiFi-GAN config.
            trim_length = int(8 * 8 * 8 * 2 * x.shape[-1])
            # Pad frames to the fixed width the OpenVINO model was compiled
            # with; -12 is presumably log-mel "silence" -- TODO confirm.
            x = F.pad(x, (0, MAX_DEFAULT_VALUE - x.shape[-1]), value=-12)
            st = time.perf_counter()
            y_g_hat = self.vocoder_model[speaker_gender](x)
            # First output tensor, strip batch/channel dims, then drop the
            # samples produced by the padding.
            audio = y_g_hat[0][0][0][:trim_length]

            audio = audio * MAX_WAV_VALUE

        return audio

    def evaluate_performance(self, input_sentences: list, save_file: bool = False):
        """Time generate_audio_bytes() over each sentence (default male voice).

        Returns:
            NOTE(review): the time taken for the LAST sentence only, not a
            total or average -- confirm this is intended.
        """
        total_sentences = len(input_sentences)
        print(f"\nTotal T2S to be done: {total_sentences}\n")
        for i, sentence in enumerate(input_sentences):
            start_time = time.perf_counter()
            audio = self.generate_audio_bytes(text=sentence)
            time_taken = time.perf_counter() - start_time
            print("=================================================")
            print(f"Sentence {i + 1}/{total_sentences}:{sentence} processed in {time_taken:.2f} seconds")

            if save_file:
                os.makedirs(f"audios_{self.dtype}/numpy_files", exist_ok=True)
                os.makedirs(f"audios_{self.dtype}/audio_files", exist_ok=True)

                output_file = f"audios_{self.dtype}/numpy_files/file_{i}.npy"

                # NOTE(review): generate_audio_bytes() indexes the OpenVINO
                # result (numpy), so comparing its dtype to torch.bfloat16
                # looks like it can never be true -- confirm whether this
                # branch is dead.
                if audio.dtype == torch.bfloat16:
                    audio = audio.to(torch.float32)

                audio = audio.astype('int16')
                np.save(output_file, audio)

                audio_file_path = f"audios_{self.dtype}/audio_files/file_{i}.wav"
                with open(audio_file_path, "wb") as f:
                    write(f, SAMPLING_RATE, audio)
                print(f"Audio saved to {audio_file_path}")

        return time_taken

    def save_to_files(self, byte_ios, file_prefix: str) -> list[str]:
        """Drain each BytesIO-like object in *byte_ios* into its own
        timestamped WAV file; return the list of file paths written."""
        file_paths = []
        for i in range(len(byte_ios)):
            timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            file_path = f"{file_prefix}_{timestamp}_{i + 1}.wav"
            file_paths.append(file_path)
            with open(file_path, "wb") as f:
                f.write(byte_ios[i].read())
            print(f"Audio saved to {file_path}")
        return file_paths

    def batch_convert_and_save(self, input_sentences: list[str], speaker_gender="male", output_file_dir: str = "./outputs"):
        """Join *input_sentences* into one paragraph (no separator inserted)
        and synthesize it via convert_and_save().

        Returns:
            (total_time_seconds, [output_file_path])
        """
        start_time = time.time()
        output_file_paths = []
        total_sentences = len(input_sentences)
        os.makedirs(output_file_dir, exist_ok=True)

        print(f"Total T2S to be done: {total_sentences}\n")
        # Sentences are concatenated with NO separator; assumes each sentence
        # already carries its own terminal punctuation/whitespace.
        combined_para = ''.join(input_sentences)
        paragraph_time, output_path = self.convert_and_save(
            combined_para, speaker_gender=speaker_gender, output_file_dir=output_file_dir)
        print(f"Paragraph Time: {paragraph_time}\n")
        output_file_paths.append(output_path)

        time_taken = time.time() - start_time
        return time_taken, output_file_paths
|
|
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Text to Speech benchmarking")
    parser.add_argument("--batch_size", type=int, default=1, help="Batch size for TTS inference")
    parser.add_argument("--language", type=str, default="hindi", help="Language for TTS")
    parser.add_argument("--alpha", type=float, default=1.0, help="Alpha value for FastSpeech2 decoding")
    parser.add_argument("--dtype", type=str, default="float32", help="Data type for model inference")
    args = parser.parse_args()

    # Bug fix: --batch_size, --language and --alpha were parsed but then
    # discarded in favor of hard-coded locals; honor all CLI arguments.
    # Defaults are unchanged, so plain `python script.py` behaves as before.
    tts = Text2SpeechApp(batch_size=args.batch_size, alpha=args.alpha,
                         language=args.language, dtype=args.dtype)
    st = time.perf_counter()
    texts = [
        "जीवन में सफलता पाने के लिए केवल सपने देखना ही नहीं, बल्कि उन्हें पूरा करने के लिए निरंतर प्रयास और आत्मविश्वास भी ज़रूरी होता है।",
        "कठिन परिस्थितियाँ हमें तोड़ने नहीं आतीं, बल्कि हमें मज़बूत बनाकर जीवन के असली अर्थ से परिचित कराती हैं।",
        "सकारात्मक सोच और सही दृष्टिकोण के साथ किया गया हर छोटा प्रयास भी एक दिन बड़ी उपलब्धि में बदल जाता है।",
        "जब हम निस्वार्थ भाव से दूसरों की मदद करते हैं, तब हमारे अपने जीवन में भी शांति और संतुलन अपने आप आ जाता है।"
    ]

    total_time = tts.evaluate_performance(texts, save_file=True)
    et = time.perf_counter()
    print(f"Total time for evaluating {len(texts)} sentences: {et - st:.2f} seconds")
    print(f"Average time per sentence: {(et - st)/len(texts):.2f} seconds")