from __future__ import annotations

import functools
import io
import os
import subprocess
import tempfile
import time
import uuid

import numpy as np
import torch

from src.tts_sentence_parsing import init_sentence_state, get_sentence, clean_sentence, detect_language
from src.tts_utils import prepare_speech, get_no_audio, chunk_speed_change
from src.utils import cuda_vis_check

n_gpus1 = torch.cuda.device_count() if torch.cuda.is_available() else 0
n_gpus1, gpu_ids = cuda_vis_check(n_gpus1)


def list_models():
    from TTS.utils.manage import ModelManager
    return ModelManager().list_tts_models()

def get_xtt(model_name="tts_models/multilingual/multi-dataset/xtts_v2", deepspeed=True, use_gpu=True, gpu_id='auto'):
    if n_gpus1 == 0:
        use_gpu = False

    # agree to the Coqui model license so ModelManager can download non-interactively
    os.environ["COQUI_TOS_AGREED"] = "1"

    from TTS.tts.configs.xtts_config import XttsConfig
    from TTS.tts.models.xtts import Xtts
    from TTS.utils.generic_utils import get_user_data_dir

    print("Downloading Coqui XTTS V2 (skipped if already cached)")

    from TTS.utils.manage import ModelManager

    ModelManager().download_model(model_name)
    model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
    print("XTTS downloaded")

    print("Loading XTTS")
    config = XttsConfig()
    config.load_json(os.path.join(model_path, "config.json"))

    supported_languages = config.languages

    model = Xtts.init_from_config(config)
    model.load_checkpoint(
        config,
        checkpoint_dir=model_path,
        checkpoint_path=os.path.join(model_path, "model.pth"),
        vocab_path=os.path.join(model_path, "vocab.json"),
        eval=True,
        use_deepspeed=deepspeed,
    )
    if use_gpu:
        if gpu_id == 'auto':
            model.cuda()
        else:
            model.cuda(device='cuda:%d' % gpu_id)
    print("Done loading TTS")
    return model, supported_languages

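# Illustrative sketch (an assumption, not part of the original module): loading
# the XTTS checkpoint is expensive, so callers typically load it once and pass
# the same (model, supported_languages) pair into the functions below. The
# cache dict and helper name here are hypothetical.
_XTT_CACHE = {}


def _demo_get_xtt_once():
    # populate the cache on first use; later calls reuse the loaded model
    # (deepspeed disabled here only to keep the sketch dependency-light)
    if 'xtt' not in _XTT_CACHE:
        _XTT_CACHE['xtt'] = get_xtt(deepspeed=False)
    return _XTT_CACHE['xtt']
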
def get_latent(speaker_wav, voice_cleanup=False, model=None, gpt_cond_len=30, max_ref_length=60, sr=24000):
    if model is None:
        model, supported_languages = get_xtt()

    # optionally denoise/trim the reference audio before conditioning
    if voice_cleanup:
        speaker_wav = filter_wave_1(speaker_wav)

    # returns a (gpt_cond_latent, speaker_embedding) tuple
    latent = model.get_conditioning_latents(audio_path=speaker_wav, gpt_cond_len=gpt_cond_len,
                                            max_ref_length=max_ref_length, load_sr=sr)
    return latent

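# Illustrative sketch (hypothetical helper, not original behavior): the
# conditioning latents depend only on the reference wav, so computing them once
# per voice and reusing them across sentences avoids repeated conditioning passes.
_LATENT_CACHE = {}


def _demo_latent_for(speaker_wav, model=None):
    if speaker_wav not in _LATENT_CACHE:
        _LATENT_CACHE[speaker_wav] = get_latent(speaker_wav, model=model)
    return _LATENT_CACHE[speaker_wav]
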
def get_voice_streaming(prompt, language, latent, suffix="0", model=None, sr=24000, tts_speed=1.0):
    # note: suffix is kept for API compatibility but is unused here
    if model is None:
        model, supported_languages = get_xtt()

    gpt_cond_latent, speaker_embedding = latent

    try:
        t0 = time.time()
        chunks = model.inference_stream(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=7.0,
            temperature=0.85,
        )

        first_chunk = True
        for i, chunk in enumerate(chunks):
            if first_chunk:
                # latency to first audio chunk, kept for diagnostics
                first_chunk_time = time.time() - t0
                first_chunk = False
            # float waveform -> 16-bit mono PCM
            chunk = chunk.detach().cpu().numpy().squeeze()
            chunk = (chunk * 32767).astype(np.int16)

            chunk = chunk_speed_change(chunk, sr, tts_speed=tts_speed)

            yield chunk.tobytes()

    except RuntimeError as e:
        if "device-side assert" in str(e):
            print("Restart required due to exception: %s" % str(e), flush=True)
        else:
            print("Failed to generate wave: %s" % str(e))
    except Exception as e:
        print("Failed to generate wave: %s" % str(e))

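# Illustrative sketch (hypothetical helper): get_voice_streaming yields raw
# 16-bit mono PCM byte chunks, so a playable file only needs a WAV header
# around the concatenated stream. The output path and fixed "en" language are
# assumptions for this example.
def _demo_stream_to_wav(prompt, latent, model=None, out_path="stream_demo.wav", sr=24000):
    import wave
    with wave.open(out_path, "wb") as f:
        f.setnchannels(1)  # mono
        f.setsampwidth(2)  # int16 PCM
        f.setframerate(sr)
        for chunk in get_voice_streaming(prompt, "en", latent, model=model, sr=sr):
            f.writeframes(chunk)
    return out_path
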
def generate_speech(response,
                    model=None,
                    language='autodetect',
                    supported_languages=None,
                    latent=None,
                    sentence_state=None,
                    return_as_byte=True,
                    return_nonbyte_as_file=False,
                    sr=24000,
                    tts_speed=1.0,
                    return_gradio=False,
                    is_final=False,
                    verbose=False,
                    debug=False):
    if model is None or supported_languages is None:
        model, supported_languages = get_xtt()
    if sentence_state is None:
        sentence_state = init_sentence_state()
    if latent is None:
        latent = get_latent("models/female.wav", model=model)

    sentence, sentence_state, _ = get_sentence(response, sentence_state=sentence_state, is_final=is_final,
                                               verbose=verbose)
    if sentence:
        t0 = time.time()
        if verbose:
            print("sentence_to_wave: %s" % sentence)

        audio = sentence_to_wave(sentence,
                                 supported_languages,
                                 tts_speed,
                                 model=model,
                                 latent=latent,
                                 return_as_byte=return_as_byte,
                                 return_nonbyte_as_file=return_nonbyte_as_file,
                                 sr=sr,
                                 language=language,
                                 return_gradio=return_gradio)
        if verbose:
            print("done sentence_to_wave: %s" % (time.time() - t0), flush=True)
    else:
        if verbose and debug:
            print("No audio", flush=True)
        no_audio = get_no_audio(sr=sr, return_as_byte=return_as_byte, return_nonbyte_as_file=return_nonbyte_as_file)
        if return_gradio:
            import gradio as gr
            audio = gr.Audio(value=no_audio, autoplay=False)
        else:
            audio = no_audio
    return audio, sentence, sentence_state

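# Illustrative sketch (hypothetical driver, mirroring the loop in
# predict_from_text below): generate_speech is meant to be called repeatedly
# with the accumulating response text; each call emits at most one sentence of
# audio and threads sentence_state forward, with a final is_final=True call to
# flush any trailing text.
def _demo_generate_all(response, model, supported_languages, latent):
    state = init_sentence_state()
    audios = []
    while True:
        audio, sentence, state = generate_speech(response, model=model,
                                                 supported_languages=supported_languages,
                                                 latent=latent, sentence_state=state)
        if sentence is None:
            break
        audios.append(audio)
    audio, sentence, state = generate_speech(response, model=model,
                                             supported_languages=supported_languages,
                                             latent=latent, sentence_state=state,
                                             is_final=True)
    audios.append(audio)
    return audios
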
def sentence_to_wave(sentence, supported_languages, tts_speed,
                     latent=None,
                     return_as_byte=False,
                     return_nonbyte_as_file=False,
                     sr=24000, model=None,
                     return_gradio=True, language='autodetect', verbose=False):
    """
    Generate speech audio per sentence.
    """
    import noisereduce as nr
    import wave

    sentence = clean_sentence(sentence, verbose=verbose)
    sentence_list = [sentence]

    try:
        wav_bytestream = b""
        for sentence in sentence_list:
            if any(c.isalnum() for c in sentence):
                if language == "autodetect":
                    # autodetect on the first sentence; later calls reuse the result
                    language = detect_language(sentence, supported_languages, verbose=verbose)

                # at least one alphanumeric character, so worth synthesizing
                audio_stream = get_voice_streaming(
                    sentence, language, latent,
                    model=model,
                    tts_speed=tts_speed,
                )
            else:
                # punctuation-only text, nothing to synthesize
                audio_stream = None

            if audio_stream is not None:
                frame_length = 0
                for chunk in audio_stream:
                    try:
                        wav_bytestream += chunk
                        frame_length += len(chunk)
                    except Exception as e:
                        print("Exception in chunk appending: %s" % str(e), flush=True)
                        continue

            # optional noise-reduction pass over the whole utterance (disabled by default)
            filter_output = False
            if filter_output:
                data_s16 = np.frombuffer(wav_bytestream, dtype=np.int16, count=len(wav_bytestream) // 2, offset=0)
                float_data = data_s16 * 0.5 ** 15
                reduced_noise = nr.reduce_noise(y=float_data, sr=sr, prop_decrease=0.8, n_fft=1024)
                wav_bytestream = (reduced_noise * 32767).astype(np.int16)
                if return_as_byte:
                    wav_bytestream = wav_bytestream.tobytes()

            if audio_stream is not None:
                if not return_as_byte:
                    if return_nonbyte_as_file:
                        tmpdir = os.getenv('TMPDIR', tempfile.mkdtemp())
                        audio_unique_filename = os.path.join(tmpdir, str(uuid.uuid4()) + ".wav")
                        with wave.open(audio_unique_filename, "w") as f:
                            f.setnchannels(1)
                            # 2 bytes per sample (int16 PCM)
                            f.setsampwidth(2)
                            f.setframerate(sr)
                            f.writeframes(wav_bytestream)

                        ret_value = audio_unique_filename
                    else:
                        data_s16 = np.frombuffer(wav_bytestream, dtype=np.int16, count=len(wav_bytestream) // 2,
                                                 offset=0)
                        float_data = data_s16 * 0.5 ** 15
                        reduced_noise = nr.reduce_noise(y=float_data, sr=sr, prop_decrease=0.8, n_fft=1024)
                        wav_np = (reduced_noise * 32767).astype(np.int16)
                        ret_value = wav_np
                else:
                    ret_value = wav_bytestream
                if return_gradio:
                    import gradio as gr
                    return gr.Audio(value=ret_value, autoplay=True)
                else:
                    return ret_value
    except RuntimeError as e:
        if "device-side assert" in str(e):
            print("Restart required due to exception: %s" % str(e), flush=True)
        else:
            print("Failed to generate wave: %s" % str(e))
        raise

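# Note on the int16 <-> float scaling used above: int16 PCM spans
# [-32768, 32767]; multiplying by 0.5 ** 15 (= 1/32768) maps samples into
# roughly [-1.0, 1.0) for noisereduce, and multiplying by 32767 maps back
# (exact to within one least-significant bit at the extremes). A minimal
# worked example, as a hypothetical helper:
def _demo_pcm_roundtrip():
    pcm = np.array([-32768, 0, 16384, 32767], dtype=np.int16)
    as_float = pcm * 0.5 ** 15
    back = (as_float * 32767).astype(np.int16)
    return as_float, back
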
def get_role_to_wave_map():
    # maps UI role names to reference-voice wave files
    roles_map = {}
    roles_map["Female AI Assistant"] = "models/female.wav"
    roles_map["Male AI Assistant"] = "models/male.wav"
    roles_map["AI Beard The Pirate"] = "models/pirate_by_coqui.wav"
    roles_map["None"] = ""
    return roles_map


def allowed_roles():
    return list(get_role_to_wave_map().keys())


def get_roles(choices=None, value=None):
    if choices is None:
        choices = allowed_roles()
    if value is None:
        value = choices[0]
    import gradio as gr
    chatbot_role = gr.Dropdown(
        label="Speech Style",
        choices=choices,
        value=value,
    )
    return chatbot_role

def predict_from_text(response, chatbot_role, language, roles_map, tts_speed,
                      model=None,
                      supported_languages=None,
                      return_as_byte=True, sr=24000, verbose=False):
    if chatbot_role == "None":
        return
    audio0 = prepare_speech(sr=sr)
    yield audio0
    latent = get_latent(roles_map[chatbot_role], model=model)
    sentence_state = init_sentence_state()
    generate_speech_func = functools.partial(generate_speech,
                                             model=model,
                                             language=language,
                                             supported_languages=supported_languages,
                                             latent=latent,
                                             sentence_state=sentence_state,
                                             return_as_byte=return_as_byte,
                                             sr=sr,
                                             tts_speed=tts_speed,
                                             verbose=verbose)
    while True:
        audio1, sentence, sentence_state = generate_speech_func(response, is_final=False)
        if sentence is not None:
            yield audio1
        else:
            break

    audio1, sentence, sentence_state = generate_speech_func(response, is_final=True)
    yield audio1

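# Illustrative usage sketch (hypothetical caller): predict_from_text is a
# generator that first yields a speech-preparation chunk from prepare_speech,
# then one audio chunk per detected sentence, and finally the flushed tail.
def _demo_predict(text, model=None, supported_languages=None):
    chunks = []
    for audio in predict_from_text(text, "Female AI Assistant", "autodetect",
                                   get_role_to_wave_map(), 1.0,
                                   model=model, supported_languages=supported_languages):
        chunks.append(audio)
    return chunks
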
def filter_wave_1(speaker_wav):
    try:
        # lowpass/highpass to cut rumble and hiss, then trim silence from both
        # ends (the areverse pair applies silenceremove to each end in turn)
        cleanup_filter = "lowpass=8000,highpass=75,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
        resample_filter = "-ac 1 -ar 22050"
        out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"

        shell_command = f"ffmpeg -y -i {speaker_wav} -af {cleanup_filter} {resample_filter} {out_filename}".split(
            " ")

        subprocess.run(shell_command, capture_output=False, text=True,
                       check=True)
        speaker_wav = out_filename
        print("Filtered microphone input")
    except subprocess.CalledProcessError:
        # filtering failed, fall back to the unfiltered input
        print("Error: failed filtering, using original microphone input")
    return speaker_wav

def filter_wave_2(speaker_wav):
    # Filtering for microphone input: such recordings tend to have background
    # noise plus silence at the beginning and end. This is fast, not perfect.

    # all flags on by default; only lowpassfilter and trim are applied below
    lowpassfilter = denoise = trim = loudness = True

    if lowpassfilter:
        lowpass_highpass = "lowpass=8000,highpass=75,"
    else:
        lowpass_highpass = ""

    if trim:
        # remove silence at the beginning and end, which helps microphone input
        trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"
    else:
        trim_silence = ""

    try:
        out_filename = (
            speaker_wav + str(uuid.uuid4()) + ".wav"
        )

        # note: expects an ffmpeg binary in the working directory
        shell_command = f"./ffmpeg -y -i {speaker_wav} -af {lowpass_highpass}{trim_silence} {out_filename}".split(
            " "
        )

        subprocess.run(
            shell_command,
            capture_output=False,
            text=True,
            check=True,
        )
        speaker_wav = out_filename
        print("Filtered microphone input")
    except subprocess.CalledProcessError:
        # filtering failed, fall back to the unfiltered input
        print("Error: failed filtering, using original microphone input")
    return speaker_wav

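# Illustrative alternative (an assumption, not original behavior): building the
# ffmpeg argv as a list sidesteps the .split(" ") pitfall above, where a
# speaker_wav path containing spaces would be split into multiple arguments.
def _demo_filter_cmd(speaker_wav, out_filename, af="lowpass=8000,highpass=75"):
    return ["ffmpeg", "-y", "-i", speaker_wav, "-af", af, out_filename]
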
def get_languages_gr(visible=True, value=None):
    import gradio as gr
    choices = [
        "autodetect",
        "en",
        "es",
        "fr",
        "de",
        "it",
        "pt",
        "pl",
        "tr",
        "ru",
        "nl",
        "cs",
        "ar",
        "zh-cn",
        "ja",
        "ko",
        "hu",
    ]
    if value is None:
        value = choices[0]
    language_gr = gr.Dropdown(
        label="Language",
        info="Select an output language for the synthesised speech",
        choices=choices,
        value=value,
        visible=visible,
    )
    return language_gr