# FastSpeech2_HS / main_ov.py
# Aditya02's picture
# Upload folder using huggingface_hub
# 1d87ed5 verified
from text_preprocess_for_inference import TTSDurAlignPreprocessor, CharTextPreprocessor, TTSPreprocessor
from espnet2.bin.tts_inference import Text2Speech
from scipy.io.wavfile import write
import json
import torch
import yaml
import sys
from utilities import SAMPLING_RATE, WARMUP_PARAGRAPHS
from datetime import datetime
import os
import time
import numpy as np
import openvino as ov
# Make the HiFi-GAN package importable; HIFIGAN_PATH env var overrides the default "hifigan" dir.
sys.path.append(os.getenv("HIFIGAN_PATH", f"hifigan"))
from hifigan.env import AttrDict
from hifigan.models import Generator
from hifigan.meldataset import MAX_WAV_VALUE
import torch.nn.functional as F
import nltk
# POS-tagger data required by the NLTK-based text preprocessing at runtime.
nltk.download('averaged_perceptron_tagger_eng')
# FastSpeech2 runs on GPU when available; the vocoder is compiled for CPU via OpenVINO below.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Fixed mel-frame length the OpenVINO vocoder graph is traced with (example_input in Text2SpeechApp).
MAX_DEFAULT_VALUE = 600
def load_hifigan_vocoder(language: str, gender: str, device: str, dtype: str = "float32"):
    """Load the HiFi-GAN generator for a given language/gender voice.

    Reads ``vocoder/<gender>/<language>/config.json`` and the matching
    ``generator`` checkpoint, seeds torch from the config, strips weight
    norm for inference, and optionally casts to bfloat16.

    Raises:
        FileNotFoundError: if either the config or the checkpoint is missing.
    """
    config_path = f"vocoder/{gender}/{language}/config.json"
    checkpoint_path = f"vocoder/{gender}/{language}/generator"
    if not (os.path.exists(config_path) and os.path.exists(checkpoint_path)):
        raise FileNotFoundError(
            f"Vocoder files not found. Expected config: {config_path}, generator: {checkpoint_path}")
    with open(config_path, 'r') as cfg_file:
        h = AttrDict(json.load(cfg_file))
    # Seed from the training config so any stochastic init matches the checkpoint's run.
    torch.manual_seed(h.seed)
    target_device = torch.device(device)
    generator = Generator(h).to(target_device)
    checkpoint = torch.load(checkpoint_path, map_location=target_device)
    generator.load_state_dict(checkpoint['generator'])
    generator.eval()
    # Weight norm is a training-time reparameterization; remove it for inference.
    generator.remove_weight_norm()
    if dtype == "bfloat16":
        generator = generator.to(torch.bfloat16)
    return generator
def load_fastspeech2_model(language: str, gender: str, device: str, dtype: str = "float32"):
    """Load the FastSpeech2 acoustic model for a given language/gender voice.

    Rewrites the relative ``stats_file`` entries in the model's YAML config to
    absolute paths (the config file on disk is overwritten with the result),
    builds an espnet ``Text2Speech`` without any vocoder attached, and
    optionally casts the model to bfloat16.

    Raises:
        FileNotFoundError: if the config or model checkpoint is missing.
    """
    config_path = f"{language}/{gender}/model/config.yaml"
    tts_model_path = f"{language}/{gender}/model/model.pth"
    if not os.path.exists(config_path) or not os.path.exists(tts_model_path):
        raise FileNotFoundError(
            f"FastSpeech2 model files not found. Expected config: {config_path}, model: {tts_model_path}")
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    cwd = os.getcwd()
    # Each normalization section points at a stats file shipped relative to the model dir.
    stats_files = {
        "normalize_conf": "model/feats_stats.npz",
        "pitch_normalize_conf": "model/pitch_stats.npz",
        "energy_normalize_conf": "model/energy_stats.npz",
    }
    for conf_key, rel_path in stats_files.items():
        config[conf_key]["stats_file"] = os.path.join(cwd, language, gender, rel_path)
    # The updated config is written back over the original file so espnet loads absolute paths.
    with open(config_path, "w") as file:
        yaml.dump(config, file)
    model = Text2Speech(train_config=config_path, model_file=tts_model_path, device=device, vocoder_config=None, vocoder_file=None)
    # Vocoding is handled separately by the OpenVINO-compiled HiFi-GAN.
    model.vocoder = None
    if dtype == "bfloat16":
        model.model = model.model.to(torch.bfloat16)
    return model
def split_into_chunks(text: str, words_per_chunk: int = 100):
    """Split *text* into space-joined chunks of at most *words_per_chunk* words."""
    tokens = text.split()
    return [
        ' '.join(tokens[start:start + words_per_chunk])
        for start in range(0, len(tokens), words_per_chunk)
    ]
class Text2SpeechApp:
    """End-to-end TTS pipeline: text -> FastSpeech2 mel-spectrogram -> HiFi-GAN audio.

    For one language, loads male/female FastSpeech2 acoustic models (espnet,
    PyTorch) and HiFi-GAN vocoders converted to OpenVINO and compiled for CPU,
    then runs a warmup pass over every successfully loaded voice.
    """

    def __init__(self, language: str, batch_size: int = 1, alpha: float = 1, dtype: str = "bfloat16"):
        # FastSpeech2 duration-control factor (passed as decode_conf["alpha"]).
        self.alpha = alpha
        self.lang = language
        # Stored but not consumed by the inference paths in this class.
        self.batch_size = batch_size
        self.dtype = dtype
        self.vocoder_model = {}       # gender -> compiled OpenVINO vocoder
        self.fastspeech2_model = {}   # gender -> espnet Text2Speech model
        self.supported_genders = []   # genders whose model files loaded successfully
        self.preprocessor = TTSDurAlignPreprocessor()
        genders = ["male", "female"]
        for gender in genders:
            try:
                self.vocoder_model[gender] = load_hifigan_vocoder(
                    f"{language}_latest", gender, device, self.dtype)
                with torch.no_grad():
                    # Trace the vocoder at a fixed [1, 160, MAX_DEFAULT_VALUE] input;
                    # mel inputs must be padded to this frame length before calling it.
                    self.vocoder_model[gender] = ov.convert_model(self.vocoder_model[gender], example_input=torch.ones([1, 160, MAX_DEFAULT_VALUE]))
                    self.vocoder_model[gender] = ov.compile_model(self.vocoder_model[gender], device_name="CPU")
                print(
                    f"Loaded HiFi-GAN vocoder for {language}-{gender}")
                self.fastspeech2_model[gender] = load_fastspeech2_model(
                    f"{language}_latest", gender, device, self.dtype)
                # with torch.no_grad():
                #     self.fastspeech2_model[gender] = ov.convert_model(self.fastspeech2_model[gender])
                print(
                    f"Loaded FastSpeech2 model for {language}-{gender}")
                self.supported_genders.append(gender)
            except FileNotFoundError as e:
                # A missing voice is non-fatal: the app continues with the other gender.
                print(
                    f"Error loading model for {language}-{gender}: {e}. This model key will not be available.")
            except Exception as e:
                print(
                    f"An unexpected error occurred while loading model for {language}-{gender}: {e}. This model key will not be available.")
        self.warmup()

    def pre_print(self, print_str: str):
        """Print *print_str* framed by separator lines."""
        print("=================================================")
        print(print_str)
        print("=================================================")

    def warmup(self):
        """Run two synthesis passes per loaded voice to warm up the models.

        Uses the language's entry in WARMUP_PARAGRAPHS; silently returns if
        no warmup text exists for this language.
        """
        self.pre_print("TTS Warming up!")
        lang = self.lang.lower()
        text = WARMUP_PARAGRAPHS.get(lang)
        if not text:
            print(f"No warmup paragraph available for language: {lang}")
            return
        # Ensure warmup output directory exists
        output_dir = "./warmup_outputs"
        os.makedirs(output_dir, exist_ok=True)
        print(f"Running warmup for language: {lang}")
        print(f"Warmup text length: {len(text.split())} words")
        total_start_time = time.time()
        for gender in ["male", "female"]:
            if gender not in self.fastspeech2_model:
                print(f"Skipping warmup for {gender} - model not loaded.")
                continue
            print(f"Starting warmup for {lang}-{gender}")
            try:
                gender_start_time = time.time()
                for i in range(2):  # Run twice; adjust as needed
                    print(f"Warmup iteration {i + 1} for {gender}")
                    time_taken, _ = self.convert_and_save(
                        text=text,
                        speaker_gender=gender,
                        output_file_dir=output_dir
                    )
                    print(f"Iteration {i + 1} for {gender} completed in {time_taken:.2f} seconds")
                gender_total_time = time.time() - gender_start_time
                print(f"Total warmup time for {gender}: {gender_total_time:.2f} seconds")
            except Exception as e:
                # Warmup failures are logged but never abort construction.
                print(f"Warmup failed for {lang}-{gender}: {e}")
        total_time = time.time() - total_start_time
        print(f"Total TTS warmup completed in {total_time:.2f} seconds")
        self.pre_print("TTS Warming finished!")

    def save_to_file(self, audio_arr, file_path):
        """Write a PCM sample array to *file_path* as a WAV at SAMPLING_RATE."""
        write(file_path, SAMPLING_RATE, audio_arr)
        print(f"Audio saved to {file_path}")

    def convert_and_save(self, text: str, speaker_gender="male", output_file_dir: str = "./outputs"):
        """Synthesize *text* chunk-by-chunk and save one timestamped WAV.

        Returns (elapsed_seconds, output_file_path).
        """
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        output_file = f"{output_file_dir}/{self.lang}_{speaker_gender}_{timestamp}.wav"
        start = time.time()
        audio_arr = []
        # Long inputs are split (default 100 words per chunk) to bound model input size.
        result_chunks = split_into_chunks(text)
        for chunk_text in result_chunks:
            # Preprocess the text
            preprocessed_text, _ = self.preprocessor.preprocess(
                chunk_text, self.lang, speaker_gender)
            preprocessed_text = " ".join(preprocessed_text)
            with torch.no_grad():
                # Generate mel-spectrograms
                out = self.fastspeech2_model[speaker_gender](preprocessed_text,
                                                             decode_conf={"alpha": self.alpha})
                # 2.3262 is an empirical gain on the denormalized mel; source of the
                # constant is not visible here — presumably a loudness calibration.
                x = out["feat_gen_denorm"].T.unsqueeze(0) * 2.3262
                # Convert mel-spectrograms to raw audio waveforms
                # NOTE(review): the vocoder here is the OpenVINO-compiled model traced at a
                # fixed 600-frame input, yet x is unpadded and the result is handled
                # torch-style (.squeeze()/.numpy()) — confirm this path works post-conversion
                # (generate_audio_bytes pads and indexes the OV result instead).
                y_g_hat = self.vocoder_model[speaker_gender](x)
                audio = y_g_hat.squeeze()
                audio = audio * MAX_WAV_VALUE
                audio = audio.numpy().astype('int16')
                audio_arr.append(audio)
        result_array = np.concatenate(audio_arr, axis=0)
        self.save_to_file(audio_arr=result_array, file_path=output_file)
        time_taken = time.time() - start
        return time_taken, output_file

    def generate_audio_bytes(self, text: str, speaker_gender="male", save_file: bool = False):
        """Synthesize *text* in one pass and return the raw waveform samples.

        Pads the mel input to the vocoder's fixed 600-frame trace length and
        trims the output back to the true length. *save_file* is accepted but
        unused here.
        """
        preprocessed_text, _ = self.preprocessor.preprocess(
            text, self.lang, speaker_gender)
        preprocessed_text = " ".join(preprocessed_text)
        with torch.no_grad():
            # Generate mel-spectrograms
            st = time.perf_counter()
            out = self.fastspeech2_model[speaker_gender](preprocessed_text,
                                                         decode_conf={"alpha": self.alpha})
            x = out["feat_gen_denorm"].T.unsqueeze(0) * 2.3262
            # Convert mel-spectrograms to raw audio waveforms
            # [8,8,8,2]; default sr = 44100
            # 8*8*8*2 = 1024 samples per mel frame — presumably the HiFi-GAN
            # upsample factors; verify against the vocoder config.
            trim_length = int(8 * 8 * 8 * 2 * x.shape[-1])
            # Pad mel frames to the fixed traced length; -12 is log-mel silence padding.
            x = F.pad(x, (0, MAX_DEFAULT_VALUE - x.shape[-1]), value=-12)
            st = time.perf_counter()
            y_g_hat = self.vocoder_model[speaker_gender](x)
            # Index into the OV result and drop the padded tail of the waveform.
            audio = y_g_hat[0][0][0][:trim_length]
            audio = audio * MAX_WAV_VALUE
        return audio

    def evaluate_performance(self, input_sentences: list, save_file: bool = False):
        """Time per-sentence synthesis; optionally dump .npy and .wav outputs.

        Returns the elapsed time of the LAST sentence only (not a total).
        """
        total_sentences = len(input_sentences)
        print(f"\nTotal T2S to be done: {total_sentences}\n")
        for i, sentence in enumerate(input_sentences):
            start_time = time.perf_counter()
            audio = self.generate_audio_bytes(text=sentence)
            time_taken = time.perf_counter() - start_time
            print("=================================================")
            print(f"Sentence {i + 1}/{total_sentences}:{sentence} processed in {time_taken:.2f} seconds")
            if save_file:
                os.makedirs(f"audios_{self.dtype}/numpy_files", exist_ok=True)
                os.makedirs(f"audios_{self.dtype}/audio_files", exist_ok=True)
                output_file = f"audios_{self.dtype}/numpy_files/file_{i}.npy"
                # NOTE(review): audio comes from the OV vocoder (numpy-like), so
                # comparing its dtype to torch.bfloat16 looks like a leftover from
                # the pure-torch path — confirm this branch is ever taken.
                if audio.dtype == torch.bfloat16:
                    audio = audio.to(torch.float32)
                audio = audio.astype('int16')
                np.save(output_file, audio)
                audio_file_path = f"audios_{self.dtype}/audio_files/file_{i}.wav"
                with open(audio_file_path, "wb") as f:
                    write(f, SAMPLING_RATE, audio)
                print(f"Audio saved to {audio_file_path}")
        return time_taken

    def save_to_files(self, byte_ios, file_prefix: str) -> list[str]:
        """Write each BytesIO-like object in *byte_ios* to a timestamped WAV file.

        Returns the list of written file paths.
        """
        file_paths = []
        for i in range(len(byte_ios)):
            timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            file_path = f"{file_prefix}_{timestamp}_{i + 1}.wav"
            file_paths.append(file_path)
            with open(file_path, "wb") as f:
                f.write(byte_ios[i].read())
            print(f"Audio saved to {file_path}")
        return file_paths

    def batch_convert_and_save(self, input_sentences: list[str], speaker_gender="male", output_file_dir: str = "./outputs"):
        """Concatenate *input_sentences* and synthesize them as one paragraph.

        Returns (total_elapsed_seconds, [output_file_path]).
        """
        start_time = time.time()
        output_file_paths = []
        total_sentences = len(input_sentences)
        os.makedirs(output_file_dir, exist_ok=True)
        print(f"Total T2S to be done: {total_sentences}\n")
        # Sentences are joined with NO separator — presumably each already ends
        # with its own punctuation; verify with callers.
        combined_para = ''.join(input_sentences)
        paragraph_time, output_path = self.convert_and_save(
            combined_para, speaker_gender=speaker_gender, output_file_dir=output_file_dir)
        print(f"Paragraph Time: {paragraph_time}\n")
        output_file_paths.append(output_path)
        time_taken = time.time() - start_time
        return time_taken, output_file_paths
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Text to Speech benchmarking")
    parser.add_argument("--batch_size", type=int, default=1, help="Batch size for TTS inference")
    parser.add_argument("--language", type=str, default="hindi", help="Language for TTS")
    parser.add_argument("--alpha", type=float, default=1.0, help="Alpha value for FastSpeech2 decoding")
    parser.add_argument("--dtype", type=str, default="float32", help="Data type for model inference")
    args = parser.parse_args()

    # Bug fix: --batch_size/--language/--alpha were parsed but then discarded in
    # favour of hard-coded locals; honour all four CLI arguments.
    tts = Text2SpeechApp(batch_size=args.batch_size, alpha=args.alpha,
                         language=args.language, dtype=args.dtype)

    # Benchmark per-sentence synthesis over a fixed Hindi sample set.
    st = time.perf_counter()
    texts = [
        "जीवन में सफलता पाने के लिए केवल सपने देखना ही नहीं, बल्कि उन्हें पूरा करने के लिए निरंतर प्रयास और आत्मविश्वास भी ज़रूरी होता है।",
        "कठिन परिस्थितियाँ हमें तोड़ने नहीं आतीं, बल्कि हमें मज़बूत बनाकर जीवन के असली अर्थ से परिचित कराती हैं।",
        "सकारात्मक सोच और सही दृष्टिकोण के साथ किया गया हर छोटा प्रयास भी एक दिन बड़ी उपलब्धि में बदल जाता है।",
        "जब हम निस्वार्थ भाव से दूसरों की मदद करते हैं, तब हमारे अपने जीवन में भी शांति और संतुलन अपने आप आ जाता है।"
    ]
    # Note: evaluate_performance returns the last sentence's time; wall-clock
    # totals are computed from the perf_counter samples around the call.
    total_time = tts.evaluate_performance(texts, save_file=True)
    et = time.perf_counter()
    print(f"Total time for evaluating {len(texts)} sentences: {et - st:.2f} seconds")
    print(f"Average time per sentence: {(et - st)/len(texts):.2f} seconds")