| import sys |
| import io, os, stat |
| import torch |
|
|
| import subprocess |
| import random |
| from zipfile import ZipFile |
| import uuid |
| import time |
| import torchaudio |
| import numpy as np |
|
|
| |
| |
| print("install unidic") |
| os.system('python -m unidic download') |
|
|
| |
| os.environ["COQUI_TOS_AGREED"] = "1" |
|
|
| |
| |
| import langid |
| import base64 |
| import csv |
| from io import StringIO |
| import datetime |
| import re |
|
|
| from scipy.io.wavfile import write |
| from pydub import AudioSegment |
|
|
| from TTS.api import TTS |
| from TTS.tts.configs.xtts_config import XttsConfig |
| from TTS.tts.models.xtts import Xtts |
| from TTS.utils.generic_utils import get_user_data_dir |
| from huggingface_hub import HfApi |
|
|
| |
| print("Export newer ffmpeg binary for denoise filter") |
| ZipFile("ffmpeg.zip").extractall() |
| print("Make ffmpeg binary executable") |
| st = os.stat("ffmpeg") |
| os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC) |
|
|
| HF_TOKEN = os.environ.get("HF_TOKEN") |
| if not HF_TOKEN: |
| raise ValueError("HF_TOKEN environment variable is not set") |
|
|
| |
| api = HfApi(token=HF_TOKEN) |
| repo_id = "coqui/xtts" |
|
|
| |
| print("Downloading if not downloaded Coqui XTTS V2") |
| from TTS.utils.manage import ModelManager |
|
|
| model_name = "tts_models/multilingual/multi-dataset/xtts_v2" |
| ModelManager().download_model(model_name) |
| model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--")) |
| print("XTTS downloaded") |
|
|
| |
| os.system(f'chown -R appuser:appgroup {model_path}') |
| os.system(f'chmod -R 755 {model_path}') |
| |
| if not os.access(model_path, os.W_OK): |
| raise PermissionError(f"Write permission denied for model directory: {model_path}") |
|
|
| config = XttsConfig() |
| config.load_json(os.path.join(model_path, "config.json")) |
|
|
| model = Xtts.init_from_config(config) |
| checkpoint_path = os.path.join(model_path, "model.pth") |
| vocab_path = os.path.join(model_path, "vocab.json") |
|
|
| if not os.path.exists(checkpoint_path): |
| raise FileNotFoundError(f"Checkpoint file not found at {checkpoint_path}") |
| if not os.path.exists(vocab_path): |
| raise FileNotFoundError(f"Vocab file not found at {vocab_path}") |
|
|
| if not os.environ.get('CUDA_HOME'): |
| print(f"ENV var CUDA_HOME is not set, defaulting to: '/usr/local/cuda'") |
| os.environ['CUDA_HOME'] = f"/usr/local/cuda" |
|
|
| model.load_checkpoint( |
| config, |
| checkpoint_dir=model_path, |
| vocab_path=vocab_path, |
| eval=True, |
| use_deepspeed=True, |
| ) |
| model.cuda() |
|
|
| |
| DEVICE_ASSERT_DETECTED = 0 |
| DEVICE_ASSERT_PROMPT = None |
| DEVICE_ASSERT_LANG = None |
|
|
| supported_languages = config.languages |
| def numpy_to_mp3(audio_array, sampling_rate): |
| |
| if np.issubdtype(audio_array.dtype, np.floating): |
| max_val = np.max(np.abs(audio_array)) |
| audio_array = (audio_array / max_val) * 32767 |
| audio_array = audio_array.astype(np.int16) |
|
|
| |
| audio_segment = AudioSegment( |
| audio_array.tobytes(), |
| frame_rate=sampling_rate, |
| sample_width=audio_array.dtype.itemsize, |
| channels=1 |
| ) |
|
|
| |
| mp3_io = io.BytesIO() |
| audio_segment.export(mp3_io, format="mp3", bitrate="320k") |
|
|
| |
| mp3_bytes = mp3_io.getvalue() |
| mp3_io.close() |
|
|
| return mp3_bytes |
|
|
| def predict( |
| prompt, |
| language, |
| audio_file_pth, |
| mic_file_path, |
| use_mic, |
| voice_cleanup, |
| no_lang_auto_detect, |
| agree, |
| ): |
| print("####################################### Predict Called ##############################") |
| print("promp:",prompt) |
| print("language:",language) |
| print("audio_file_pth:",audio_file_pth) |
| print("mic_file_path:",mic_file_path) |
| print("use_mic:",use_mic) |
| print("voice_cleanup:",voice_cleanup) |
| print("no_lang_auto_detect:",no_lang_auto_detect) |
| print("agree:",agree) |
| if agree == True: |
| if language not in supported_languages: |
| print( |
| f"Language you put {language} in is not in is not in our Supported Languages, please choose from dropdown" |
| ) |
|
|
| return ( |
| None, |
| ) |
|
|
| language_predicted = langid.classify(prompt)[ |
| 0 |
| ].strip() |
|
|
| |
| if language_predicted == "zh": |
| |
| language_predicted = "zh-cn" |
|
|
| print(f"Detected language:{language_predicted}, Chosen language:{language}") |
|
|
| |
| if len(prompt) > 15: |
| |
| |
| |
| if language_predicted != language and not no_lang_auto_detect: |
| |
| |
| print( |
| f"It looks like your text isn’t the language you chose , if you’re sure the text is the same language you chose, please check disable language auto-detection checkbox" |
| ) |
|
|
| return ( |
| None, |
| ) |
|
|
| if use_mic == True: |
| if mic_file_path is not None: |
| speaker_wav = mic_file_path |
| else: |
| print( |
| "Please record your voice with Microphone, or uncheck Use Microphone to use reference audios" |
| ) |
| return ( |
| None, |
| ) |
|
|
| else: |
| speaker_wav = audio_file_pth |
|
|
| |
| |
|
|
| |
| lowpassfilter = denoise = trim = loudness = True |
|
|
| if lowpassfilter: |
| lowpass_highpass = "lowpass=8000,highpass=75," |
| else: |
| lowpass_highpass = "" |
|
|
| if trim: |
| |
| trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02," |
| else: |
| trim_silence = "" |
|
|
| if voice_cleanup: |
| try: |
| out_filename = ( |
| speaker_wav + str(uuid.uuid4()) + ".wav" |
| ) |
|
|
| |
| shell_command = f"./ffmpeg -y -i {speaker_wav} -af {lowpass_highpass}{trim_silence} {out_filename}".split( |
| " " |
| ) |
|
|
| command_result = subprocess.run( |
| [item for item in shell_command], |
| capture_output=False, |
| text=True, |
| check=True, |
| ) |
| speaker_wav = out_filename |
| print("Filtered microphone input") |
| except subprocess.CalledProcessError: |
| |
| print("Error: failed filtering, use original microphone input") |
| else: |
| speaker_wav = speaker_wav |
|
|
| if len(prompt) < 2: |
| print("Please give a longer prompt text") |
| return ( |
| None, |
| ) |
| if len(prompt) > 1000: |
| print( |
| "Text length limited to 200 characters for this demo, please try shorter text. You can clone this space and edit code for your own usage" |
| ) |
| return ( |
| None, |
| ) |
| global DEVICE_ASSERT_DETECTED |
| if DEVICE_ASSERT_DETECTED: |
| global DEVICE_ASSERT_PROMPT |
| global DEVICE_ASSERT_LANG |
| |
| print( |
| f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}" |
| ) |
|
|
| |
| space = api.get_space_runtime(repo_id=repo_id) |
| if space.stage != "BUILDING": |
| api.restart_space(repo_id=repo_id) |
| else: |
| print("TRIED TO RESTART but space is building") |
|
|
| try: |
| metrics_text = "" |
| t_latent = time.time() |
|
|
| |
| try: |
| ( |
| gpt_cond_latent, |
| speaker_embedding, |
| ) = model.get_conditioning_latents(audio_path=speaker_wav, gpt_cond_len=30, gpt_cond_chunk_len=4, max_ref_length=60) |
| except Exception as e: |
| print("Speaker encoding error", str(e)) |
| print( |
| "It appears something wrong with reference, did you unmute your microphone?" |
| ) |
| return ( |
| None, |
| ) |
|
|
| latent_calculation_time = time.time() - t_latent |
| |
|
|
| |
| prompt = re.sub("([^\x00-\x7F]|\w)(\.|\。|\?)", r"\1 \2\2", prompt) |
|
|
| wav_chunks = [] |
| |
| """ |
| print("I: Generating new audio...") |
| t0 = time.time() |
| out = model.inference( |
| prompt, |
| language, |
| gpt_cond_latent, |
| speaker_embedding, |
| repetition_penalty=5.0, |
| temperature=0.75, |
| ) |
| inference_time = time.time() - t0 |
| print(f"I: Time to generate audio: {round(inference_time*1000)} milliseconds") |
| metrics_text+=f"Time to generate audio: {round(inference_time*1000)} milliseconds\n" |
| real_time_factor= (time.time() - t0) / out['wav'].shape[-1] * 24000 |
| print(f"Real-time factor (RTF): {real_time_factor}") |
| metrics_text+=f"Real-time factor (RTF): {real_time_factor:.2f}\n" |
| torchaudio.save("output.wav", torch.tensor(out["wav"]).unsqueeze(0), 24000) |
| """ |
| print("I: Generating new audio in streaming mode...") |
| t0 = time.time() |
| chunks = model.inference_stream( |
| prompt, |
| language, |
| gpt_cond_latent, |
| speaker_embedding, |
| repetition_penalty=7.0, |
| temperature=0.85, |
| ) |
|
|
| first_chunk = True |
| for i, chunk in enumerate(chunks): |
| if first_chunk: |
| first_chunk_time = time.time() - t0 |
| metrics_text += f"Latency to first audio chunk: {round(first_chunk_time*1000)} milliseconds\n" |
| first_chunk = False |
|
|
| |
| chunk_np = chunk.cpu().numpy() |
| print('chunk',i) |
| yield numpy_to_mp3(chunk_np,24000) |
| wav_chunks.append(chunk) |
| |
| print(f"Received chunk {i} of audio length {chunk.shape[-1]}") |
| inference_time = time.time() - t0 |
| print( |
| f"I: Time to generate audio: {round(inference_time*1000)} milliseconds" |
| ) |
| |
| |
| |
|
|
| except RuntimeError as e: |
| if "device-side assert" in str(e): |
| |
| print( |
| f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}", |
| flush=True, |
| ) |
| print("Unhandled Exception encounter, please retry in a minute") |
| print("Cuda device-assert Runtime encountered need restart") |
| if not DEVICE_ASSERT_DETECTED: |
| DEVICE_ASSERT_DETECTED = 1 |
| DEVICE_ASSERT_PROMPT = prompt |
| DEVICE_ASSERT_LANG = language |
|
|
| |
| |
| error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S") |
| error_data = [ |
| error_time, |
| prompt, |
| language, |
| audio_file_pth, |
| mic_file_path, |
| use_mic, |
| voice_cleanup, |
| no_lang_auto_detect, |
| agree, |
| ] |
| error_data = [str(e) if type(e) != str else e for e in error_data] |
| print(error_data) |
| print(speaker_wav) |
| write_io = StringIO() |
| csv.writer(write_io).writerows([error_data]) |
| csv_upload = write_io.getvalue().encode() |
|
|
| filename = error_time + "_" + str(uuid.uuid4()) + ".csv" |
| print("Writing error csv") |
| error_api = HfApi() |
| error_api.upload_file( |
| path_or_fileobj=csv_upload, |
| path_in_repo=filename, |
| repo_id="coqui/xtts-flagged-dataset", |
| repo_type="dataset", |
| ) |
|
|
| |
| print("Writing error reference audio") |
| speaker_filename = ( |
| error_time + "_reference_" + str(uuid.uuid4()) + ".wav" |
| ) |
| error_api = HfApi() |
| error_api.upload_file( |
| path_or_fileobj=speaker_wav, |
| path_in_repo=speaker_filename, |
| repo_id="coqui/xtts-flagged-dataset", |
| repo_type="dataset", |
| ) |
|
|
| |
| space = api.get_space_runtime(repo_id=repo_id) |
| if space.stage != "BUILDING": |
| api.restart_space(repo_id=repo_id) |
| else: |
| print("TRIED TO RESTART but space is building") |
|
|
| else: |
| if "Failed to decode" in str(e): |
| print("Speaker encoding error", str(e)) |
| print( |
| "It appears something wrong with reference, did you unmute your microphone?" |
| ) |
| else: |
| print("RuntimeError: non device-side assert error:", str(e)) |
| print("Something unexpected happened please retry again.") |
| return ( |
| None, |
| ) |
|
|
| else: |
| print("Please accept the Terms & Condition!") |
| return ( |
| None, |
| ) |