| import os |
| import gc |
| import ast |
| import requests |
| import sys |
| import shutil |
| import zipfile |
| import gradio as gr |
| import urllib.request |
| import gdown |
| import tempfile |
| from datetime import datetime |
|
|
# Working directory at start-up; the relative entries of ``dirs`` are resolved
# against it when the directories are created.
current_dir = os.getcwd()

# Relative directory layout of the application.  Other statements in this file
# address entries by position (dirs[0], dirs[3], dirs[8], dirs[9]), so the
# order of this list is significant.
dirs = ["voice_models", "vbach"] + [
    os.path.join("vbach", *parts)
    for parts in (
        ("cli",),                  # dirs[2]
        ("infer",),                # dirs[3]
        ("lib",),                  # dirs[4]
        ("lib", "algorithm"),      # dirs[5]
        ("lib", "predictors"),     # dirs[6]
        ("models",),               # dirs[7]
        ("models", "predictors"),  # dirs[8]
        ("models", "embedders"),   # dirs[9]
        ("scripts",),              # dirs[10]
        ("utils",),                # dirs[11]
    )
]


# Checkpoint locations (relative paths) and UI defaults.
RVC_MODELS_DIR = dirs[0]
RMVPE_PATH = os.path.join(dirs[8], "rmvpe.pt")
FCPE_PATH = os.path.join(dirs[8], "fcpe.pt")
HUBERT_MODEL_PATH = os.path.join(dirs[9], "hubert_base.pt")
CURRENT_LANG = "ru"
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]
# UI strings, keyed first by language code ("ru", "en") and then by message key.
# Both languages define the same set of keys.  Values containing {placeholders}
# (e.g. {namefile}, {dir_name}, {error}) are templates; presumably they are
# filled with str.format by code outside this part of the file - TODO confirm.
# "stereo_modes" is a nested mapping from stereo-mode identifier to its label.
TRANSLATIONS = {
    # Russian
    "ru": {
        "app_title": "VBach",
        "inference": "Инференс",
        "select_file": "Выберите файл",
        "audio_path": "Путь к файлу",
        "audio_path_info": "Здесь можно ввести путь к файлу/список путей к файлам , либо загрузить его/их выше и получить путь к нему/их список",
        "audio_processing": "Режим обработки аудио",
        "output_format": "Формат вывода",
        "name_format": "Шаблон",
        "name_format_info": """Доступные ключи для формата:
NAME - Имя входного файла
MODEL - Название модели
PITCH - Высота тона
F0_METHOD - Метод извлечения тона
DATETIME - Время и дата создания результата

Пример - NAME_MODEL_PITCH → name_your-model_12""",
        "convert_single": "Конвертировать один",
        "convert_batch": "Конвертировать несколько",
        "model_name": "Имя модели",
        "pitch_method": "Метод извлечения тона",
        "pitch": "Высота тона",
        "hop_length": "Длина шага",
        "bitrate": "Битрейт (Кбит/сек)",
        "f0_min": "Нижний лимит определения высоты тона",
        "f0_max": "Верхний лимит определения высоты тона",
        "advanced_settings": "Дополнительные настройки",
        "filter_radius": "Радиус фильтра",
        "index_rate": "Влияние индекса",
        "rms": "Огибающая громкости",
        "protect": "Защита согласных",
        "model_manager": "Менеджер моделей",
        "download_url": "Загрузить по ссылке",
        "download_zip": "Загрузить ZIP архивом",
        "download_files": "Загрузить файлами",
        "delete_model": "Удалить модель",
        "download_link": "Ссылка на загрузку модели",
        "unique_name": "Дайте вашей загружаемой модели уникальное имя, отличное от других голосовых моделей.",
        "download_button": "Загрузить модель",
        "supported_sites": "Поддерживаемые сайты",
        "output_message": "Сообщение вывода",
        "zip_file": "Zip-файл",
        "upload_steps": "<h3>1. Найдите и скачайте файлы: .pth и необязательный файл .index</h3><h3>2. Закиньте файл(-ы) в ZIP-архив и поместите его в область загрузки</h3><h3>3. Дождитесь полной загрузки ZIP-архива в интерфейс</h3>",
        "pth_file": "pth-файл",
        "index_file": "index-файл",
        "delete_info": "Выберите модель, которую надо удалить",
        "refresh_button": "Обновить список моделей",
        "delete_button": "Удалить модель",
        "batch_upload": "Пакетная загрузка",
        "single_upload": "Одиночная загрузка",
        "converted_voice": "Преобразованный вокал",
        "converted_voices": "Преобразованные вокалы",
        "update_button": "Обновить",
        "processing": "Сейчас обрабатывается - {namefile}",
        "files": "файлов",
        "error_no_audio": "Не удалось найти аудиофайл(ы). Убедитесь, что файл загрузился или проверьте правильность пути к нему.",
        "error_no_model": "Выберите модель голоса для преобразования голоса",
        "warning_file_not_found": "Файл {file} не найден.",
        "success_single": "Вокал успешно преобразован",
        "success_batch": "Вокалы успешно преобразованы",
        "language": "Язык",
        # Labels for the stereo-mode identifiers "mono", "left/right", "sim/dif".
        "stereo_modes": {
            "mono": "Моно",
            "left/right": "Левый/Правый",
            "sim/dif": "Сходство/Различия"
        },

        # Progress messages of the model downloader.
        'downloading_google': "[~] Загрузка модели с Google Drive...",
        'downloading_huggingface': "[~] Загрузка модели с HuggingFace...",
        'downloading_pixeldrain': "[~] Загрузка модели с Pixeldrain...",
        'downloading_yandex': "[~] Загрузка модели с Яндекс Диска...",
        'downloading_model': "[~] Загрузка голосовой модели {dir_name}...",
        'unpacking_zip': "[~] Распаковка zip-файла...",


        # Error messages of the model manager.
        'unsupported_source': "Неподдерживаемый источник: {url}",
        'download_error': "Ошибка при скачивании: {error}",
        'yandex_api_error': "Ошибка при получении ссылки с Яндекс Диска: {status}",
        'pth_not_found': "Не найден файл модели .pth в распакованном zip-файле. Проверьте содержимое в {folder}.",
        'model_exists': "Директория голосовой модели {dir_name} уже существует! Выберите другое имя.",
        'model_load_error': "Ошибка при загрузке модели: {error}",
        'model_delete_error': "Ошибка при удалении модели: {error}",


        # Status messages of the model manager and input-validation errors.
        'mega_unsupported': "Mega не поддерживается!",
        'model_uploaded': "[+] Модель {dir_name} успешно загружена!",
        'model_deleted': "[-] Модель {dir_name} успешно удалена!",
        'model_not_found': "[-] Модели {dir_name} не существует",
        "error_strlist_is_not_list": "Эта строка не является списком файлов",
        "error_path_is_list": "Путь к файлу является списком"
    },
    # English
    "en": {
        "app_title": "VBach",
        "inference": "Inference",
        "select_file": "Select File",
        "audio_path": "Audio path",
        "audio_path_info": "You can enter a file path or a list of file paths here, or upload the file(s) above to obtain their path(s)",
        "audio_processing": "Audio Processing Mode",
        "output_format": "Output Format",
        "name_format": "Template",
        "name_format_info": """Available format keys:
NAME - Input file name
MODEL - Model name
PITCH - Pitch
F0_METHOD - Method extraction pitch
DATETIME - Date & time create results

Example - NAME_MODEL_PITCH → name_your-model_12""",
        "convert_single": "Convert Single",
        "convert_batch": "Convert Batch",
        "model_name": "Model Name",
        "pitch_method": "Pitch Extraction Method",
        "pitch": "Pitch",
        "hop_length": "Hop Length",
        "bitrate": "Bitrate (Kbit/sec)",
        "f0_min": "F0 Min",
        "f0_max": "F0 Max",
        "advanced_settings": "Advanced Settings",
        "filter_radius": "Filter Radius",
        "index_rate": "Index Rate",
        "rms": "RMS Envelope",
        "protect": "Consonant Protection",
        "model_manager": "Model Manager",
        "download_url": "Download by URL",
        "download_zip": "Upload ZIP Archive",
        "download_files": "Upload Files",
        "delete_model": "Delete Model",
        "download_link": "Model Download Link",
        "unique_name": "Give your model a unique name different from other voice models.",
        "download_button": "Download Model",
        "supported_sites": "Supported Sites",
        "output_message": "Output Message",
        "zip_file": "Zip File",
        "upload_steps": "<h3>1. Find and download files: .pth and optional .index</h3><h3>2. Put file(s) in a ZIP archive and upload it</h3><h3>3. Wait for the ZIP archive to be fully uploaded</h3>",
        "pth_file": "PTH File",
        "index_file": "Index File",
        "delete_info": "Select the model to delete",
        "refresh_button": "Refresh Model List",
        "delete_button": "Delete Model",
        "batch_upload": "Batch Upload",
        "single_upload": "Single Upload",
        "converted_voice": "Converted Voice",
        "converted_voices": "Converted Voices",
        "update_button": "Refresh",
        "processing": "Processing - {namefile}",
        "files": "files",
        "error_no_audio": "Could not find audio file(s). Make sure the file is uploaded or check the file path.",
        "error_no_model": "Select a voice model for voice conversion",
        "warning_file_not_found": "File {file} not found.",
        "success_single": "Voice successfully converted",
        "success_batch": "Voices successfully converted",
        "language": "Language",
        # Labels for the stereo-mode identifiers "mono", "left/right", "sim/dif".
        "stereo_modes": {
            "mono": "Mono",
            "left/right": "Left/Right",
            "sim/dif": "Similarity/Difference"
        },
        # Progress messages of the model downloader.
        'downloading_google': "[~] Downloading model from Google Drive...",
        'downloading_huggingface': "[~] Downloading model from HuggingFace...",
        'downloading_pixeldrain': "[~] Downloading model from Pixeldrain...",
        'downloading_yandex': "[~] Downloading model from Yandex Disk...",
        'downloading_model': "[~] Downloading voice model {dir_name}...",
        'unpacking_zip': "[~] Unpacking zip file...",


        # Error messages of the model manager.
        'unsupported_source': "Unsupported source: {url}",
        'download_error': "Download error: {error}",
        'yandex_api_error': "Yandex Disk API error: {status}",
        'pth_not_found': "Model .pth file not found in unzipped archive. Check contents in {folder}.",
        'model_exists': "Voice model directory {dir_name} already exists! Choose another name.",
        'model_load_error': "Error loading model: {error}",
        'model_delete_error': "Error deleting model: {error}",


        # Status messages of the model manager and input-validation errors.
        'mega_unsupported': "Mega is not supported!",
        'model_uploaded': "[+] Model {dir_name} uploaded successfully!",
        'model_deleted': "[-] Model {dir_name} deleted successfully!",
        'model_not_found': "[-] Model {dir_name} does not exist",
        "error_strlist_is_not_list": "This string is not a file list",
        "error_path_is_list": "The file path is a list"
    }
}
|
|
|
|
# Create every application directory below the start-up working directory.
# The loop variable is deliberately not named ``dir``: at module level that
# name would keep shadowing the built-in ``dir()`` for the rest of the file.
for _rel_dir in dirs:
    os.makedirs(os.path.join(current_dir, _rel_dir), exist_ok=True)
|
|
# Fetch the pitch-predictor and embedder checkpoints that are not on disk yet.
# Each file is streamed into a temporary "<target>.part" file and renamed onto
# the target only after the transfer has completed.  Writing straight to the
# target would leave a truncated checkpoint behind after an interrupted
# download, and the ``os.path.exists`` check would accept it on the next start.
for url, file in [
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/rmvpe.pt", RMVPE_PATH],
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/fcpe.pt", FCPE_PATH],
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/embedders/hubert_base.pt", HUBERT_MODEL_PATH],
]:
    if os.path.exists(file):
        continue

    part_file = f"{file}.part"
    try:
        # (connect, read) timeouts keep a stalled connection from blocking
        # start-up forever; ``with`` releases the connection in every case.
        with requests.get(url, stream=True, timeout=(10, 60)) as r:
            r.raise_for_status()
            with open(part_file, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(part_file, file)
    except requests.exceptions.RequestException as e:
        print(f"Произошла ошибка при загрузке модели: {e}")
    except Exception as e:
        # Best effort: a failed download is reported, start-up continues.
        print(f"Произошла непредвиденная ошибка: {e}")
    finally:
        # After a successful os.replace the part file no longer exists.
        if os.path.exists(part_file):
            os.remove(part_file)
| |
|
|
| inference = ''' |
| import torch |
| import numpy as np |
| import librosa |
| from multiprocessing import cpu_count |
| from fairseq import checkpoint_utils |
| |
| from vbach.lib.algorithm.synthesizers import Synthesizer |
| from .pipeline import VC |
| |
| from separator.audio_writer import write_audio_file |
| |
| from vbach.utils.remove_center import remove_center |
| |
def overlay_mono_on_stereo(mono_audio, stereo_audio, gain=0.5):
    """Mix a mono signal into a stereo signal and return it as int16.

    Both inputs are converted to float32.  A 1-D mono array, or an array whose
    first dimension is 1, is duplicated into two rows.  Both signals are
    cropped to the shorter length, summed as ``stereo + mono * gain`` and,
    unless the mix is all zeros, scaled so that its absolute peak becomes 1.0
    before being multiplied by 32767 and cast to int16.

    Args:
        mono_audio: numpy array of shape (N,), (1, N) or (2, N).
        stereo_audio: numpy array of shape (2, M).
        gain: factor applied to the mono signal before mixing.

    Returns:
        int16 numpy array of shape (2, min(N, M)).

    Raises:
        ValueError: if an input is None, if either signal does not end up with
            two rows, or if the common length is zero.
    """
    if mono_audio is None or stereo_audio is None:
        raise ValueError("Input audio arrays cannot be None")

    mono = mono_audio.astype(np.float32)
    stereo = stereo_audio.astype(np.float32)

    # Duplicate a single mono channel into two identical rows.
    if mono.ndim == 1 or mono.shape[0] == 1:
        channel = mono if mono.ndim == 1 else mono[0]
        mono = np.vstack([channel, channel])

    if mono.shape[0] != 2 or stereo.shape[0] != 2:
        raise ValueError("Shapes must be (2, N)")

    length = min(mono.shape[1], stereo.shape[1])
    if length == 0:
        raise ValueError("Audio arrays cannot be empty")

    mixed = stereo[:, :length] + mono[:, :length] * gain

    # Scale to a peak of 1.0 so the int16 conversion below cannot overflow.
    peak = np.max(np.abs(mixed))
    if peak > 0:
        mixed /= peak

    return (mixed * 32767).astype(np.int16)
| |
def load_audio(
    file_path: str,
    target_sr: int,
    stereo_mode: str
) -> tuple:
    """
    Load an audio file with librosa and split it according to ``stereo_mode``.

    Every returned signal is resampled from the file's native rate to
    ``target_sr`` and flattened to 1-D.

    Args:
        file_path: Path of the audio file.
        target_sr: Sample rate the returned signals are resampled to.
        stereo_mode: One of
            "mono"       - the file is down-mixed; only ``mid`` is returned;
            "left/right" - channels 0 and 1 are returned as ``left``/``right``;
            "sim/dif"    - ``remove_center`` splits the file; ``mid`` is the
                           mean of its two centre outputs, ``left``/``right``
                           are its two difference outputs.
            Any other value loads nothing.

    Returns:
        Tuple ``(mid, left, right)``; entries that the chosen mode does not
        produce are None.

    Raises:
        RuntimeError: on any loading or processing error (the original
            exception is attached as ``__cause__``).
    """
    try:
        mid, left, right = None, None, None

        if stereo_mode == "mono":
            mid_audio, sr = librosa.load(file_path, sr=None, mono=True)
            mid = librosa.resample(
                mid_audio, orig_sr=sr, target_sr=target_sr
            ).flatten()

        elif stereo_mode in ("left/right", "sim/dif"):
            stereo_audio, sr = librosa.load(file_path, sr=None, mono=False)

            # A single-channel file comes back 1-D even with mono=False;
            # duplicate it so that indexing by channel below works instead of
            # picking individual samples.
            if stereo_audio.ndim == 1:
                stereo_audio = np.stack((stereo_audio, stereo_audio), axis=0)

            if stereo_mode == "left/right":
                left = librosa.resample(
                    stereo_audio[0], orig_sr=sr, target_sr=target_sr
                ).flatten()
                right = librosa.resample(
                    stereo_audio[1], orig_sr=sr, target_sr=target_sr
                ).flatten()

            else:  # "sim/dif"
                mid_left, mid_right, dif_left, dif_right = remove_center(
                    input_array=stereo_audio, samplerate=sr
                )
                mid_audio = (mid_left + mid_right) * 0.5

                mid = librosa.resample(
                    mid_audio, orig_sr=sr, target_sr=target_sr
                ).flatten()
                left = librosa.resample(
                    dif_left, orig_sr=sr, target_sr=target_sr
                ).flatten()
                right = librosa.resample(
                    dif_right, orig_sr=sr, target_sr=target_sr
                ).flatten()

        return mid, left, right

    except Exception as e:
        raise RuntimeError(f"Ошибка загрузки аудио '{file_path}': {str(e)}") from e
| |
class Config:
    """Device selection and chunking parameters for inference.

    Attributes set by the constructor:
        device: "cuda", "mps" or "cpu".
        is_half: half-precision flag (see the review note in ``__init__``).
        n_cpu: result of ``multiprocessing.cpu_count()``.
        gpu_name, gpu_mem: CUDA device name and memory in GiB (rounded by
            ``int(x + 0.4)``); both stay None without CUDA.
        x_pad, x_query, x_center, x_max: the four values returned by
            ``device_config``.
    """

    def __init__(self):
        self.device = self.get_device()
        # NOTE(review): as written the flag is True only when the device is
        # "cpu" (and ``device_config`` sets it to True again on CPU), i.e.
        # CUDA/MPS run in full precision.  This looks inverted - confirm
        # whether that is intended before changing it.
        self.is_half = self.device == "cpu"
        self.n_cpu = cpu_count()
        self.gpu_name = None
        self.gpu_mem = None
        self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()

    def get_device(self):
        """Return "cuda" if available, else "mps" if available, else "cpu"."""
        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    def device_config(self):
        """Finish device setup and return ``(x_pad, x_query, x_center, x_max)``.

        Prints which device is used, inspects the GPU on CUDA, and picks the
        tuple from ``is_half``; a GPU with at most 4 GiB overrides that choice.
        """
        if torch.cuda.is_available():
            print("Используется устройство CUDA")
            self._configure_gpu()
        elif torch.backends.mps.is_available():
            print("Используется устройство MPS")
            self.device = "mps"
        else:
            print("Используется CPU")
            self.device = "cpu"
            self.is_half = True

        if self.gpu_mem is not None and self.gpu_mem <= 4:
            return 1, 5, 30, 32
        if self.is_half:
            return 3, 10, 60, 65
        return 1, 6, 38, 41

    def _configure_gpu(self):
        """Record GPU name and memory; clear ``is_half`` for listed GPU names."""
        self.gpu_name = torch.cuda.get_device_name(self.device)

        # Substrings of device names for which half precision is switched off
        # (unless the name also contains "V100").
        name_tags = ("16", "P40", "P10", "1060", "1070", "1080")
        listed = any(tag in self.gpu_name for tag in name_tags)
        if listed and "V100" not in self.gpu_name.upper():
            self.is_half = False

        total_bytes = torch.cuda.get_device_properties(self.device).total_memory
        self.gpu_mem = int(total_bytes / 1024 / 1024 / 1024 + 0.4)
| |
# Load the Hubert model
def load_hubert(device, is_half, model_path):
    """Load the first model of a fairseq checkpoint and prepare it for inference.

    Args:
        device: device the model is moved to.
        is_half: convert the model with ``.half()`` when true, ``.float()``
            otherwise.
        model_path: checkpoint path handed to
            ``checkpoint_utils.load_model_ensemble_and_task``.

    Returns:
        The model, on ``device``, in eval mode.
    """
    ensemble, _saved_cfg, _task = checkpoint_utils.load_model_ensemble_and_task(
        [model_path], suffix=""
    )
    hubert = ensemble[0].to(device)
    if is_half:
        hubert = hubert.half()
    else:
        hubert = hubert.float()
    hubert.eval()
    return hubert
| |
# Build the voice converter for one model checkpoint
def get_vc(device, is_half, config, model_path):
    """Load a voice-model checkpoint and build the objects needed for inference.

    Args:
        device: device the synthesizer is moved to.
        is_half: run the synthesizer in half precision when true.
        config: configuration object handed to ``VC``.
        model_path: path of the checkpoint; it must contain the keys
            "config" and "weight".

    Returns:
        Tuple ``(cpt, version, net_g, tgt_sr, vc)``: the loaded checkpoint
        dict, its "version" entry ("v1" when absent), the synthesizer, the
        last entry of the checkpoint's "config" list, and a ``VC`` instance.

    Raises:
        ValueError: if the checkpoint lacks "config" or "weight".
    """
    # NOTE(review): weights_only=False makes torch.load unpickle arbitrary
    # objects; only load checkpoints from sources you trust.
    cpt = torch.load(model_path, map_location="cpu", weights_only=False)
    if not ("config" in cpt and "weight" in cpt):
        raise ValueError(
            f"Некорректный формат для {model_path}. "
            "Используйте голосовую модель, обученную с использованием RVC v2."
        )

    model_args = cpt["config"]
    tgt_sr = model_args[-1]
    # Overwrite the third-from-last constructor argument (in the checkpoint
    # itself) with the row count of the stored "emb_g.weight" tensor.
    model_args[-3] = cpt["weight"]["emb_g.weight"].shape[0]
    pitch_guidance = cpt.get("f0", 1)
    version = cpt.get("version", "v1")

    net_g = Synthesizer(
        *model_args,
        use_f0=pitch_guidance,
        input_dim=768 if version == "v2" else 256,
        is_half=is_half,
    )

    # enc_q is removed before the weights are loaded non-strictly; the
    # load report is printed.
    del net_g.enc_q
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(device)
    if is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()

    return cpt, version, net_g, tgt_sr, VC(tgt_sr, config)
| |
def rvc_infer(
    index_path,
    index_rate,
    input_path,
    output_path,
    pitch,
    f0_method,
    cpt,
    version,
    net_g,
    filter_radius,
    tgt_sr,
    volume_envelope,
    protect,
    hop_length,
    vc,
    hubert_model,
    f0_min=50,
    f0_max=1100,
    format_output="wav",
    output_bitrate="320k",
    stereo_mode="mono"
):
    """Convert one audio file with a loaded voice model and write the result.

    The input is loaded at 16 kHz via ``load_audio`` and each signal that the
    stereo mode produces is sent through ``vc.pipeline`` separately:

    * "mono"       - the down-mixed signal is converted and written as is.
    * "left/right" - both channels are converted, cropped to the shorter
                     result and stacked as a (2, N) array.
    * "sim/dif"    - the centre signal and both difference signals are
                     converted, cropped to the shortest result, and the centre
                     is mixed into the stacked differences with
                     ``overlay_mono_on_stereo``.

    Args:
        index_path, index_rate, pitch, f0_method, filter_radius,
        volume_envelope, protect, hop_length, f0_min, f0_max, version, tgt_sr:
            forwarded unchanged to ``vc.pipeline``.
        input_path: path of the audio file to convert.
        output_path: path the result is written to.
        cpt: checkpoint dict; its "f0" entry (default 1) is forwarded as the
            pitch-guidance flag.
        net_g, hubert_model: models forwarded to ``vc.pipeline``.
        vc: object providing ``pipeline(...)``.
        format_output, output_bitrate: forwarded to ``write_audio_file``.
        stereo_mode: "mono", "left/right" or "sim/dif".

    Returns:
        ``output_path``.

    Raises:
        ValueError: for an unsupported ``stereo_mode``, when a required signal
            is missing, or when a converted signal is empty.
        RuntimeError: propagated from ``load_audio``.
    """
    supported_modes = ("mono", "left/right", "sim/dif")
    if stereo_mode not in supported_modes:
        # Without this check no branch below would run and the function would
        # fail later with an unrelated "audio_opt" name error.
        raise ValueError(
            f"Unsupported stereo_mode {stereo_mode!r}; expected one of {supported_modes}"
        )

    mid, left, right = load_audio(input_path, 16000, stereo_mode)
    pitch_guidance = cpt.get("f0", 1)

    def convert(channel):
        """Run one signal through the conversion pipeline."""
        return vc.pipeline(
            hubert_model,
            net_g,
            0,
            channel,
            input_path,
            pitch,
            f0_method,
            index_path,
            index_rate,
            pitch_guidance,
            filter_radius,
            tgt_sr,
            0,
            volume_envelope,
            version,
            protect,
            hop_length,
            f0_file=None,
            f0_min=f0_min,
            f0_max=f0_max,
        )

    if stereo_mode == "mono":
        if mid is None:
            raise ValueError("Mono audio data is None")
        audio_opt = convert(mid)

    elif stereo_mode == "left/right":
        if left is None or right is None:
            raise ValueError("Left or right audio channel is None")

        left_audio_opt = convert(left)
        right_audio_opt = convert(right)

        # Ensure both channels have the same length
        min_len = min(len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")

        audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )

    else:  # "sim/dif"
        if mid is None or left is None or right is None:
            raise ValueError("Mid, left or right audio channel is None")

        mid_audio_opt = convert(mid)
        left_audio_opt = convert(left)
        right_audio_opt = convert(right)

        # Ensure all channels have the same length
        min_len = min(len(mid_audio_opt), len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")

        dif_audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )
        audio_opt = overlay_mono_on_stereo(mid_audio_opt[:min_len], dif_audio_opt)

    write_audio_file(output_path, audio_opt, tgt_sr, format_output, output_bitrate)
    return output_path
| ''' |
|
|
| pipeline = ''' |
| import os |
| import gc |
| import torch |
| import torch.nn.functional as F |
| import torchcrepe |
| import faiss |
| import librosa |
| import numpy as np |
| from scipy import signal |
| |
| from vbach.lib.predictors.FCPE import FCPEF0Predictor |
| from vbach.lib.predictors.RMVPE import RMVPE0Predictor |
| |
# Checkpoint files of the pitch predictors, resolved against the working
# directory of the process that imports this module.
PREDICTORS_DIR = os.path.join(os.getcwd(), "vbach", "models", "predictors")
RMVPE_DIR, FCPE_DIR = (
    os.path.join(PREDICTORS_DIR, checkpoint) for checkpoint in ("rmvpe.pt", "fcpe.pt")
)

# High-pass Butterworth filter
FILTER_ORDER = 5  # filter order
CUTOFF_FREQUENCY = 48  # cut-off frequency (Hz)
SAMPLE_RATE = 16000  # sampling rate (Hz)
bh, ah = signal.butter(FILTER_ORDER, CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)


input_audio_path2wav = dict()
| |
| |
# Audio post-processing helpers
class AudioProcessor:
    @staticmethod
    def change_rms(source_audio, source_rate, target_audio, target_rate, rate):
        """
        Blend the loudness envelope of ``source_audio`` into ``target_audio``.

        For both signals a frame-wise RMS is computed with a hop of
        ``rate // 2`` samples and linearly interpolated to the length of
        ``target_audio``.  The target is then multiplied sample by sample with
        ``source_rms ** (1 - rate) * target_rms ** (rate - 1)``; the target
        envelope is floored at 1e-6 first.

        Args:
            source_audio: signal whose envelope is applied.
            source_rate: sample rate of ``source_audio``.
            target_audio: signal that is rescaled (1-D numpy array).
            target_rate: sample rate of ``target_audio``.
            rate: blend exponent; with 1 the gain is 1 and the target is
                returned unchanged in level.

        Returns:
            The rescaled ``target_audio``.
        """

        def envelope(samples, sample_rate):
            # Frame-wise RMS stretched to one value per target sample.
            frame_rms = librosa.feature.rms(
                y=samples,
                frame_length=sample_rate // 2 * 2,
                hop_length=sample_rate // 2,
            )
            return F.interpolate(
                torch.from_numpy(frame_rms).float().unsqueeze(0),
                size=target_audio.shape[0],
                mode="linear",
            ).squeeze()

        source_rms = envelope(source_audio, source_rate)
        target_rms = envelope(target_audio, target_rate)
        # Floor the target envelope: it is raised to a possibly negative power.
        target_rms = torch.maximum(target_rms, torch.zeros_like(target_rms) + 1e-6)

        gain = torch.pow(source_rms, 1 - rate) * torch.pow(target_rms, rate - 1)
        return target_audio * gain.numpy()
| |
| |
# Voice-conversion engine
class VC:
    """Chunked voice conversion: F0 extraction, feature retrieval, synthesis."""

    def __init__(self, tgt_sr, config):
        """
        Derive all chunking sizes from ``config``.

        ``config`` must provide x_pad, x_query, x_center, x_max, is_half and
        device.  The x_* values are multiplied by the 16 kHz input rate to get
        the t_* sample counts; ``t_pad_tgt`` is the padding expressed at the
        model's output rate ``tgt_sr``.
        """
        self.x_pad = config.x_pad
        self.x_query = config.x_query
        self.x_center = config.x_center
        self.x_max = config.x_max
        self.is_half = config.is_half
        self.sample_rate = 16000
        self.window = 160
        self.t_pad = self.sample_rate * self.x_pad
        self.t_pad_tgt = tgt_sr * self.x_pad
        self.t_pad2 = self.t_pad * 2
        self.t_query = self.sample_rate * self.x_query
        self.t_center = self.sample_rate * self.x_center
        self.t_max = self.sample_rate * self.x_max
        self.time_step = self.window / self.sample_rate * 1000
        self.device = config.device

    def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):
        """
        Estimate F0 with torchcrepe and resample it to ``p_len`` frames.

        The audio is normalised by its 99.9 % absolute quantile, values below
        0.001 in the prediction are treated as missing, and the track is
        linearly interpolated to ``p_len`` points (``len(x) // hop_length``
        when ``p_len`` is falsy).  Remaining NaNs become 0.
        """
        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)
        audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0)
        if audio.ndim == 2 and audio.shape[0] > 1:
            audio = torch.mean(audio, dim=0, keepdim=True)

        pitch = torchcrepe.predict(
            audio,
            self.sample_rate,
            hop_length,
            f0_min,
            f0_max,
            model,
            batch_size=hop_length * 2,
            device=self.device,
            pad=True,
        )

        p_len = p_len or x.shape[0] // hop_length
        source = np.array(pitch.squeeze(0).cpu().float().numpy())
        source[source < 0.001] = np.nan
        target = np.interp(
            np.arange(0, len(source) * p_len, len(source)) / p_len,
            np.arange(0, len(source)),
            source,
        )
        f0 = np.nan_to_num(target)
        return f0

    def get_f0_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs):
        """
        Estimate F0 with the RMVPE predictor.

        The predictor is created on first use and kept on the instance.
        Extra positional and keyword arguments are accepted and ignored.
        """
        if not hasattr(self, "model_rmvpe"):
            self.model_rmvpe = RMVPE0Predictor(
                RMVPE_DIR, is_half=self.is_half, device=self.device
            )
        f0 = self.model_rmvpe.infer_from_audio_with_pitch(
            x, thred=0.03, f0_min=f0_min, f0_max=f0_max
        )
        return f0

    def get_f0(
        self,
        input_audio_path,
        x,
        p_len,
        pitch,
        f0_method,
        filter_radius,
        hop_length,
        inp_f0=None,
        f0_min=50,
        f0_max=1100,
    ):
        """
        Estimate F0 with the selected method and quantise it.

        Args:
            input_audio_path: not used by this method.
            x: audio samples the pitch is estimated from.
            p_len: number of pitch frames requested.
            pitch: transposition in semitones applied to the estimate.
            f0_method: "mangio-crepe", "rmvpe+" or "fcpe".
            filter_radius, hop_length: forwarded to the estimators that
                accept them.
            inp_f0: optional two-column array (first column is multiplied by
                100 and used as x-coordinates, second column holds the values)
                that replaces part of the estimate.
            f0_min, f0_max: frequency bounds used for estimation and for the
                mel-scale quantisation.

        Returns:
            Tuple ``(f0_coarse, f0)``: the mel-scaled track quantised to
            integers in 1..255 and the transposed float track.

        Raises:
            ValueError: for an unknown ``f0_method``.
        """
        f0_mel_min = 1127 * np.log(1 + f0_min / 700)
        f0_mel_max = 1127 * np.log(1 + f0_max / 700)

        if f0_method == "mangio-crepe":
            f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))

        elif f0_method == "rmvpe+":
            # get_f0_rmvpe only reads x, f0_min and f0_max; the remaining
            # entries are swallowed by its **kwargs.
            params = {
                "x": x,
                "p_len": p_len,
                "pitch": pitch,
                "f0_min": f0_min,
                "f0_max": f0_max,
                "time_step": self.time_step,
                "filter_radius": filter_radius,
                "crepe_hop_length": int(hop_length),
                "model": "full",
            }
            f0 = self.get_f0_rmvpe(**params)

        elif f0_method == "fcpe":
            # The FCPE predictor is created per call and released right after.
            self.model_fcpe = FCPEF0Predictor(
                FCPE_DIR,
                f0_min=int(f0_min),
                f0_max=int(f0_max),
                dtype=torch.float32,
                device=self.device,
                sample_rate=self.sample_rate,
                threshold=0.03,
            )
            f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
            del self.model_fcpe
            gc.collect()

        else:
            # Previously an unknown method fell through and failed below with
            # an unrelated "f0" name error.
            raise ValueError(
                f"Unsupported f0_method {f0_method!r}; "
                "expected 'mangio-crepe', 'rmvpe+' or 'fcpe'"
            )

        f0 *= pow(2, pitch / 12)
        tf0 = self.sample_rate // self.window
        if inp_f0 is not None:
            delta_t = np.round(
                (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
            ).astype("int16")
            replace_f0 = np.interp(list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1])
            shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]
            f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[:shape]

        f0bak = f0.copy()
        f0_mel = 1127 * np.log(1 + f0 / 700)
        f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
            f0_mel_max - f0_mel_min
        ) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        f0_coarse = np.rint(f0_mel).astype(int)
        return f0_coarse, f0bak

    def vc(
        self,
        model,
        net_g,
        sid,
        audio0,
        pitch,
        pitchf,
        index,
        big_npy,
        index_rate,
        version,
        protect,
    ):
        """
        Convert one chunk of audio.

        Features are extracted with ``model`` (output layer 9 plus
        ``final_proj`` for "v1", layer 12 otherwise), optionally blended with
        the 8 nearest neighbours from ``index``/``big_npy`` weighted by
        ``index_rate``, upsampled by 2 along time and fed to ``net_g.infer``.
        When ``protect`` is below 0.5 and pitch data is given, frames with
        ``pitchf < 1`` keep a ``1 - protect`` share of the unblended features.

        Returns:
            The synthesised chunk as a float numpy array.
        """
        feats = torch.from_numpy(audio0)
        feats = feats.half() if self.is_half else feats.float()
        if feats.dim() == 2:
            feats = feats.mean(-1)
        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)

        inputs = {
            "source": feats.to(self.device),
            "padding_mask": padding_mask,
            "output_layer": 9 if version == "v1" else 12,
        }

        with torch.no_grad():
            logits = model.extract_features(**inputs)
            feats = model.final_proj(logits[0]) if version == "v1" else logits[0]

        use_protect = protect < 0.5 and pitch is not None and pitchf is not None
        if use_protect:
            # Keep the unblended features for the protection mix below.
            feats0 = feats.clone()

        if index is not None and big_npy is not None and index_rate != 0:
            npy = feats[0].cpu().numpy()
            npy = npy.astype("float32") if self.is_half else npy
            score, ix = index.search(npy, k=8)
            weight = np.square(1 / score)
            weight /= weight.sum(axis=1, keepdims=True)
            npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
            npy = npy.astype("float16") if self.is_half else npy
            feats = (
                torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
                + (1 - index_rate) * feats
            )

        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        if use_protect:
            feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
                0, 2, 1
            )

        p_len = audio0.shape[0] // self.window
        if feats.shape[1] < p_len:
            p_len = feats.shape[1]
            if pitch is not None and pitchf is not None:
                pitch = pitch[:, :p_len]
                pitchf = pitchf[:, :p_len]

        if use_protect:
            pitchff = pitchf.clone()
            pitchff[pitchf > 0] = 1
            pitchff[pitchf < 1] = protect
            pitchff = pitchff.unsqueeze(-1)
            feats = feats * pitchff + feats0 * (1 - pitchff)
            feats = feats.to(feats0.dtype)

        p_len = torch.tensor([p_len], device=self.device).long()
        with torch.no_grad():
            if pitch is not None and pitchf is not None:
                audio1 = (
                    (net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0])
                    .data.cpu()
                    .float()
                    .numpy()
                )
            else:
                audio1 = (
                    (net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()
                )

        del feats, p_len, padding_mask
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return audio1

    def _convert_chunk(
        self,
        model,
        net_g,
        sid,
        chunk,
        pitch,
        pitchf,
        frames,
        index,
        big_npy,
        index_rate,
        version,
        protect,
    ):
        """Convert one padded chunk with ``vc`` and trim the padding.

        ``frames`` is the slice of pitch frames that belongs to ``chunk``; it
        is ignored when no pitch tensors are given.
        """
        if pitch is not None and pitchf is not None:
            pitch = pitch[:, frames]
            pitchf = pitchf[:, frames]
        converted = self.vc(
            model,
            net_g,
            sid,
            chunk,
            pitch,
            pitchf,
            index,
            big_npy,
            index_rate,
            version,
            protect,
        )
        return converted[self.t_pad_tgt : -self.t_pad_tgt]

    def pipeline(
        self,
        model,
        net_g,
        sid,
        audio,
        input_audio_path,
        pitch,
        f0_method,
        file_index,
        index_rate,
        pitch_guidance,
        filter_radius,
        tgt_sr,
        resample_sr,
        volume_envelope,
        version,
        protect,
        hop_length,
        f0_file,
        f0_min=50,
        f0_max=1100,
    ):
        """
        Main conversion pipeline for one 16 kHz signal.

        Steps: optional FAISS index loading, high-pass filtering, splitting
        long audio at low-energy points, optional F0 extraction, chunk-wise
        conversion with ``vc``, optional loudness-envelope blending and
        resampling, and scaling to int16.

        Args:
            model: feature extractor handed to ``vc``.
            net_g: synthesizer handed to ``vc``.
            sid: speaker id; converted to a tensor.
            audio: 1-D input signal.
            input_audio_path: forwarded to ``get_f0``.
            pitch: transposition in semitones.
            f0_method: pitch-extraction method name, see ``get_f0``.
            file_index: path of a FAISS index; ignored when empty, missing or
                when ``index_rate`` is 0.
            index_rate: blend factor of the retrieved features.
            pitch_guidance: when falsy no F0 is extracted and the synthesizer
                is called without pitch.
            filter_radius, hop_length, f0_min, f0_max: forwarded to ``get_f0``.
            tgt_sr: output rate of the synthesizer.
            resample_sr: output is resampled to this rate when it is at least
                16000 and differs from ``tgt_sr``.
            volume_envelope: blend factor for ``AudioProcessor.change_rms``;
                1 skips the step.
            version: model version string handed to ``vc``.
            protect: protection factor handed to ``vc``.
            f0_file: optional object with a ``name`` attribute pointing to a
                text file of comma-separated float rows.

        Returns:
            The converted signal as an int16 numpy array.
        """
        use_index = (
            file_index is not None
            and file_index != ""
            and os.path.exists(file_index)
            and index_rate != 0
        )
        index = big_npy = None
        if use_index:
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                # Best effort: conversion continues without the index.
                print(f"Произошла ошибка при чтении индекса FAISS: {e}")
                index = big_npy = None

        audio = signal.filtfilt(bh, ah, audio)

        # For audio longer than t_max, pick cut points: around every multiple
        # of t_center, the sample (within +/- t_query) where the sum over a
        # sliding window of ``self.window`` samples has the smallest magnitude.
        audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
        opt_ts = []
        if audio_pad.shape[0] > self.t_max:
            audio_sum = np.zeros_like(audio)
            for i in range(self.window):
                audio_sum += audio_pad[i : i - self.window]
            for t in range(self.t_center, audio.shape[0], self.t_center):
                neighbourhood = np.abs(audio_sum[t - self.t_query : t + self.t_query])
                opt_ts.append(
                    t - self.t_query + np.where(neighbourhood == neighbourhood.min())[0][0]
                )

        s = 0
        audio_opt = []
        t = None
        audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
        p_len = audio_pad.shape[0] // self.window

        inp_f0 = None
        if f0_file and hasattr(f0_file, "name"):
            try:
                # The doubled backslashes are deliberate: this source is stored
                # inside a non-raw string literal of the generator script.
                with open(f0_file.name, "r") as f:
                    lines = f.read().strip("\\n").split("\\n")
                inp_f0 = np.array(
                    [[float(i) for i in line.split(",")] for line in lines],
                    dtype="float32",
                )
            except Exception as e:
                print(f"Произошла ошибка при чтении файла F0: {e}")

        sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()

        # Both pitch tensors stay None without pitch guidance.  They are bound
        # unconditionally so that the cleanup at the end cannot hit an
        # unbound local (the original ``del pitch, pitchf, sid`` did).
        pitch_frames = pitchf_frames = None
        if pitch_guidance:
            coarse, fine = self.get_f0(
                input_audio_path,
                audio_pad,
                p_len,
                pitch,
                f0_method,
                filter_radius,
                hop_length,
                inp_f0,
                f0_min,
                f0_max,
            )
            coarse = coarse[:p_len]
            fine = fine[:p_len]
            if self.device == "mps":
                fine = fine.astype(np.float32)
            pitch_frames = torch.tensor(coarse, device=self.device).unsqueeze(0).long()
            pitchf_frames = torch.tensor(fine, device=self.device).unsqueeze(0).float()

        for t in opt_ts:
            t = t // self.window * self.window
            audio_opt.append(
                self._convert_chunk(
                    model,
                    net_g,
                    sid,
                    audio_pad[s : t + self.t_pad2 + self.window],
                    pitch_frames,
                    pitchf_frames,
                    slice(s // self.window, (t + self.t_pad2) // self.window),
                    index,
                    big_npy,
                    index_rate,
                    version,
                    protect,
                )
            )
            s = t

        # Remainder after the last cut point (the whole signal if there was
        # no cut: ``t`` is still None then).
        audio_opt.append(
            self._convert_chunk(
                model,
                net_g,
                sid,
                audio_pad[t:],
                pitch_frames,
                pitchf_frames,
                slice(t // self.window if t is not None else None, None),
                index,
                big_npy,
                index_rate,
                version,
                protect,
            )
        )

        audio_opt = np.concatenate(audio_opt)
        if volume_envelope != 1:
            audio_opt = AudioProcessor.change_rms(
                audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope
            )
        if resample_sr >= self.sample_rate and tgt_sr != resample_sr:
            audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)

        # Scale to int16; attenuate only when the peak exceeds 0.99.
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1:
            max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)

        # Drop the tensors before asking CUDA to release cached memory.
        del pitch_frames, pitchf_frames, sid
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return audio_opt
| ''' |
|
|
# Materialise the embedded inference entry point and conversion pipeline as
# real modules inside the "vbach/infer" directory (dirs[3]).
for path, text in (
    (os.path.join(current_dir, dirs[3], "infer.py"), inference),
    (os.path.join(current_dir, dirs[3], "pipeline.py"), pipeline),
):
    # The embedded sources contain Cyrillic log messages. Python decodes
    # source files as UTF-8, whereas open() without an encoding falls back to
    # the platform locale, so the encoding is pinned explicitly.
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
|
|
| remove_center = ''' |
| import numpy as np |
| from scipy import signal |
| |
def remove_center(input_array, samplerate, rdf=0.99999, window_size=2048, overlap=2, window_type="blackman", stereo_mode="stereo"):
    """Separate the content shared by two channels from the rest of the signal.

    Both channels are transformed with an STFT. For every bin the smaller of
    the two channel magnitudes is treated as the shared ("centre") component;
    it is scaled by ``rdf`` and subtracted from each channel.

    Args:
        input_array: Indexable pair of 1-D sample sequences; element 0 is used
            as the left channel and element 1 as the right channel.
        samplerate: Sampling rate forwarded to ``scipy.signal.stft``/``istft``.
        rdf: Reduction factor applied to the shared component before it is
            subtracted from each channel.
        window_size: Requested STFT segment length; clamped to the input
            length and to a minimum of 16.
        overlap: Divisor of the segment length that yields the overlap
            (``nperseg // overlap``).
        window_type: Window specification passed through to SciPy.
        stereo_mode: ``"mono"`` gives both shared signals the phase of the
            averaged ``(left + right) / 2`` mix; any other value gives the
            left shared signal the right channel's phase and vice versa.

    Returns:
        ``(common_left, common_right, new_left, new_right)``, each trimmed to
        the length of the left channel. Inputs shorter than 16 samples are
        returned unchanged as ``(left, right, left, right)`` after a warning.
    """
    left = input_array[0]
    right = input_array[1]

    # Clamp the segment length to the input, but never below 16 samples.
    nperseg = max(16, min(window_size, len(left)))
    if len(left) < 16:
        # Too short for a meaningful STFT: hand the input back untouched.
        import warnings
        warnings.warn(f"Input too short ({len(left)} samples), returning original audio")
        return left, right, left, right

    # SciPy requires noverlap < nperseg.
    noverlap = min(nperseg // overlap, nperseg - 1)
    stft_args = dict(fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)

    _, _, Z_left = signal.stft(left, **stft_args)
    _, _, Z_right = signal.stft(right, **stft_args)

    # Per-bin magnitude that both channels have in common.
    common_mag = np.minimum(np.abs(Z_left), np.abs(Z_right))

    if stereo_mode == "mono":
        # The mono reference used to be read without ever being computed,
        # which raised NameError; derive it from the averaged channels.
        mono = 0.5 * (np.asarray(left) + np.asarray(right))
        _, _, Z_mono = signal.stft(mono, **stft_args)
        mono_phase = np.exp(1j * np.angle(Z_mono))
        Z_common_left = common_mag * mono_phase
        Z_common_right = common_mag * mono_phase
    else:
        Z_common_left = common_mag * np.exp(1j * np.angle(Z_right))
        Z_common_right = common_mag * np.exp(1j * np.angle(Z_left))

    Z_new_left = Z_left - Z_common_left * rdf
    Z_new_right = Z_right - Z_common_right * rdf

    # Back to the time domain, trimmed to the original length.
    _, new_left = signal.istft(Z_new_left, **stft_args)
    _, new_right = signal.istft(Z_new_right, **stft_args)
    _, common_signal_left = signal.istft(Z_common_left, **stft_args)
    _, common_signal_right = signal.istft(Z_common_right, **stft_args)

    new_left = new_left[:len(left)]
    new_right = new_right[:len(right)]
    common_signal_left = common_signal_left[:len(left)]
    common_signal_right = common_signal_right[:len(left)]

    # Only the residual pair is peak-normalised, and only when it would clip.
    peak = np.max([np.abs(new_left).max(), np.abs(new_right).max()])
    if peak > 1.0:
        new_left = new_left / peak
        new_right = new_right / peak

    return common_signal_left, common_signal_right, new_left, new_right
| ''' |
| |
# Write the embedded centre-removal helper into the "vbach/utils" directory
# (dirs[11]). `path` and `text` stay bound at module level afterwards, as they
# were when this was a one-element loop.
path = os.path.join(current_dir, dirs[11], "remove_center.py")
text = remove_center
# Pin UTF-8 so the generated module is always encoded the way Python decodes
# source files, independent of the platform locale.
with open(path, "w", encoding="utf-8") as f:
    f.write(text)
| |
| lib_algorithm = { |
| "synthesizers" : ["synthesizers.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from typing import Optional |
| |
| from .commons import slice_segments, rand_slice_segments |
| from .encoders import TextEncoder, PosteriorEncoder |
| from .generators import Generator |
| from .nsf import GeneratorNSF |
| from .residuals import ResidualCouplingBlock |
| |
| |
class Synthesizer(nn.Module):
    """Top-level synthesizer tying together encoders, a flow and a decoder.

    Sub-modules created by the constructor:
      * ``enc_p`` - ``TextEncoder`` over the phone features (and pitch, if used)
      * ``dec``   - ``GeneratorNSF`` when ``use_f0`` is truthy, else ``Generator``
      * ``enc_q`` - ``PosteriorEncoder`` over ``y``
      * ``flow``  - ``ResidualCouplingBlock``
      * ``emb_g`` - speaker embedding table (``spk_embed_dim`` x ``gin_channels``)
    """

    def __init__(
        self,
        spec_channels,
        segment_size,
        inter_channels,
        hidden_channels,
        filter_channels,
        n_heads,
        n_layers,
        kernel_size,
        p_dropout,
        resblock,
        resblock_kernel_sizes,
        resblock_dilation_sizes,
        upsample_rates,
        upsample_initial_channel,
        upsample_kernel_sizes,
        spk_embed_dim,
        gin_channels,
        sr,
        use_f0,
        input_dim=768,
        **kwargs
    ):
        """Store the hyper-parameters and build all sub-modules.

        ``kwargs["is_half"]`` is read (and therefore required) only when
        ``use_f0`` is truthy; other extra keyword arguments are ignored.
        """
        super().__init__()
        dropout = float(p_dropout)

        # Plain hyper-parameter bookkeeping.
        self.spec_channels = spec_channels
        self.segment_size = segment_size
        self.inter_channels = inter_channels
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = dropout
        self.resblock = resblock
        self.resblock_kernel_sizes = resblock_kernel_sizes
        self.resblock_dilation_sizes = resblock_dilation_sizes
        self.upsample_rates = upsample_rates
        self.upsample_initial_channel = upsample_initial_channel
        self.upsample_kernel_sizes = upsample_kernel_sizes
        self.spk_embed_dim = spk_embed_dim
        self.gin_channels = gin_channels
        self.use_f0 = use_f0

        self.enc_p = TextEncoder(
            inter_channels,
            hidden_channels,
            filter_channels,
            n_heads,
            n_layers,
            kernel_size,
            dropout,
            input_dim,
            f0=use_f0,
        )

        # Both decoder flavours share the same leading positional arguments.
        decoder_args = (
            inter_channels,
            resblock,
            resblock_kernel_sizes,
            resblock_dilation_sizes,
            upsample_rates,
            upsample_initial_channel,
            upsample_kernel_sizes,
        )
        if use_f0:
            self.dec = GeneratorNSF(
                *decoder_args,
                gin_channels=gin_channels,
                sr=sr,
                is_half=kwargs["is_half"],
            )
        else:
            self.dec = Generator(*decoder_args, gin_channels=gin_channels)

        self.enc_q = PosteriorEncoder(
            spec_channels,
            inter_channels,
            hidden_channels,
            5,
            1,
            16,
            gin_channels=gin_channels,
        )
        self.flow = ResidualCouplingBlock(
            inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
        )
        self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)

    def remove_weight_norm(self):
        """Delegate weight-norm removal to the decoder, flow and posterior encoder."""
        for part in (self.dec, self.flow, self.enc_q):
            part.remove_weight_norm()

    def __prepare_scriptable__(self):
        """Strip weight norm from sub-modules whose pre-forward hooks match the test below."""
        targets = [self.dec, self.flow]
        if hasattr(self, "enc_q"):
            targets.append(self.enc_q)
        for module in targets:
            for hook in module._forward_pre_hooks.values():
                is_weight_norm_hook = (
                    hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
                    and hook.__class__.__name__ == "_WeightNorm"
                )
                if is_weight_norm_hook:
                    remove_weight_norm(module)
        return self

    @torch.jit.ignore
    def forward(
        self,
        phone: torch.Tensor,
        phone_lengths: torch.Tensor,
        pitch: Optional[torch.Tensor] = None,
        pitchf: Optional[torch.Tensor] = None,
        y: torch.Tensor = None,
        y_lengths: torch.Tensor = None,
        ds: Optional[torch.Tensor] = None,
    ):
        """Run the prior encoder and, when ``y`` is given, the posterior path.

        Returns ``(o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q,
        logs_q))``. Without ``y`` only ``x_mask``, ``m_p`` and ``logs_p`` are
        populated and every other slot is ``None``.
        """
        g = self.emb_g(ds).unsqueeze(-1)
        m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)

        if y is None:
            return None, None, x_mask, None, (None, None, m_p, logs_p, None, None)

        z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g)
        z_p = self.flow(z, y_mask, g=g)
        z_slice, ids_slice = rand_slice_segments(z, y_lengths, self.segment_size)
        if self.use_f0:
            pitchf = slice_segments(pitchf, ids_slice, self.segment_size, 2)
            o = self.dec(z_slice, pitchf, g=g)
        else:
            o = self.dec(z_slice, g=g)
        return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)

    @torch.jit.export
    def infer(
        self,
        phone: torch.Tensor,
        phone_lengths: torch.Tensor,
        pitch: Optional[torch.Tensor] = None,
        nsff0: Optional[torch.Tensor] = None,
        sid: torch.Tensor = None,
        rate: Optional[torch.Tensor] = None,
    ):
        """Sample from the prior, invert the flow and decode.

        When ``rate`` is given, the leading ``int(T * (1 - rate))`` frames of
        the latent, the mask and (with f0) ``nsff0`` are dropped first.
        Returns ``(o, x_mask, (z, z_p, m_p, logs_p))``.
        """
        g = self.emb_g(sid).unsqueeze(-1)
        m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
        z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask

        if rate is not None:
            assert isinstance(rate, torch.Tensor)
            head = int(z_p.shape[2] * (1.0 - rate.item()))
            z_p = z_p[:, :, head:]
            x_mask = x_mask[:, :, head:]
            if self.use_f0:
                nsff0 = nsff0[:, head:]

        # The flow inversion is identical for both decoder flavours.
        z = self.flow(z_p, x_mask, g=g, reverse=True)
        if self.use_f0:
            o = self.dec(z * x_mask, nsff0, g=g)
        else:
            o = self.dec(z * x_mask, g=g)
        return o, x_mask, (z, z_p, m_p, logs_p)
| |
| '''], |
| "residuals" : ["residuals.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from torch.nn.utils.parametrizations import weight_norm |
| from typing import Optional |
| |
| from .commons import get_padding, init_weights |
| from .modules import WaveNet |
| |
| |
| LRELU_SLOPE = 0.1 |
| |
| |
def create_conv1d_layer(channels, kernel_size, dilation):
    """Build a weight-normalised ``Conv1d`` mapping ``channels`` to ``channels``.

    Stride is 1 and the padding comes from ``get_padding(kernel_size, dilation)``.
    """
    conv = nn.Conv1d(
        channels,
        channels,
        kernel_size,
        stride=1,
        dilation=dilation,
        padding=get_padding(kernel_size, dilation),
    )
    return weight_norm(conv)
| |
| |
def apply_mask(tensor, mask):
    """Return ``tensor * mask``, or ``tensor`` untouched when ``mask`` is ``None``."""
    if mask is None:
        return tensor
    return tensor * mask
| |
| |
| class ResBlockBase(nn.Module): |
| def __init__(self, channels, kernel_size, dilations): |
| super(ResBlockBase, self).__init__() |
| self.convs1 = nn.ModuleList( |
| [create_conv1d_layer(channels, kernel_size, d) for d in dilations] |
| ) |
| self.convs1.apply(init_weights) |
| |
| self.convs2 = nn.ModuleList( |
| [create_conv1d_layer(channels, kernel_size, 1) for _ in dilations] |
| ) |
| self.convs2.apply(init_weights) |
| |
| def forward(self, x, x_mask=None): |
| for c1, c2 in zip(self.convs1, self.convs2): |
| xt = F.leaky_relu(x, LRELU_SLOPE) |
| xt = apply_mask(xt, x_mask) |
| xt = F.leaky_relu(c1(xt), LRELU_SLOPE) |
| xt = apply_mask(xt, x_mask) |
| xt = c2(xt) |
| x = xt + x |
| return apply_mask(x, x_mask) |
| |
| def remove_weight_norm(self): |
| for conv in self.convs1 + self.convs2: |
| remove_weight_norm(conv) |
| |
| |
class ResBlock1(ResBlockBase):
    """``ResBlockBase`` whose default dilation schedule is ``(1, 3, 5)``."""

    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
        super().__init__(channels, kernel_size, dilation)
| |
| |
class ResBlock2(ResBlockBase):
    """``ResBlockBase`` whose default dilation schedule is ``(1, 3)``."""

    def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
        super().__init__(channels, kernel_size, dilation)
| |
| |
class Log(nn.Module):
    """Masked element-wise logarithm flow with an exponential inverse."""

    def forward(self, x, x_mask, reverse=False, **kwargs):
        """Forward: return ``(log(clamp_min(x, 1e-5)) * x_mask, logdet)``.

        Reverse: return ``exp(x) * x_mask`` only. ``logdet`` is the sum of the
        negated forward output over dimensions 1 and 2.
        """
        if reverse:
            return torch.exp(x) * x_mask

        log_x = torch.log(torch.clamp_min(x, 1e-5)) * x_mask
        return log_x, torch.sum(-log_x, [1, 2])
| |
| |
class Flip(nn.Module):
    """Flow step that reverses the order of dimension 1."""

    def forward(self, x, *args, reverse=False, **kwargs):
        """Return the flipped tensor, plus a zero log-determinant when not reversing."""
        flipped = torch.flip(x, [1])
        if reverse:
            return flipped
        # One zero per batch entry, matching the dtype and device of the data.
        zero_logdet = torch.zeros(flipped.size(0)).to(
            dtype=flipped.dtype, device=flipped.device
        )
        return flipped, zero_logdet
| |
| |
class ElementwiseAffine(nn.Module):
    """Per-channel affine flow with a learnable shift ``m`` and log-scale ``logs``."""

    def __init__(self, channels):
        super().__init__()
        self.channels = channels
        # Both parameters have shape (channels, 1) and start at zero.
        self.m = nn.Parameter(torch.zeros(channels, 1))
        self.logs = nn.Parameter(torch.zeros(channels, 1))

    def forward(self, x, x_mask, reverse=False, **kwargs):
        """Forward: ``((m + exp(logs) * x) * x_mask, logdet)``.

        Reverse: ``(x - m) * exp(-logs) * x_mask`` only. ``logdet`` sums
        ``logs * x_mask`` over dimensions 1 and 2.
        """
        if reverse:
            return (x - self.m) * torch.exp(-self.logs) * x_mask

        scaled = (self.m + torch.exp(self.logs) * x) * x_mask
        logdet = torch.sum(self.logs * x_mask, [1, 2])
        return scaled, logdet
| |
| |
class ResidualCouplingBlock(nn.Module):
    """Stack of ``n_flows`` coupling layers, each followed by a ``Flip``.

    ``self.flows`` therefore holds ``2 * n_flows`` modules, with the coupling
    layers at the even positions.
    """

    def __init__(
        self,
        channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        n_flows=4,
        gin_channels=0,
    ):
        super().__init__()
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.n_flows = n_flows
        self.gin_channels = gin_channels

        self.flows = nn.ModuleList()
        for _ in range(n_flows):
            coupling = ResidualCouplingLayer(
                channels,
                hidden_channels,
                kernel_size,
                dilation_rate,
                n_layers,
                gin_channels=gin_channels,
                mean_only=True,
            )
            self.flows.append(coupling)
            self.flows.append(Flip())

    def forward(
        self,
        x: torch.Tensor,
        x_mask: torch.Tensor,
        g: Optional[torch.Tensor] = None,
        reverse: bool = False,
    ):
        """Run every flow in order, or in reverse order when ``reverse`` is set.

        In the forward direction each flow returns ``(x, logdet)`` and the
        log-determinant is discarded; in reverse each flow returns ``x`` only.
        """
        if reverse:
            for flow in reversed(self.flows):
                x = flow.forward(x, x_mask, g=g, reverse=reverse)
        else:
            for flow in self.flows:
                x, _ = flow(x, x_mask, g=g, reverse=reverse)
        return x

    def remove_weight_norm(self):
        """Ask every coupling layer (even positions) to drop its weight norm."""
        for position in range(0, 2 * self.n_flows, 2):
            self.flows[position].remove_weight_norm()

    def __prepare_scriptable__(self):
        """Strip weight norm from coupling layers whose pre-forward hooks match."""
        for position in range(0, 2 * self.n_flows, 2):
            coupling = self.flows[position]
            for hook in coupling._forward_pre_hooks.values():
                matches = (
                    hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
                    and hook.__class__.__name__ == "_WeightNorm"
                )
                if matches:
                    remove_weight_norm(coupling)

        return self
| |
| |
class ResidualCouplingLayer(nn.Module):
    """Affine coupling layer: the first half of the channels conditions the second.

    The first half passes through unchanged. A ``WaveNet`` over it predicts a
    shift (and, unless ``mean_only``, a log-scale) applied to the second half.
    """

    def __init__(
        self,
        channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        p_dropout=0,
        gin_channels=0,
        mean_only=False,
    ):
        assert channels % 2 == 0, "channels should be divisible by 2"
        super().__init__()
        half = channels // 2
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.half_channels = half
        self.mean_only = mean_only

        self.pre = nn.Conv1d(half, hidden_channels, 1)
        self.enc = WaveNet(
            hidden_channels,
            kernel_size,
            dilation_rate,
            n_layers,
            p_dropout=p_dropout,
            gin_channels=gin_channels,
        )
        # One output group when only the mean is predicted, two otherwise.
        self.post = nn.Conv1d(hidden_channels, half * (2 - mean_only), 1)
        # Zero-initialised projection, so the predicted statistics start at zero.
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, x_mask, g=None, reverse=False):
        """Apply (or invert) the coupling transform.

        Forward returns ``(x, logdet)``; reverse returns ``x`` only.
        """
        x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
        hidden = self.pre(x0) * x_mask
        hidden = self.enc(hidden, x_mask, g=g)
        stats = self.post(hidden) * x_mask

        if self.mean_only:
            m = stats
            logs = torch.zeros_like(m)
        else:
            m, logs = torch.split(stats, [self.half_channels] * 2, 1)

        if reverse:
            x1 = (x1 - m) * torch.exp(-logs) * x_mask
            return torch.cat([x0, x1], 1)

        x1 = m + x1 * torch.exp(logs) * x_mask
        merged = torch.cat([x0, x1], 1)
        logdet = torch.sum(logs, [1, 2])
        return merged, logdet

    def remove_weight_norm(self):
        """Delegate weight-norm removal to the inner ``WaveNet``."""
        self.enc.remove_weight_norm()
| |
| '''], |
| "nsf" : ["nsf.py", ''' |
| import math |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from torch.nn.utils.parametrizations import weight_norm |
| from typing import Optional |
| |
| from .commons import init_weights |
| from .generators import SineGen |
| from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2 |
| |
| |
class SourceModuleHnNSF(nn.Module):
    """Source module: sine generator, then a linear merge and ``tanh``."""

    def __init__(
        self,
        sample_rate,
        harmonic_num=0,
        sine_amp=0.1,
        add_noise_std=0.003,
        voiced_threshod=0,
        is_half=True,
    ):
        super().__init__()

        self.sine_amp = sine_amp
        self.noise_std = add_noise_std
        self.is_half = is_half

        self.l_sin_gen = SineGen(
            sample_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod
        )
        # Collapses the (harmonic_num + 1) generated waves into one channel.
        self.l_linear = nn.Linear(harmonic_num + 1, 1)
        self.l_tanh = nn.Tanh()

    def forward(self, x: torch.Tensor, upsample_factor: int = 1):
        """Return ``(sine_merge, None, None)`` for the input ``x``.

        The generated waves are cast to the dtype of the linear layer's weight
        before being merged.
        """
        waves, _uv, _noise = self.l_sin_gen(x, upsample_factor)
        waves = waves.to(dtype=self.l_linear.weight.dtype)
        merged = self.l_tanh(self.l_linear(waves))
        return merged, None, None
| |
| |
| class GeneratorNSF(nn.Module): |
| def __init__( |
| self, |
| initial_channel, |
| resblock, |
| resblock_kernel_sizes, |
| resblock_dilation_sizes, |
| upsample_rates, |
| upsample_initial_channel, |
| upsample_kernel_sizes, |
| gin_channels, |
| sr, |
| is_half=False, |
| ): |
| super(GeneratorNSF, self).__init__() |
| |
| self.num_kernels = len(resblock_kernel_sizes) |
| self.num_upsamples = len(upsample_rates) |
| self.f0_upsamp = nn.Upsample(scale_factor=math.prod(upsample_rates)) |
| self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0, is_half=is_half) |
| |
| self.conv_pre = nn.Conv1d( |
| initial_channel, upsample_initial_channel, 7, 1, padding=3 |
| ) |
| resblock_cls = ResBlock1 if resblock == "1" else ResBlock2 |
| |
| self.ups = nn.ModuleList() |
| self.noise_convs = nn.ModuleList() |
| |
| channels = [ |
| upsample_initial_channel // (2 ** (i + 1)) for i in range(len(upsample_rates)) |
| ] |
| stride_f0s = [ |
| math.prod(upsample_rates[i + 1 :]) if i + 1 < len(upsample_rates) else 1 |
| for i in range(len(upsample_rates)) |
| ] |
| |
| for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): |
| self.ups.append( |
| weight_norm( |
| nn.ConvTranspose1d( |
| upsample_initial_channel // (2**i), |
| channels[i], |
| k, |
| u, |
| padding=(k - u) // 2, |
| ) |
| ) |
| ) |
| |
| self.noise_convs.append( |
| nn.Conv1d( |
| 1, |
| channels[i], |
| kernel_size=(stride_f0s[i] * 2 if stride_f0s[i] > 1 else 1), |
| stride=stride_f0s[i], |
| padding=(stride_f0s[i] // 2 if stride_f0s[i] > 1 else 0), |
| ) |
| ) |
| |
| self.resblocks = nn.ModuleList( |
| [ |
| resblock_cls(channels[i], k, d) |
| for i in range(len(self.ups)) |
| for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes) |
| ] |
| ) |
| |
| self.conv_post = nn.Conv1d(channels[-1], 1, 7, 1, padding=3, bias=False) |
| self.ups.apply(init_weights) |
| |
| if gin_channels != 0: |
| self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) |
| |
| self.upp = math.prod(upsample_rates) |
| self.lrelu_slope = LRELU_SLOPE |
| |
| def forward(self, x, f0, g: Optional[torch.Tensor] = None): |
| har_source, _, _ = self.m_source(f0, self.upp) |
| har_source = har_source.transpose(1, 2) |
| x = self.conv_pre(x) |
| |
| if g is not None: |
| x = x + self.cond(g) |
| |
| for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)): |
| x = F.leaky_relu(x, self.lrelu_slope) |
| x = ups(x) |
| x = x + noise_convs(har_source) |
| |
| xs = sum( |
| [ |
| resblock(x) |
| for j, resblock in enumerate(self.resblocks) |
| if j in range(i * self.num_kernels, (i + 1) * self.num_kernels) |
| ] |
| ) |
| x = xs / self.num_kernels |
| |
| x = F.leaky_relu(x) |
| x = torch.tanh(self.conv_post(x)) |
| return x |
| |
| def remove_weight_norm(self): |
| for l in self.ups: |
| remove_weight_norm(l) |
| for l in self.resblocks: |
| l.remove_weight_norm() |
| |
| def __prepare_scriptable__(self): |
| for l in self.ups: |
| for hook in l._forward_pre_hooks.values(): |
| if ( |
| hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
| and hook.__class__.__name__ == "_WeightNorm" |
| ): |
| remove_weight_norm(l) |
| for l in self.resblocks: |
| for hook in l._forward_pre_hooks.values(): |
| if ( |
| hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
| and hook.__class__.__name__ == "_WeightNorm" |
| ): |
| remove_weight_norm(l) |
| return self |
| |
| '''], |
| "normalization" : ["normalization.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| |
| |
class LayerNorm(nn.Module):
    """Layer normalisation applied over dimension 1 of the input."""

    def __init__(self, channels, eps=1e-5):
        super().__init__()
        self.eps = eps
        # Learnable per-channel scale and shift.
        self.gamma = nn.Parameter(torch.ones(channels))
        self.beta = nn.Parameter(torch.zeros(channels))

    def forward(self, x):
        """Normalise ``x`` across dimension 1 and return it in the original layout."""
        # F.layer_norm works on trailing dimensions, so swap dim 1 to the end
        # for the call and swap it back afterwards.
        swapped = x.transpose(1, -1)
        normed = F.layer_norm(
            swapped, (swapped.size(-1),), self.gamma, self.beta, self.eps
        )
        return normed.transpose(1, -1)
| '''], |
| "modules" : ["modules.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from torch.nn.utils.parametrizations import weight_norm |
| |
| from .commons import fused_add_tanh_sigmoid_multiply |
| |
| |
| class WaveNet(nn.Module): |
| def __init__( |
| self, |
| hidden_channels, |
| kernel_size, |
| dilation_rate, |
| n_layers, |
| gin_channels=0, |
| p_dropout=0, |
| ): |
| super(WaveNet, self).__init__() |
| assert kernel_size % 2 == 1 |
| self.hidden_channels = hidden_channels |
| self.kernel_size = (kernel_size,) |
| self.dilation_rate = dilation_rate |
| self.n_layers = n_layers |
| self.gin_channels = gin_channels |
| self.p_dropout = p_dropout |
| |
| self.in_layers = nn.ModuleList() |
| self.res_skip_layers = nn.ModuleList() |
| self.drop = nn.Dropout(p_dropout) |
| |
| if gin_channels != 0: |
| cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1) |
| self.cond_layer = weight_norm(cond_layer, name="weight") |
| |
| dilations = [dilation_rate**i for i in range(n_layers)] |
| paddings = [(kernel_size * d - d) // 2 for d in dilations] |
| |
| for i in range(n_layers): |
| in_layer = nn.Conv1d( |
| hidden_channels, |
| 2 * hidden_channels, |
| kernel_size, |
| dilation=dilations[i], |
| padding=paddings[i], |
| ) |
| in_layer = weight_norm(in_layer, name="weight") |
| self.in_layers.append(in_layer) |
| |
| res_skip_channels = ( |
| hidden_channels if i == n_layers - 1 else 2 * hidden_channels |
| ) |
| |
| res_skip_layer = nn.Conv1d(hidden_channels, res_skip_channels, 1) |
| res_skip_layer = weight_norm(res_skip_layer, name="weight") |
| self.res_skip_layers.append(res_skip_layer) |
| |
| def forward(self, x, x_mask, g=None, **kwargs): |
| output = torch.zeros_like(x) |
| n_channels_tensor = torch.IntTensor([self.hidden_channels]) |
| |
| if g is not None: |
| g = self.cond_layer(g) |
| |
| for i in range(self.n_layers): |
| x_in = self.in_layers[i](x) |
| if g is not None: |
| cond_offset = i * 2 * self.hidden_channels |
| g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :] |
| else: |
| g_l = torch.zeros_like(x_in) |
| |
| acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor) |
| |
| acts = self.drop(acts) |
| |
| res_skip_acts = self.res_skip_layers[i](acts) |
| if i < self.n_layers - 1: |
| res_acts = res_skip_acts[:, : self.hidden_channels, :] |
| x = (x + res_acts) * x_mask |
| output = output + res_skip_acts[:, self.hidden_channels :, :] |
| else: |
| output = output + res_skip_acts |
| return output * x_mask |
| |
| def remove_weight_norm(self): |
| if self.gin_channels != 0: |
| remove_weight_norm(self.cond_layer) |
| for l in self.in_layers: |
| remove_weight_norm(l) |
| for l in self.res_skip_layers: |
| remove_weight_norm(l) |
| |
| '''], |
| "generators" : ["generators.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from torch.nn.utils.parametrizations import weight_norm |
| from typing import Optional |
| |
| from .commons import init_weights |
| from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2 |
| |
| |
| class Generator(nn.Module): |
| def __init__( |
| self, |
| initial_channel, |
| resblock, |
| resblock_kernel_sizes, |
| resblock_dilation_sizes, |
| upsample_rates, |
| upsample_initial_channel, |
| upsample_kernel_sizes, |
| gin_channels=0, |
| ): |
| super(Generator, self).__init__() |
| self.num_kernels = len(resblock_kernel_sizes) |
| self.num_upsamples = len(upsample_rates) |
| self.conv_pre = nn.Conv1d( |
| initial_channel, upsample_initial_channel, 7, 1, padding=3 |
| ) |
| resblock = ResBlock1 if resblock == "1" else ResBlock2 |
| |
| self.ups_and_resblocks = nn.ModuleList() |
| for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): |
| self.ups_and_resblocks.append( |
| weight_norm( |
| nn.ConvTranspose1d( |
| upsample_initial_channel // (2**i), |
| upsample_initial_channel // (2 ** (i + 1)), |
| k, |
| u, |
| padding=(k - u) // 2, |
| ) |
| ) |
| ) |
| ch = upsample_initial_channel // (2 ** (i + 1)) |
| for j, (k, d) in enumerate( |
| zip(resblock_kernel_sizes, resblock_dilation_sizes) |
| ): |
| self.ups_and_resblocks.append(resblock(ch, k, d)) |
| |
| self.conv_post = nn.Conv1d(ch, 1, 7, 1, padding=3, bias=False) |
| self.ups_and_resblocks.apply(init_weights) |
| |
| if gin_channels != 0: |
| self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) |
| |
| def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None): |
| x = self.conv_pre(x) |
| if g is not None: |
| x = x + self.cond(g) |
| |
| resblock_idx = 0 |
| for _ in range(self.num_upsamples): |
| x = F.leaky_relu(x, LRELU_SLOPE) |
| x = self.ups_and_resblocks[resblock_idx](x) |
| resblock_idx += 1 |
| xs = 0 |
| for _ in range(self.num_kernels): |
| xs += self.ups_and_resblocks[resblock_idx](x) |
| resblock_idx += 1 |
| x = xs / self.num_kernels |
| |
| x = F.leaky_relu(x) |
| x = self.conv_post(x) |
| x = torch.tanh(x) |
| |
| return x |
| |
| def __prepare_scriptable__(self): |
| for l in self.ups_and_resblocks: |
| for hook in l._forward_pre_hooks.values(): |
| if ( |
| hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
| and hook.__class__.__name__ == "_WeightNorm" |
| ): |
| remove_weight_norm(l) |
| return self |
| |
| def remove_weight_norm(self): |
| for l in self.ups_and_resblocks: |
| remove_weight_norm(l) |
| |
| |
class SineGen(nn.Module):
    """Generate sine waves (fundamental plus harmonics) from an f0 contour.

    Frames where f0 does not exceed ``voiced_threshold`` are treated as
    unvoiced: their sine content is zeroed and replaced by noise.
    """

    def __init__(
        self,
        samp_rate,
        harmonic_num=0,
        sine_amp=0.1,
        noise_std=0.003,
        voiced_threshold=0,
        flag_for_pulse=False,
    ):
        super().__init__()
        self.sine_amp = sine_amp
        self.noise_std = noise_std
        self.harmonic_num = harmonic_num
        # Fundamental plus ``harmonic_num`` overtones.
        self.dim = self.harmonic_num + 1
        self.sample_rate = samp_rate
        self.voiced_threshold = voiced_threshold

    def _f02uv(self, f0):
        """Return a mask in f0's dtype: 1 where f0 exceeds the voiced threshold, else 0."""
        voiced = f0 > self.voiced_threshold
        return torch.ones_like(f0) * voiced

    def forward(self, f0: torch.Tensor, upp: int):
        """Return ``(sine_waves, uv, noise)`` upsampled by ``upp`` along dimension 1.

        assumes f0 is (batch, frames) - TODO confirm against callers.
        """
        with torch.no_grad():
            f0 = f0[:, None].transpose(1, 2)
            batch, frames = f0.shape[0], f0.shape[1]

            # Column 0 holds the fundamental, columns 1.. its integer multiples.
            harmonics = torch.zeros(batch, frames, self.dim, device=f0.device)
            harmonics[:, :, 0] = f0[:, :, 0]
            multipliers = torch.arange(2, self.harmonic_num + 2, device=f0.device)
            harmonics[:, :, 1:] = harmonics[:, :, 0:1] * multipliers[None, None, :]

            # Per-frame phase increment in cycles, wrapped to [0, 1).
            rad_values = (harmonics / float(self.sample_rate)) % 1

            # Random initial phase for the overtones; the fundamental starts at 0.
            initial_phase = torch.rand(
                harmonics.shape[0], harmonics.shape[2], device=harmonics.device
            )
            initial_phase[:, 0] = 0
            rad_values[:, 0, :] = rad_values[:, 0, :] + initial_phase

            phase_acc = torch.cumsum(rad_values, 1)
            phase_acc *= upp
            phase_acc = F.interpolate(
                phase_acc.transpose(2, 1),
                scale_factor=float(upp),
                mode="linear",
                align_corners=True,
            ).transpose(2, 1)
            rad_values = F.interpolate(
                rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest"
            ).transpose(2, 1)
            phase_acc %= 1

            # Wherever the wrapped phase drops between consecutive samples,
            # subtract one full cycle from the running sum.
            wrapped = (phase_acc[:, 1:, :] - phase_acc[:, :-1, :]) < 0
            cumsum_shift = torch.zeros_like(rad_values)
            cumsum_shift[:, 1:, :] = wrapped * -1.0

            sine_waves = torch.sin(
                torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi
            )
            sine_waves = sine_waves * self.sine_amp

            uv = self._f02uv(f0)
            uv = F.interpolate(
                uv.transpose(2, 1), scale_factor=float(upp), mode="nearest"
            ).transpose(2, 1)

            # Voiced samples get noise_std; unvoiced ones get sine_amp / 3.
            noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
            noise = noise_amp * torch.randn_like(sine_waves)
            sine_waves = sine_waves * uv + noise
        return sine_waves, uv, noise
| |
| '''], |
| "encoders" : ["encoders.py", ''' |
| import math |
| import torch |
| from torch import nn |
| from torch.nn.utils.weight_norm import remove_weight_norm |
| from typing import Optional |
| |
| from .attentions import FFN, MultiHeadAttention |
| from .commons import sequence_mask |
| from .modules import WaveNet |
| from .normalization import LayerNorm |
| |
| |
class Encoder(nn.Module):
    """Stack of ``n_layers`` blocks: self-attention, norm, feed-forward, norm."""

    def __init__(
        self,
        hidden_channels,
        filter_channels,
        n_heads,
        n_layers,
        kernel_size=1,
        p_dropout=0.0,
        window_size=10,
        **kwargs
    ):
        """Create the per-layer modules; extra keyword arguments are ignored."""
        super().__init__()
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.window_size = window_size

        self.drop = nn.Dropout(p_dropout)
        self.attn_layers = nn.ModuleList()
        self.norm_layers_1 = nn.ModuleList()
        self.ffn_layers = nn.ModuleList()
        self.norm_layers_2 = nn.ModuleList()
        for _ in range(self.n_layers):
            attention = MultiHeadAttention(
                hidden_channels,
                hidden_channels,
                n_heads,
                p_dropout=p_dropout,
                window_size=window_size,
            )
            self.attn_layers.append(attention)
            self.norm_layers_1.append(LayerNorm(hidden_channels))
            feed_forward = FFN(
                hidden_channels,
                hidden_channels,
                filter_channels,
                kernel_size,
                p_dropout=p_dropout,
            )
            self.ffn_layers.append(feed_forward)
            self.norm_layers_2.append(LayerNorm(hidden_channels))

    def forward(self, x, x_mask):
        """Run every layer on ``x``; the mask is applied before and after the stack."""
        # Pairwise mask: a position may attend only to other valid positions.
        attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
        x = x * x_mask
        layers = zip(
            self.attn_layers, self.norm_layers_1, self.ffn_layers, self.norm_layers_2
        )
        for attn, norm_attn, ffn, norm_ffn in layers:
            attended = self.drop(attn(x, x, attn_mask))
            x = norm_attn(x + attended)

            transformed = self.drop(ffn(x, x_mask))
            x = norm_ffn(x + transformed)
        return x * x_mask
| |
| |
class TextEncoder(nn.Module):
    """Embed phone features (plus optional pitch), encode them and project to statistics."""

    def __init__(
        self,
        out_channels,
        hidden_channels,
        filter_channels,
        n_heads,
        n_layers,
        kernel_size,
        p_dropout,
        embedding_dim,
        f0=True,
    ):
        """Build the embeddings, the ``Encoder`` and the output projection.

        The pitch embedding (256 entries) is created only when ``f0`` is truthy.
        """
        super().__init__()
        dropout = float(p_dropout)
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = dropout
        self.emb_phone = nn.Linear(embedding_dim, hidden_channels)
        self.lrelu = nn.LeakyReLU(0.1, inplace=True)
        if f0:
            self.emb_pitch = nn.Embedding(256, hidden_channels)
        self.encoder = Encoder(
            hidden_channels,
            filter_channels,
            n_heads,
            n_layers,
            kernel_size,
            dropout,
        )
        # Twice out_channels: one half per returned statistic.
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(
        self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor
    ):
        """Return ``(m, logs, x_mask)`` for the given features.

        ``pitch`` may be ``None``, in which case only the phone embedding is used.
        """
        embedded = self.emb_phone(phone)
        if pitch is not None:
            embedded = embedded + self.emb_pitch(pitch)
        embedded = embedded * math.sqrt(self.hidden_channels)
        embedded = self.lrelu(embedded)

        # Swap dimension 1 with the last one before masking and encoding.
        x = torch.transpose(embedded, 1, -1)
        x_mask = torch.unsqueeze(sequence_mask(lengths, x.size(2)), 1).to(x.dtype)
        x = self.encoder(x * x_mask, x_mask)
        stats = self.proj(x) * x_mask

        m, logs = torch.split(stats, self.out_channels, dim=1)
        return m, logs, x_mask
| |
| |
class PosteriorEncoder(nn.Module):
    """Encode an input with a ``WaveNet`` stack and sample a latent from it.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count of the sampled latent ``z``.
        hidden_channels: Channel count used inside the ``WaveNet`` stack.
        kernel_size: Forwarded to ``WaveNet``.
        dilation_rate: Forwarded to ``WaveNet``.
        n_layers: Forwarded to ``WaveNet``.
        gin_channels: Forwarded to ``WaveNet`` as ``gin_channels``.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        gin_channels=0,
    ):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.gin_channels = gin_channels

        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.enc = WaveNet(
            hidden_channels,
            kernel_size,
            dilation_rate,
            n_layers,
            gin_channels=gin_channels,
        )
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(
        self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None
    ):
        """Return ``(z, m, logs, x_mask)``; ``z`` is sampled with fresh noise."""
        valid = sequence_mask(x_lengths, x.size(2))
        x_mask = torch.unsqueeze(valid, 1).to(x.dtype)
        hidden = self.pre(x) * x_mask
        hidden = self.enc(hidden, x_mask, g=g)
        stats = self.proj(hidden) * x_mask
        m, logs = torch.split(stats, self.out_channels, dim=1)
        # z = m + noise * exp(logs), zeroed outside the valid positions.
        noise = torch.randn_like(m)
        z = (m + noise * torch.exp(logs)) * x_mask
        return z, m, logs, x_mask

    def remove_weight_norm(self):
        """Delegate weight-norm removal to the inner ``WaveNet``."""
        self.enc.remove_weight_norm()

    def __prepare_scriptable__(self):
        """Strip weight norm from ``self.enc`` when a matching hook is found."""
        for hook in self.enc._forward_pre_hooks.values():
            from_weight_norm_module = (
                hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
            )
            if from_weight_norm_module and hook.__class__.__name__ == "_WeightNorm":
                remove_weight_norm(self.enc)
        return self
| |
| '''], |
| "discriminators" : ["discriminators.py", ''' |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| from torch.nn.utils.parametrizations import spectral_norm, weight_norm |
| |
| from .commons import get_padding |
| from .residuals import LRELU_SLOPE |
| |
| |
# Periods handed to DiscriminatorP by MultiPeriodDiscriminator.
PERIODS_V1 = [2, 3, 5, 7, 11, 17]
# Periods handed to DiscriminatorP by MultiPeriodDiscriminatorV2.
PERIODS_V2 = [2, 3, 5, 7, 11, 17, 23, 37]
# Input / output channel counts of the DiscriminatorP conv stack; the two
# lists are zipped pairwise, one (in, out) pair per layer.
IN_CHANNELS = [1, 32, 128, 512, 1024]
OUT_CHANNELS = [32, 128, 512, 1024, 1024]
| |
| |
class MultiPeriodDiscriminator(nn.Module):
    """One ``DiscriminatorS`` followed by a ``DiscriminatorP`` per V1 period.

    Args:
        use_spectral_norm: Forwarded to every sub-discriminator.
    """

    def __init__(self, use_spectral_norm=False):
        super().__init__()
        members = [DiscriminatorS(use_spectral_norm=use_spectral_norm)]
        for period in PERIODS_V1:
            members.append(DiscriminatorP(period, use_spectral_norm=use_spectral_norm))
        self.discriminators = nn.ModuleList(members)

    def forward(self, y, y_hat):
        """Run every sub-discriminator on ``y`` and then on ``y_hat``.

        Returns:
            Four lists with one entry per sub-discriminator: outputs for
            ``y``, outputs for ``y_hat``, feature maps for ``y`` and feature
            maps for ``y_hat``.
        """
        y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
        for disc in self.discriminators:
            score_r, features_r = disc(y)
            score_g, features_g = disc(y_hat)
            y_d_rs.append(score_r)
            y_d_gs.append(score_g)
            fmap_rs.append(features_r)
            fmap_gs.append(features_g)

        return y_d_rs, y_d_gs, fmap_rs, fmap_gs
| |
| |
class MultiPeriodDiscriminatorV2(nn.Module):
    """One ``DiscriminatorS`` followed by a ``DiscriminatorP`` per V2 period.

    Args:
        use_spectral_norm: Forwarded to every sub-discriminator.
    """

    def __init__(self, use_spectral_norm=False):
        super().__init__()
        members = [DiscriminatorS(use_spectral_norm=use_spectral_norm)]
        for period in PERIODS_V2:
            members.append(DiscriminatorP(period, use_spectral_norm=use_spectral_norm))
        self.discriminators = nn.ModuleList(members)

    def forward(self, y, y_hat):
        """Run every sub-discriminator on ``y`` and then on ``y_hat``.

        Returns:
            Four lists with one entry per sub-discriminator: outputs for
            ``y``, outputs for ``y_hat``, feature maps for ``y`` and feature
            maps for ``y_hat``.
        """
        y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
        for disc in self.discriminators:
            score_r, features_r = disc(y)
            score_g, features_g = disc(y_hat)
            y_d_rs.append(score_r)
            y_d_gs.append(score_g)
            fmap_rs.append(features_r)
            fmap_gs.append(features_g)

        return y_d_rs, y_d_gs, fmap_rs, fmap_gs
| |
| |
class DiscriminatorS(nn.Module):
    """Discriminator made of a stack of normalised ``Conv1d`` layers.

    Args:
        use_spectral_norm: Wrap the convolutions in ``spectral_norm`` instead
            of ``weight_norm``.
    """

    def __init__(self, use_spectral_norm=False):
        super().__init__()
        norm_f = spectral_norm if use_spectral_norm else weight_norm
        # (in_channels, out_channels, kernel_size, stride, groups, padding)
        conv_specs = [
            (1, 16, 15, 1, 1, 7),
            (16, 64, 41, 4, 4, 20),
            (64, 256, 41, 4, 16, 20),
            (256, 1024, 41, 4, 64, 20),
            (1024, 1024, 41, 4, 256, 20),
            (1024, 1024, 5, 1, 1, 2),
        ]
        self.convs = nn.ModuleList(
            [
                norm_f(nn.Conv1d(c_in, c_out, kernel, stride, groups=groups, padding=pad))
                for c_in, c_out, kernel, stride, groups, pad in conv_specs
            ]
        )
        self.conv_post = norm_f(nn.Conv1d(1024, 1, 3, 1, padding=1))
        self.lrelu = nn.LeakyReLU(LRELU_SLOPE)

    def forward(self, x):
        """Return the flattened output and the list of per-layer feature maps."""
        fmap = []
        for layer in self.convs:
            x = self.lrelu(layer(x))
            fmap.append(x)
        x = self.conv_post(x)
        fmap.append(x)
        return torch.flatten(x, 1, -1), fmap
| |
| |
class DiscriminatorP(nn.Module):
    """Discriminator that folds the time axis by ``period`` and applies ``Conv2d``.

    Args:
        period: Width of the folded view built in ``forward``.
        kernel_size: Kernel height of the stacked convolutions.
        stride: Stride of the stacked convolutions along the first spatial axis.
        use_spectral_norm: Wrap the convolutions in ``spectral_norm`` instead
            of ``weight_norm``.
    """

    def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
        super().__init__()
        self.period = period
        norm_f = spectral_norm if use_spectral_norm else weight_norm

        layers = []
        for in_ch, out_ch in zip(IN_CHANNELS, OUT_CHANNELS):
            conv = nn.Conv2d(
                in_ch,
                out_ch,
                (kernel_size, 1),
                (stride, 1),
                padding=(get_padding(kernel_size, 1), 0),
            )
            layers.append(norm_f(conv))
        self.convs = nn.ModuleList(layers)

        self.conv_post = norm_f(nn.Conv2d(1024, 1, (3, 1), 1, padding=(1, 0)))
        self.lrelu = nn.LeakyReLU(LRELU_SLOPE)

    def forward(self, x):
        """Return the flattened output and the list of per-layer feature maps."""
        fmap = []
        b, c, t = x.shape
        remainder = t % self.period
        if remainder != 0:
            # Reflect-pad the tail so the length becomes a multiple of the period.
            x = F.pad(x, (0, self.period - remainder), "reflect")
        x = x.view(b, c, -1, self.period)

        for layer in self.convs:
            x = self.lrelu(layer(x))
            fmap.append(x)

        x = self.conv_post(x)
        fmap.append(x)
        return torch.flatten(x, 1, -1), fmap
| |
| '''], |
| "commons" : ["commons.py", ''' |
| import math |
| import torch |
| from torch.nn import functional as F |
| from typing import List, Optional |
| |
| |
def init_weights(m, mean=0.0, std=0.01):
    """Fill ``m.weight`` from a normal distribution when ``m`` is a conv layer.

    A module counts as a conv layer when its class name contains ``Conv``;
    every other module is left untouched.
    """
    if "Conv" in type(m).__name__:
        m.weight.data.normal_(mean, std)
| |
| |
def get_padding(kernel_size, dilation=1):
    """Return ``(kernel_size * dilation - dilation) / 2`` truncated to an int."""
    span = kernel_size * dilation - dilation
    return int(span / 2)
| |
| |
def convert_pad_shape(pad_shape):
    """Reverse the order of the pairs in ``pad_shape`` and flatten them.

    Example: ``[[0, 0], [1, 2], [3, 4]]`` becomes ``[3, 4, 1, 2, 0, 0]``.
    """
    flattened = []
    for pair in pad_shape[::-1]:
        flattened.extend(pair)
    return flattened
| |
| |
def kl_divergence(m_p, logs_p, m_q, logs_q):
    """Element-wise KL term computed from two sets of means and log-scales.

    Evaluates ``(logs_q - logs_p) - 0.5
    + 0.5 * (exp(2 * logs_p) + (m_p - m_q) ** 2) * exp(-2 * logs_q)``.
    """
    log_ratio = logs_q - logs_p
    kl = log_ratio - 0.5
    spread = torch.exp(2.0 * logs_p) + (m_p - m_q) ** 2
    # Accumulated in place on the freshly created ``kl`` tensor.
    kl += 0.5 * spread * torch.exp(-2.0 * logs_q)
    return kl
| |
| |
def slice_segments(
    x: torch.Tensor, ids_str: torch.Tensor, segment_size: int = 4, dim: int = 2
):
    """Cut one window of ``segment_size`` items out of every batch entry.

    Args:
        x: Batch to slice; indexed as ``x[i, start:end]`` when ``dim == 2``
            and as ``x[i, :, start:end]`` otherwise.
        ids_str: Per-entry start index of the window.
        segment_size: Window length.
        dim: ``2`` or ``3``; selects which axis of ``x`` is sliced.

    Returns:
        A new tensor holding the selected windows.
    """
    if dim == 2:
        segments = torch.zeros_like(x[:, :segment_size])
    elif dim == 3:
        segments = torch.zeros_like(x[:, :, :segment_size])

    for row in range(x.size(0)):
        start = ids_str[row].item()
        stop = start + segment_size
        if dim == 2:
            segments[row] = x[row, start:stop]
        else:
            segments[row] = x[row, :, start:stop]

    return segments
| |
| |
def rand_slice_segments(x, x_lengths=None, segment_size=4):
    """Cut a randomly positioned window out of every batch entry of ``x``.

    Args:
        x: Three-axis tensor; the last axis is the one that is sliced.
        x_lengths: Usable length per entry; defaults to the full last axis.
        segment_size: Window length.

    Returns:
        ``(segments, ids_str)`` where ``ids_str`` holds the chosen start
        indices.
    """
    batch, _, frames = x.size()
    lengths = frames if x_lengths is None else x_lengths
    max_start = lengths - segment_size + 1
    uniform = torch.rand([batch]).to(device=x.device)
    ids_str = (uniform * max_start).to(dtype=torch.long)
    return slice_segments(x, ids_str, segment_size, dim=3), ids_str
| |
| |
def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4):
    """Build a sinusoidal timing signal of shape ``(1, channels, length)``.

    The first ``channels // 2`` rows are sines and the next ``channels // 2``
    rows are cosines of the position scaled by geometrically spaced
    timescales; an odd ``channels`` gets one trailing row of zeros.

    Args:
        length: Number of positions.
        channels: Number of rows of the signal.
        min_timescale: Timescale of the first sine/cosine pair.
        max_timescale: Timescale of the last sine/cosine pair.

    Returns:
        A float tensor of shape ``(1, channels, length)``.
    """
    position = torch.arange(length, dtype=torch.float)
    num_timescales = channels // 2
    # With 2 or 3 channels there is a single timescale; clamping the divisor
    # to 1 avoids a ZeroDivisionError and leaves every other case unchanged
    # in its result.
    log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / max(
        num_timescales - 1, 1
    )
    inv_timescales = min_timescale * torch.exp(
        torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment
    )
    scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1)
    signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0)
    # Append one zero row when the channel count is odd.
    signal = F.pad(signal, [0, 0, 0, channels % 2])
    signal = signal.view(1, channels, length)
    return signal
| |
| |
def subsequent_mask(length):
    """Return a lower-triangular mask of ones shaped ``(1, 1, length, length)``."""
    square = torch.ones(length, length)
    return torch.tril(square)[None, None]
| |
| |
@torch.jit.script
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
    """Gated activation of ``input_a + input_b``.

    The sum is split along axis 1 at ``n_channels[0]``: ``tanh`` is applied
    to the first part, ``sigmoid`` to the rest, and the two are multiplied.

    Args:
        input_a: First addend.
        input_b: Second addend.
        n_channels: Indexable value whose first element is the split point.
            NOTE(review): presumably a one-element integer tensor, since the
            function is compiled with ``torch.jit.script`` -- confirm against
            the callers.

    Returns:
        The element-wise product of the ``tanh`` and ``sigmoid`` halves.
    """
    n_channels_int = n_channels[0]
    in_act = input_a + input_b
    # First n_channels_int channels feed tanh, the remaining ones feed sigmoid.
    t_act = torch.tanh(in_act[:, :n_channels_int, :])
    s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
    acts = t_act * s_act
    return acts
| |
| |
def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None):
    """Return a boolean mask that is true for positions below each length.

    Args:
        length: Tensor of sequence lengths.
        max_length: Number of mask columns; defaults to ``length.max()``.

    Returns:
        Boolean tensor with one row per entry of ``length``.
    """
    if max_length is None:
        max_length = length.max()
    positions = torch.arange(max_length, dtype=length.dtype, device=length.device)
    return positions[None, :] < length[:, None]
| |
| |
def clip_grad_value(parameters, clip_value, norm_type=2):
    """Clamp gradients element-wise and return their total norm.

    The norm of every gradient is measured before that gradient is clamped,
    so the returned value describes the unclipped gradients.

    Args:
        parameters: A single tensor or an iterable of tensors; entries whose
            ``grad`` is ``None`` are skipped.
        clip_value: Gradients are clamped to ``[-clip_value, clip_value]``;
            ``None`` disables clamping.
        norm_type: Order of the norm.

    Returns:
        The ``norm_type``-norm over all gradients as a Python number.
    """
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    # ``typing.List`` cannot be instantiated; the builtin ``list`` is needed.
    parameters = [p for p in parameters if p.grad is not None]
    norm_type = float(norm_type)
    if clip_value is not None:
        clip_value = float(clip_value)

    total_norm = 0
    for p in parameters:
        param_norm = p.grad.data.norm(norm_type)
        total_norm += param_norm.item() ** norm_type
        if clip_value is not None:
            p.grad.data.clamp_(min=-clip_value, max=clip_value)
    total_norm = total_norm ** (1.0 / norm_type)
    return total_norm
| |
| '''], |
| "attentions" : ["attentions.py", ''' |
| import math |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
| |
| from .commons import convert_pad_shape |
| |
| |
class MultiHeadAttention(nn.Module):
    """Multi-head attention built from 1x1 ``Conv1d`` projections.

    Inputs are three-axis tensors whose axis 1 holds ``channels`` and whose
    axis 2 is the sequence axis. On top of scaled dot-product attention the
    module can add learned relative-position embeddings (``window_size``), a
    proximal bias (``proximal_bias``) and a banded mask (``block_length``).
    The attention weights of the latest ``forward`` call are kept in
    ``self.attn``.

    Args:
        channels: Channel count of the inputs; must be divisible by ``n_heads``.
        out_channels: Channel count of the output projection.
        n_heads: Number of attention heads.
        p_dropout: Dropout probability applied to the attention weights.
        window_size: If not ``None``, relative embeddings covering
            ``2 * window_size + 1`` offsets are created and used.
        heads_share: Share one set of relative embeddings across all heads.
        block_length: If not ``None``, scores further than ``block_length``
            from the diagonal are masked (only evaluated when a mask is given).
        proximal_bias: Add ``-log1p(|i - j|)`` to the scores.
        proximal_init: Initialise the key projection as a copy of the query
            projection.
    """

    def __init__(
        self,
        channels,
        out_channels,
        n_heads,
        p_dropout=0.0,
        window_size=None,
        heads_share=True,
        block_length=None,
        proximal_bias=False,
        proximal_init=False,
    ):
        super().__init__()
        assert channels % n_heads == 0

        self.channels = channels
        self.out_channels = out_channels
        self.n_heads = n_heads
        self.p_dropout = p_dropout
        self.window_size = window_size
        self.heads_share = heads_share
        self.block_length = block_length
        self.proximal_bias = proximal_bias
        self.proximal_init = proximal_init
        # Filled by forward() with the most recent attention weights.
        self.attn = None

        # Channels handled by each head.
        self.k_channels = channels // n_heads
        self.conv_q = nn.Conv1d(channels, channels, 1)
        self.conv_k = nn.Conv1d(channels, channels, 1)
        self.conv_v = nn.Conv1d(channels, channels, 1)
        self.conv_o = nn.Conv1d(channels, out_channels, 1)
        self.drop = nn.Dropout(p_dropout)

        if window_size is not None:
            # One embedding row per relative offset in [-window_size, window_size].
            n_heads_rel = 1 if heads_share else n_heads
            rel_stddev = self.k_channels**-0.5
            self.emb_rel_k = nn.Parameter(
                torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
                * rel_stddev
            )
            self.emb_rel_v = nn.Parameter(
                torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
                * rel_stddev
            )

        nn.init.xavier_uniform_(self.conv_q.weight)
        nn.init.xavier_uniform_(self.conv_k.weight)
        nn.init.xavier_uniform_(self.conv_v.weight)
        if proximal_init:
            with torch.no_grad():
                self.conv_k.weight.copy_(self.conv_q.weight)
                self.conv_k.bias.copy_(self.conv_q.bias)

    def forward(self, x, c, attn_mask=None):
        """Attend from ``x`` (queries) to ``c`` (keys and values).

        Returns the output projection of the attention result and stores the
        attention weights in ``self.attn``.
        """
        q = self.conv_q(x)
        k = self.conv_k(c)
        v = self.conv_v(c)

        x, self.attn = self.attention(q, k, v, mask=attn_mask)

        x = self.conv_o(x)
        return x

    def attention(self, query, key, value, mask=None):
        """Compute attention and return ``(output, attention_weights)``.

        ``output`` is reshaped to ``(b, d, t_t)`` where ``b`` and ``d`` are
        the first two sizes of ``key`` and ``t_t`` is the query length.
        """
        # key is (b, d, t_s); t_t is the length of the query sequence.
        b, d, t_s, t_t = (*key.size(), query.size(2))
        # Split channels into heads and move the sequence axis before them:
        # (b, n_heads, t, k_channels).
        query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
        key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
        value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)

        # Scaled dot-product scores.
        scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))
        if self.window_size is not None:
            assert t_s == t_t, "Relative attention is only available for self-attention."
            key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)
            rel_logits = self._matmul_with_relative_keys(
                query / math.sqrt(self.k_channels), key_relative_embeddings
            )
            # Re-index the relative logits by absolute key position.
            scores_local = self._relative_position_to_absolute_position(rel_logits)
            scores = scores + scores_local
        if self.proximal_bias:
            assert t_s == t_t, "Proximal bias is only available for self-attention."
            scores = scores + self._attention_bias_proximal(t_s).to(
                device=scores.device, dtype=scores.dtype
            )
        if mask is not None:
            # Masked positions get a large negative score before the softmax.
            scores = scores.masked_fill(mask == 0, -1e4)
            if self.block_length is not None:
                assert t_s == t_t, "Local attention is only available for self-attention."
                # Keep only the band within block_length of the diagonal.
                block_mask = (
                    torch.ones_like(scores)
                    .triu(-self.block_length)
                    .tril(self.block_length)
                )
                scores = scores.masked_fill(block_mask == 0, -1e4)
        p_attn = F.softmax(scores, dim=-1)
        p_attn = self.drop(p_attn)
        output = torch.matmul(p_attn, value)
        if self.window_size is not None:
            # Add the contribution of the relative value embeddings.
            relative_weights = self._absolute_position_to_relative_position(p_attn)
            value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s)
            output = output + self._matmul_with_relative_values(
                relative_weights, value_relative_embeddings
            )
        # Merge the heads back into the channel axis.
        output = output.transpose(2, 3).contiguous().view(b, d, t_t)
        return output, p_attn

    def _matmul_with_relative_values(self, x, y):
        """Multiply ``x`` by ``y`` after giving ``y`` a leading axis."""
        ret = torch.matmul(x, y.unsqueeze(0))
        return ret

    def _matmul_with_relative_keys(self, x, y):
        """Multiply ``x`` by the transpose of ``y`` (with a leading axis added)."""
        ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
        return ret

    def _get_relative_embeddings(self, relative_embeddings, length):
        """Return the ``2 * length - 1`` embedding rows used for ``length``.

        The table is zero-padded along axis 1 when ``length`` exceeds
        ``window_size + 1`` and sliced when it is shorter.
        """
        pad_length = max(length - (self.window_size + 1), 0)
        slice_start_position = max((self.window_size + 1) - length, 0)
        slice_end_position = slice_start_position + 2 * length - 1
        if pad_length > 0:
            padded_relative_embeddings = F.pad(
                relative_embeddings,
                convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
            )
        else:
            padded_relative_embeddings = relative_embeddings
        used_relative_embeddings = padded_relative_embeddings[
            :, slice_start_position:slice_end_position
        ]
        return used_relative_embeddings

    def _relative_position_to_absolute_position(self, x):
        """Convert ``(batch, heads, length, 2 * length - 1)`` to ``(..., length, length)``."""
        batch, heads, length, _ = x.size()

        # Append one column so each row holds 2 * length entries.
        x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))

        # Flatten, pad length - 1 entries and reshape; the shifted rows then
        # line up so the wanted entries can be taken with a plain slice.
        x_flat = x.view([batch, heads, length * 2 * length])
        x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [0, length - 1]]))

        x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[
            :, :, :length, length - 1 :
        ]
        return x_final

    def _absolute_position_to_relative_position(self, x):
        """Convert ``(batch, heads, length, length)`` to ``(..., length, 2 * length - 1)``."""
        batch, heads, length, _ = x.size()
        # Append length - 1 columns, flatten, prepend length entries and
        # reshape; the first column of the result is dropped.
        x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]]))
        x_flat = x.view([batch, heads, length**2 + length * (length - 1)])
        x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [length, 0]]))
        x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]
        return x_final

    def _attention_bias_proximal(self, length):
        """Return ``-log1p(|i - j|)`` shaped ``(1, 1, length, length)``."""
        r = torch.arange(length, dtype=torch.float32)
        diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)
        return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)
| |
| |
class FFN(nn.Module):
    """Two-layer convolutional feed-forward block with masking.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count of the output.
        filter_channels: Channel count between the two convolutions.
        kernel_size: Kernel size of both convolutions.
        p_dropout: Dropout probability applied after the activation.
        activation: ``"gelu"`` selects ``x * sigmoid(1.702 * x)``; anything
            else selects ReLU.
        causal: Pad only on the left instead of on both sides.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        filter_channels,
        kernel_size,
        p_dropout=0.0,
        activation=None,
        causal=False,
    ):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.filter_channels = filter_channels
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.activation = activation
        self.causal = causal

        # The padding strategy is fixed at construction time.
        self.padding = self._causal_padding if causal else self._same_padding

        self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
        self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
        self.drop = nn.Dropout(p_dropout)

    def forward(self, x, x_mask):
        """Apply conv, activation, dropout and conv, masking before each conv."""
        hidden = self.conv_1(self.padding(x * x_mask))
        if self.activation == "gelu":
            hidden = hidden * torch.sigmoid(1.702 * hidden)
        else:
            hidden = torch.relu(hidden)
        hidden = self.drop(hidden)
        projected = self.conv_2(self.padding(hidden * x_mask))
        return projected * x_mask

    def _causal_padding(self, x):
        """Pad ``kernel_size - 1`` entries on the left of the last axis."""
        if self.kernel_size == 1:
            return x
        pad_spec = [[0, 0], [0, 0], [self.kernel_size - 1, 0]]
        return F.pad(x, convert_pad_shape(pad_spec))

    def _same_padding(self, x):
        """Pad both sides of the last axis so the convolution keeps its length."""
        if self.kernel_size == 1:
            return x
        left = (self.kernel_size - 1) // 2
        right = self.kernel_size // 2
        pad_spec = [[0, 0], [0, 0], [left, right]]
        return F.pad(x, convert_pad_shape(pad_spec))
| |
| '''], |
| "init" : ["__init__.py", ''' |
| '''] |
| } |
|
|
# Write every embedded ``lib_algorithm`` module into dirs[5].
# Each entry is a ``[file_name, source_text]`` pair. The files are Python
# sources, which the interpreter reads as UTF-8, so the encoding is pinned
# instead of relying on the platform default.
for _module_key in (
    "synthesizers",
    "residuals",
    "nsf",
    "normalization",
    "modules",
    "generators",
    "encoders",
    "discriminators",
    "commons",
    "attentions",
    "init",
):
    _file_name, _file_source = lib_algorithm[_module_key]
    with open(
        os.path.join(current_dir, dirs[5], _file_name), "w", encoding="utf-8"
    ) as f:
        f.write(_file_source)
|
|
| RMVPE = ''' |
| import torch |
| import numpy as np |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from librosa.filters import mel |
| from scipy.signal import get_window |
| from librosa.util import pad_center, tiny, normalize |
| |
| |
def window_sumsquare(
    window,
    n_frames,
    hop_length=200,
    win_length=800,
    n_fft=800,
    dtype=np.float32,
    norm=None,
):
    """Sum the squared analysis window over all overlapping frames.

    Args:
        window: Window specification passed to ``scipy.signal.get_window``.
        n_frames: Number of frames.
        hop_length: Number of samples between consecutive frames.
        win_length: Window length; ``None`` means ``n_fft``.
        n_fft: Frame length the window is centre-padded to.
        dtype: dtype of the returned array.
        norm: Normalisation passed to ``librosa.util.normalize``.

    Returns:
        Array of length ``n_fft + hop_length * (n_frames - 1)``.
    """
    if win_length is None:
        win_length = n_fft

    n = n_fft + hop_length * (n_frames - 1)
    x = np.zeros(n, dtype=dtype)

    win_sq = get_window(window, win_length, fftbins=True)
    win_sq = normalize(win_sq, norm=norm) ** 2
    # ``size`` must be passed by keyword, as STFT.__init__ already does.
    win_sq = pad_center(win_sq, size=n_fft)

    # Overlap-add the squared window at every frame position.
    for i in range(n_frames):
        sample = i * hop_length
        x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
    return x
| |
| |
class STFT(nn.Module):
    """Short-time Fourier transform implemented with 1-D convolutions.

    Args:
        filter_length: FFT size.
        hop_length: Number of samples between consecutive frames.
        win_length: Window length; a falsy value means ``filter_length``.
        window: Window specification passed to ``scipy.signal.get_window``.
    """

    def __init__(
        self, filter_length=1024, hop_length=512, win_length=None, window="hann"
    ):
        super(STFT, self).__init__()
        self.filter_length = filter_length
        self.hop_length = hop_length
        self.win_length = win_length if win_length else filter_length
        self.window = window
        self.pad_amount = int(self.filter_length / 2)
        scale = self.filter_length / self.hop_length
        fourier_basis = np.fft.fft(np.eye(self.filter_length))

        # Keep the non-redundant half of the spectrum, real rows then imaginary.
        cutoff = int((self.filter_length / 2 + 1))
        fourier_basis = np.vstack(
            [np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
        )
        forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
        inverse_basis = torch.FloatTensor(
            np.linalg.pinv(scale * fourier_basis).T[:, None, :]
        )

        assert filter_length >= self.win_length
        fft_window = get_window(window, self.win_length, fftbins=True)
        fft_window = pad_center(fft_window, size=filter_length)
        fft_window = torch.from_numpy(fft_window).float()

        forward_basis *= fft_window
        inverse_basis *= fft_window

        self.register_buffer("forward_basis", forward_basis.float())
        self.register_buffer("inverse_basis", inverse_basis.float())

    def transform(self, input_data, return_phase=False):
        """Return the magnitude spectrogram of ``input_data``.

        Args:
            input_data: Batch of signals; the last axis holds the samples.
            return_phase: When true, return ``(magnitude, phase)`` instead of
                the magnitude alone.
        """
        num_batches = input_data.shape[0]
        num_samples = input_data.shape[-1]
        # Remembered so that inverse() can trim its output to the input length.
        self.num_samples = num_samples

        input_data = input_data.view(num_batches, 1, num_samples)
        input_data = F.pad(
            input_data.unsqueeze(1),
            (self.pad_amount, self.pad_amount, 0, 0, 0, 0),
            mode="reflect",
        ).squeeze(1)
        forward_transform = F.conv1d(
            input_data, self.forward_basis, stride=self.hop_length, padding=0
        )

        cutoff = int((self.filter_length / 2) + 1)
        real_part = forward_transform[:, :cutoff, :]
        imag_part = forward_transform[:, cutoff:, :]
        magnitude = torch.sqrt(real_part**2 + imag_part**2)
        if return_phase:
            phase = torch.atan2(imag_part.detach(), real_part.detach())
            return magnitude, phase
        return magnitude

    def inverse(self, magnitude, phase):
        """Rebuild a signal from ``magnitude`` and ``phase``.

        The result is trimmed to the sample count of the latest
        ``transform`` call.
        """
        recombine_magnitude_phase = torch.cat(
            [magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
        )
        inverse_transform = F.conv_transpose1d(
            recombine_magnitude_phase,
            self.inverse_basis,
            stride=self.hop_length,
            padding=0,
        )

        if self.window is not None:
            # Undo the amplitude modulation caused by the overlapping windows.
            window_sum = window_sumsquare(
                self.window,
                magnitude.size(-1),
                hop_length=self.hop_length,
                win_length=self.win_length,
                n_fft=self.filter_length,
                dtype=np.float32,
            )
            approx_nonzero_indices = torch.from_numpy(
                np.where(window_sum > tiny(window_sum))[0]
            )
            window_sum = torch.from_numpy(window_sum).to(inverse_transform.device)
            inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
                approx_nonzero_indices
            ]
            inverse_transform *= float(self.filter_length) / self.hop_length

        inverse_transform = inverse_transform[..., self.pad_amount :]
        inverse_transform = inverse_transform[..., : self.num_samples]
        return inverse_transform.squeeze(1)

    def forward(self, input_data):
        """Transform ``input_data`` and reconstruct it again."""
        # transform() returns the phase only on request.
        self.magnitude, self.phase = self.transform(input_data, return_phase=True)
        return self.inverse(self.magnitude, self.phase)
| |
| |
class BiGRU(nn.Module):
    """Bidirectional, batch-first GRU that returns only the output sequence.

    Args:
        input_features: Size of each input step.
        hidden_features: Hidden size per direction.
        num_layers: Number of stacked GRU layers.
    """

    def __init__(self, input_features, hidden_features, num_layers):
        super().__init__()
        self.gru = nn.GRU(
            input_features,
            hidden_features,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )

    def forward(self, x):
        """Return the GRU output sequence and drop the final hidden state."""
        sequence, _ = self.gru(x)
        return sequence
| |
| |
class ConvBlockRes(nn.Module):
    """Residual block of two 3x3 conv + batch-norm + ReLU stages.

    When the channel counts differ, a 1x1 convolution adapts the input
    before it is added to the block output.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count of the output.
        momentum: Momentum of both ``BatchNorm2d`` layers.
    """

    def __init__(self, in_channels, out_channels, momentum=0.01):
        super().__init__()
        stages = []
        for stage_in in (in_channels, out_channels):
            stages.append(
                nn.Conv2d(
                    in_channels=stage_in,
                    out_channels=out_channels,
                    kernel_size=(3, 3),
                    stride=(1, 1),
                    padding=(1, 1),
                    bias=False,
                )
            )
            stages.append(nn.BatchNorm2d(out_channels, momentum=momentum))
            stages.append(nn.ReLU())
        self.conv = nn.Sequential(*stages)
        if in_channels != out_channels:
            self.shortcut = nn.Conv2d(in_channels, out_channels, (1, 1))
        else:
            self.shortcut = None

    def forward(self, x):
        """Return ``conv(x)`` plus the (possibly projected) input."""
        out = self.conv(x)
        residual = x if self.shortcut is None else self.shortcut(x)
        return out + residual
| |
| |
class ResEncoderBlock(nn.Module):
    """Chain of ``ConvBlockRes`` modules followed by optional average pooling.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count produced by every block.
        kernel_size: Pooling kernel size; ``None`` disables pooling.
        n_blocks: Number of ``ConvBlockRes`` modules.
        momentum: Momentum forwarded to every block.
    """

    def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01):
        super().__init__()
        blocks = []
        for index in range(n_blocks):
            # Only the first block sees the incoming channel count.
            block_in = in_channels if index == 0 else out_channels
            blocks.append(ConvBlockRes(block_in, out_channels, momentum))
        self.conv = nn.ModuleList(blocks)
        if kernel_size is not None:
            self.pool = nn.AvgPool2d(kernel_size=kernel_size)
        else:
            self.pool = None

    def forward(self, x):
        """Return ``(pooled, unpooled)``; both are the same without pooling."""
        for block in self.conv:
            x = block(x)
        if self.pool is None:
            return x, x
        return self.pool(x), x
| |
| |
class Encoder(nn.Module):
    """Down-sampling path: batch norm followed by ``n_encoders`` blocks.

    The channel count doubles and ``in_size`` halves after every block.

    Args:
        in_channels: Channel count of the input.
        in_size: Size recorded for the first block in ``latent_channels``.
        n_encoders: Number of ``ResEncoderBlock`` modules.
        kernel_size: Pooling kernel size of every block.
        n_blocks: Number of residual blocks inside every block.
        out_channels: Channel count produced by the first block.
        momentum: Batch-norm momentum.
    """

    def __init__(
        self,
        in_channels,
        in_size,
        n_encoders,
        kernel_size,
        n_blocks,
        out_channels=16,
        momentum=0.01,
    ):
        super().__init__()
        self.bn = nn.BatchNorm2d(in_channels, momentum=momentum)
        self.layers = nn.ModuleList()
        self.latent_channels = []
        for _ in range(n_encoders):
            block = ResEncoderBlock(
                in_channels, out_channels, kernel_size, n_blocks, momentum=momentum
            )
            self.layers.append(block)
            self.latent_channels.append([out_channels, in_size])
            in_channels, out_channels, in_size = out_channels, out_channels * 2, in_size // 2
        self.out_size = in_size
        # Already doubled once more after the last block.
        self.out_channel = out_channels

    def forward(self, x):
        """Return the final pooled tensor and the list of pre-pooling tensors."""
        skips = []
        x = self.bn(x)
        for layer in self.layers:
            # Each block returns (pooled, unpooled); the pooled tensor moves on.
            x, skip = layer(x)
            skips.append(skip)
        return x, skips
| |
| |
class Intermediate(nn.Module):
    """Bottleneck made of ``n_inters`` ``ResEncoderBlock`` modules without pooling.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count produced by every block.
        n_inters: Number of blocks.
        n_blocks: Number of residual blocks inside every block.
        momentum: Batch-norm momentum.
    """

    def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01):
        super().__init__()
        blocks = []
        for index in range(n_inters):
            block_in = in_channels if index == 0 else out_channels
            blocks.append(ResEncoderBlock(block_in, out_channels, None, n_blocks, momentum))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Pass ``x`` through every block in turn."""
        for layer in self.layers:
            # The second element of the pair is the unpooled output.
            x = layer(x)[1]
        return x
| |
| |
class ResDecoderBlock(nn.Module):
    """Up-sampling block: transposed conv, skip concatenation, residual blocks.

    Args:
        in_channels: Channel count of the input.
        out_channels: Channel count of the output.
        stride: Stride of the transposed convolution.
        n_blocks: Number of ``ConvBlockRes`` modules after the concatenation.
        momentum: Batch-norm momentum.
    """

    def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01):
        super().__init__()
        if stride == (1, 2):
            out_padding = (0, 1)
        else:
            out_padding = (1, 1)
        upsample = nn.ConvTranspose2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=(3, 3),
            stride=stride,
            padding=(1, 1),
            output_padding=out_padding,
            bias=False,
        )
        self.conv1 = nn.Sequential(
            upsample,
            nn.BatchNorm2d(out_channels, momentum=momentum),
            nn.ReLU(),
        )
        blocks = []
        for index in range(n_blocks):
            # The first block receives the up-sampled tensor plus the skip tensor.
            block_in = out_channels * 2 if index == 0 else out_channels
            blocks.append(ConvBlockRes(block_in, out_channels, momentum))
        self.conv2 = nn.ModuleList(blocks)

    def forward(self, x, concat_tensor):
        """Up-sample ``x``, concatenate ``concat_tensor`` on axis 1 and refine."""
        merged = torch.cat((self.conv1(x), concat_tensor), dim=1)
        for block in self.conv2:
            merged = block(merged)
        return merged
| |
| |
class Decoder(nn.Module):
    """Up-sampling path of ``n_decoders`` blocks, halving the channels each time.

    Args:
        in_channels: Channel count of the input.
        n_decoders: Number of ``ResDecoderBlock`` modules.
        stride: Stride forwarded to every block.
        n_blocks: Number of residual blocks inside every block.
        momentum: Batch-norm momentum.
    """

    def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01):
        super().__init__()
        self.layers = nn.ModuleList()
        channels = in_channels
        for _ in range(n_decoders):
            halved = channels // 2
            self.layers.append(
                ResDecoderBlock(channels, halved, stride, n_blocks, momentum)
            )
            channels = halved

    def forward(self, x, concat_tensors):
        """Decode ``x``, consuming ``concat_tensors`` from last to first."""
        skips = reversed(concat_tensors)
        for layer, skip in zip(self.layers, skips):
            x = layer(x, skip)
        return x
| |
| |
class DeepUnet(nn.Module):
    """U-Net assembled from ``Encoder``, ``Intermediate`` and ``Decoder``.

    Args:
        kernel_size: Pooling kernel of the encoder and stride of the decoder.
        n_blocks: Number of residual blocks inside every stage.
        en_de_layers: Number of encoder stages and of decoder stages.
        inter_layers: Number of intermediate stages.
        in_channels: Channel count of the input.
        en_out_channels: Channel count produced by the first encoder stage.
    """

    def __init__(
        self,
        kernel_size,
        n_blocks,
        en_de_layers=5,
        inter_layers=4,
        in_channels=1,
        en_out_channels=16,
    ):
        super().__init__()
        self.encoder = Encoder(
            in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels
        )
        # encoder.out_channel is twice the channel count of the last stage.
        bottleneck_channels = self.encoder.out_channel
        self.intermediate = Intermediate(
            bottleneck_channels // 2,
            bottleneck_channels,
            inter_layers,
            n_blocks,
        )
        self.decoder = Decoder(
            bottleneck_channels, en_de_layers, kernel_size, n_blocks
        )

    def forward(self, x):
        """Encode, run the bottleneck and decode with the encoder skip tensors."""
        encoded, skips = self.encoder(x)
        bottleneck = self.intermediate(encoded)
        return self.decoder(bottleneck, skips)
| |
| |
class E2E(nn.Module):
    """``DeepUnet`` followed by a 3-channel conv and a classifier head.

    Args:
        n_blocks: Number of residual blocks inside every U-Net stage.
        n_gru: Number of GRU layers in the head; a falsy value selects a
            plain linear head.
        kernel_size: Pooling kernel / stride used by the U-Net.
        en_de_layers: Number of encoder and decoder stages.
        inter_layers: Number of intermediate stages.
        in_channels: Channel count of the U-Net input.
        en_out_channels: Channel count produced by the first encoder stage.
    """

    # Sizes shared by both heads. The GRU head already hard-coded 3 * 128
    # inputs and 360 outputs; the linear head used names that do not exist
    # in ``torch.nn``.
    N_MELS = 128
    N_CLASS = 360

    def __init__(
        self,
        n_blocks,
        n_gru,
        kernel_size,
        en_de_layers=5,
        inter_layers=4,
        in_channels=1,
        en_out_channels=16,
    ):
        super(E2E, self).__init__()
        self.unet = DeepUnet(
            kernel_size,
            n_blocks,
            en_de_layers,
            inter_layers,
            in_channels,
            en_out_channels,
        )
        self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1))
        if n_gru:
            self.fc = nn.Sequential(
                BiGRU(3 * self.N_MELS, 256, n_gru),
                nn.Linear(512, self.N_CLASS),
                nn.Dropout(0.25),
                nn.Sigmoid(),
            )
        else:
            self.fc = nn.Sequential(
                nn.Linear(3 * self.N_MELS, self.N_CLASS),
                nn.Dropout(0.25),
                nn.Sigmoid(),
            )

    def forward(self, mel):
        """Run the network on ``mel`` and return the head output."""
        # Swap the last two axes and add a channel axis for the 2-D convs.
        mel = mel.transpose(-1, -2).unsqueeze(1)
        # Move the conv channels next to the last axis and merge the two.
        x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2)
        return self.fc(x)
| |
| |
class MelSpectrogram(nn.Module):
    """Log-mel spectrogram extractor built on the ``STFT`` helper.

    Args:
        is_half: Convert the mel output to half precision before the log.
        n_mel_channels: Number of mel bands.
        sample_rate: Sample rate used for the mel filter bank.
        win_length: Analysis window length.
        hop_length: Number of samples between frames.
        n_fft: FFT size; ``None`` means ``win_length``.
        mel_fmin: Lowest frequency of the filter bank.
        mel_fmax: Highest frequency of the filter bank.
        clamp: Lower bound applied before taking the logarithm.
    """

    def __init__(
        self,
        is_half,
        n_mel_channels,
        sample_rate,
        win_length,
        hop_length,
        n_fft=None,
        mel_fmin=0,
        mel_fmax=None,
        clamp=1e-5,
    ):
        super().__init__()
        if n_fft is None:
            n_fft = win_length
        self.hann_window = {}
        filter_bank = mel(
            sr=sample_rate,
            n_fft=n_fft,
            n_mels=n_mel_channels,
            fmin=mel_fmin,
            fmax=mel_fmax,
            htk=True,
        )
        self.register_buffer("mel_basis", torch.from_numpy(filter_bank).float())
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        self.sample_rate = sample_rate
        self.n_mel_channels = n_mel_channels
        self.clamp = clamp
        self.is_half = is_half

    def forward(self, audio, keyshift=0, speed=1, center=True):
        """Return the log-mel spectrogram of ``audio``.

        ``center`` is accepted but not used.
        """
        factor = 2 ** (keyshift / 12)
        n_fft_new = int(np.round(self.n_fft * factor))
        win_length_new = int(np.round(self.win_length * factor))
        hop_length_new = int(np.round(self.hop_length * speed))

        cache_key = f"{keyshift}_{audio.device}"
        if cache_key not in self.hann_window:
            window = torch.hann_window(win_length_new).to(audio.device)
            self.hann_window[cache_key] = window

        # NOTE: the STFT helper is created on the first call only and reused
        # afterwards, whatever keyshift / speed later calls pass.
        if not hasattr(self, "stft"):
            self.stft = STFT(
                filter_length=n_fft_new,
                hop_length=hop_length_new,
                win_length=win_length_new,
                window="hann",
            ).to(audio.device)
        magnitude = self.stft.transform(audio)

        if keyshift != 0:
            # Bring the bin count back to the unshifted size and rescale.
            size = self.n_fft // 2 + 1
            resize = magnitude.size(1)
            if resize < size:
                magnitude = F.pad(magnitude, (0, 0, 0, size - resize))
            magnitude = magnitude[:, :size, :] * self.win_length / win_length_new

        mel_output = torch.matmul(self.mel_basis, magnitude)
        if self.is_half:
            mel_output = mel_output.half()
        clamped = torch.clamp(mel_output, min=self.clamp)
        return torch.log(clamped)
| |
| |
class RMVPE0Predictor:
    """Estimate f0 from audio with an ``E2E`` network fed by mel spectrograms.

    Args:
        model_path: Checkpoint read with ``torch.load(..., weights_only=True)``
            and loaded as the state dict of ``E2E(4, 1, (2, 2))``.
        is_half: When true, the model and its mel input are cast to half
            precision.
        device: Target device; defaults to ``"cuda"`` when available,
            otherwise ``"cpu"``.
    """

    def __init__(self, model_path, is_half, device=None):
        self.resample_kernel = {}
        self.is_half = is_half
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device
        self.mel_extractor = MelSpectrogram(
            is_half, 128, 16000, 1024, 160, None, 30, 8000
        ).to(device)
        model = E2E(4, 1, (2, 2))
        ckpt = torch.load(model_path, map_location="cpu", weights_only=True)
        model.load_state_dict(ckpt)
        model.eval()
        if is_half:
            model = model.half()
        self.model = model.to(device)
        # 360 bins spaced 20 apart, zero-padded by 4 on each side so a 9-bin
        # window around any bin stays in range.
        self.cents_mapping = np.pad(20 * np.arange(360) + 1997.3794084376191, (4, 4))

    def mel2hidden(self, mel):
        """Run the network on ``mel`` and return its output cut to the input frame count."""
        with torch.no_grad():
            n_frames = mel.shape[-1]
            mel = mel.float()
            # Pad the frame axis up to the next multiple of 32, but never by
            # more than the number of frames available.
            padding = min(32 * ((n_frames - 1) // 32 + 1) - n_frames, n_frames)
            mel = F.pad(mel, (0, padding), mode="reflect")
            if self.is_half:
                mel = mel.half()
            hidden = self.model(mel)
            return hidden[:, :n_frames]

    def decode(self, hidden, thred=0.03):
        """Convert a salience matrix into f0 values; frames decoded to 0 cents become 0."""
        cents_pred = self.to_local_average_cents(hidden, thred=thred)
        f0 = 10 * (2 ** (cents_pred / 1200))
        # 0 cents maps to exactly 10, which marks a frame without pitch.
        f0[f0 == 10] = 0
        return f0

    def _salience_from_audio(self, audio):
        """Run mel extraction and the network on a NumPy signal; return a NumPy salience array."""
        audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
        mel = self.mel_extractor(audio, center=True)
        hidden = self.mel2hidden(mel)
        hidden = hidden.squeeze(0).cpu().numpy()
        if self.is_half:
            hidden = hidden.astype("float32")
        return hidden

    def infer_from_audio(self, audio, thred=0.03):
        """Return the f0 curve for ``audio`` (a NumPy array)."""
        return self.decode(self._salience_from_audio(audio), thred=thred)

    def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100):
        """Like ``infer_from_audio`` but zero every value outside ``[f0_min, f0_max]``."""
        f0 = self.decode(self._salience_from_audio(audio), thred=thred)
        f0[(f0 < f0_min) | (f0 > f0_max)] = 0
        return f0

    def to_local_average_cents(self, salience, thred=0.05):
        """Salience-weighted average of the cent values around each row's peak.

        Args:
            salience: 2-D array with one row per frame and one column per bin.
            thred: Rows whose maximum is not above this value yield 0.

        Returns:
            1-D array with one cent value per row.
        """
        salience = np.asarray(salience)
        center = np.argmax(salience, axis=1)
        padded = np.pad(salience, ((0, 0), (4, 4)))
        # In padded coordinates the 9-bin window around the peak starts at the
        # unpadded peak index.
        window = center[:, None] + np.arange(9)
        local_salience = np.take_along_axis(padded, window, axis=1)
        local_cents = self.cents_mapping[window]
        product_sum = np.sum(local_salience * local_cents, axis=1)
        weight_sum = np.sum(local_salience, axis=1)
        # Rows with zero total weight stay 0 instead of producing 0/0.
        divided = np.divide(
            product_sum,
            weight_sum,
            out=np.zeros_like(product_sum),
            where=weight_sum != 0,
        )
        divided[np.max(padded, axis=1) <= thred] = 0
        return divided
| |
| ''' |
# Write the generated module as UTF-8 explicitly so the file content does not
# depend on the platform's default text encoding.
with open(os.sep.join([current_dir, dirs[6], "RMVPE.py"]), 'w', encoding="utf-8") as f:
    f.write(RMVPE)
| |
| FCPE = ''' |
| from typing import Union |
| |
| import torch.nn.functional as F |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| from torch.nn.utils.parametrizations import weight_norm |
| from torchaudio.transforms import Resample |
| import os |
| import librosa |
| import soundfile as sf |
| import torch.utils.data |
| from librosa.filters import mel as librosa_mel_fn |
| import math |
| from functools import partial |
| |
| from einops import rearrange, repeat |
| from local_attention import LocalAttention |
| |
| os.environ["LRU_CACHE_CAPACITY"] = "3" |
| |
| |
def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):
    """Read an audio file into a normalised 1-D float tensor.

    Args:
        full_path: Path passed to ``sf.read``.
        target_sr: When given and different from the file's rate, the signal
            is resampled to this rate with librosa.
        return_empty_on_exception: When true, a failed read or a signal with
            inf/NaN samples yields ``([], rate)`` instead of raising.

    Returns:
        ``(data, sample_rate)``; ``data`` is the first channel divided by a
        magnitude chosen from the sample range.

    Raises:
        Exception: Whatever ``sf.read`` raised, when
            ``return_empty_on_exception`` is false.
    """
    try:
        data, sample_rate = sf.read(full_path, always_2d=True)
    except Exception as error:
        print(f"An error occurred loading {full_path}: {error}")
        if return_empty_on_exception:
            # No sample rate is known when reading failed, so fall back to
            # the requested rate, then to 48000.
            return [], target_sr or 48000
        raise

    data = data[:, 0] if len(data.shape) > 1 else data
    assert len(data) > 2

    # Pick the normalisation divisor from the dtype (integers) or from the
    # observed peak (floats).
    max_mag = (
        -np.iinfo(data.dtype).min
        if np.issubdtype(data.dtype, np.integer)
        else max(np.amax(data), -np.amin(data))
    )
    max_mag = (
        (2**31) + 1 if max_mag > (2**15) else ((2**15) + 1 if max_mag > 1.01 else 1.0)
    )
    data = torch.FloatTensor(data.astype(np.float32)) / max_mag

    if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception:
        return [], sample_rate or target_sr or 48000
    if target_sr is not None and sample_rate != target_sr:
        data = torch.from_numpy(
            librosa.core.resample(data.numpy(), orig_sr=sample_rate, target_sr=target_sr)
        )
        sample_rate = target_sr

    return data, sample_rate
| |
| |
def dynamic_range_compression(x, C=1, clip_val=1e-5):
    """Return ``log(max(x, clip_val) * C)`` computed with NumPy."""
    floored = np.clip(x, a_min=clip_val, a_max=None)
    return np.log(floored * C)
| |
| |
def dynamic_range_decompression(x, C=1):
    """Invert ``dynamic_range_compression``: return ``exp(x) / C`` (NumPy)."""
    expanded = np.exp(x)
    return expanded / C
| |
| |
def dynamic_range_compression_torch(x, C=1, clip_val=1e-5):
    """Return ``log(clamp(x, min=clip_val) * C)`` computed with torch."""
    floored = torch.clamp(x, min=clip_val)
    return torch.log(floored * C)
| |
| |
def dynamic_range_decompression_torch(x, C=1):
    """Invert ``dynamic_range_compression_torch``: return ``exp(x) / C``."""
    expanded = torch.exp(x)
    return expanded / C
| |
| |
class STFT:
    """Log-mel extractor built on ``torch.stft``.

    Mel filterbanks and Hann windows are cached on the instance, keyed by
    ``fmax``/device and keyshift/device respectively.
    """

    def __init__(
        self,
        sr=22050,
        n_mels=80,
        n_fft=1024,
        win_size=1024,
        hop_length=256,
        fmin=20,
        fmax=11025,
        clip_val=1e-5,
    ):
        self.target_sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.win_size = win_size
        self.hop_length = hop_length
        self.fmin = fmin
        self.fmax = fmax
        self.clip_val = clip_val
        self.mel_basis = {}
        self.hann_window = {}

    def get_mel(self, y, keyshift=0, speed=1, center=False, train=False):
        """Compute the log-mel spectrogram of ``y``.

        Args:
            y: Signal tensor; it is padded along its last axis and passed to
                ``torch.stft``.
            keyshift: Shift in semitones; scales FFT and window sizes by
                ``2 ** (keyshift / 12)``.
            speed: Multiplier applied to the hop length.
            center: Forwarded to ``torch.stft``.
            train: When true, throwaway caches are used instead of the
                instance caches.

        Returns:
            The mel-projected magnitude after
            ``dynamic_range_compression_torch``.
        """
        pitch_ratio = 2 ** (keyshift / 12)
        scaled_n_fft = int(np.round(self.n_fft * pitch_ratio))
        scaled_win = int(np.round(self.win_size * pitch_ratio))
        scaled_hop = int(np.round(self.hop_length * speed))

        if train:
            basis_cache, window_cache = {}, {}
        else:
            basis_cache, window_cache = self.mel_basis, self.hann_window

        device = y.device
        basis_key = f"{self.fmax}_{device}"
        if basis_key not in basis_cache:
            filterbank = librosa_mel_fn(
                sr=self.target_sr,
                n_fft=self.n_fft,
                n_mels=self.n_mels,
                fmin=self.fmin,
                fmax=self.fmax,
            )
            basis_cache[basis_key] = torch.from_numpy(filterbank).float().to(device)

        window_key = f"{keyshift}_{device}"
        if window_key not in window_cache:
            window_cache[window_key] = torch.hann_window(scaled_win).to(device)

        # Pad manually; the right side is widened when the signal is shorter
        # than one window, and reflection is only used when it is allowed.
        n_samples = y.size(-1)
        overlap = scaled_win - scaled_hop
        pad_left = overlap // 2
        pad_right = max((overlap + 1) // 2, scaled_win - n_samples - pad_left)
        pad_mode = "constant" if pad_right >= n_samples else "reflect"
        padded = torch.nn.functional.pad(
            y.unsqueeze(1), (pad_left, pad_right), mode=pad_mode
        ).squeeze(1)

        spec = torch.stft(
            padded,
            scaled_n_fft,
            hop_length=scaled_hop,
            win_length=scaled_win,
            window=window_cache[window_key],
            center=center,
            pad_mode="reflect",
            normalized=False,
            onesided=True,
            return_complex=True,
        )
        magnitude = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9))

        if keyshift != 0:
            # Restore the unshifted bin count, then rescale by the window ratio.
            target_bins = self.n_fft // 2 + 1
            current_bins = magnitude.size(1)
            if current_bins < target_bins:
                magnitude = F.pad(magnitude, (0, 0, 0, target_bins - current_bins))
            else:
                magnitude = magnitude[:, :target_bins, :]
            magnitude = magnitude * self.win_size / scaled_win

        mel_spec = torch.matmul(basis_cache[basis_key], magnitude)
        return dynamic_range_compression_torch(mel_spec, clip_val=self.clip_val)

    def __call__(self, audiopath):
        """Load ``audiopath`` at the target sample rate and return its mel spectrogram."""
        audio, _sr = load_wav_to_torch(audiopath, target_sr=self.target_sr)
        return self.get_mel(audio.unsqueeze(0)).squeeze(0)
| |
| |
| stft = STFT() |
| |
| |
def softmax_kernel(
    data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None
):
    """Map queries or keys to random-feature space for linear attention.

    Args:
        data: Tensor whose first two axes are used as batch and head counts.
        projection_matrix: 2-D projection, repeated over batch and heads.
        is_query: Selects the query branch (subtracts the per-row maximum and
            adds ``eps`` outside the exponential) or the key branch (adds
            ``eps`` inside the exponential).
        normalize_data: Scale ``data`` by ``dim ** -0.25`` when true.
        eps: Small stabilising constant.
        device: Accepted but not used.

    Returns:
        The feature tensor, cast to the type of ``data``.
    """
    batch, heads, *_ = data.shape

    data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0
    ratio = projection_matrix.shape[0] ** -0.5

    projection = repeat(projection_matrix, "j d -> b h j d", b=batch, h=heads)
    projection = projection.type_as(data)
    projected = torch.einsum(
        "...id,...jd->...ij", (data_normalizer * data), projection
    )

    half_sq_norm = torch.sum(data**2, dim=-1)
    half_sq_norm = (half_sq_norm / 2.0) * (data_normalizer**2)
    half_sq_norm = half_sq_norm.unsqueeze(dim=-1)

    if is_query:
        row_max = torch.max(projected, dim=-1, keepdim=True).values
        features = ratio * (torch.exp(projected - half_sq_norm - row_max) + eps)
    else:
        features = ratio * (torch.exp(projected - half_sq_norm + eps))

    return features.type_as(data)
| |
| |
def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None):
    """Return a random ``cols x cols`` orthogonal matrix.

    A Gaussian matrix is QR-decomposed on the CPU and the transposed Q factor
    is returned on ``device``. With ``qr_uniform_q`` the columns of Q are
    first multiplied by the signs of R's diagonal.
    """
    gaussian_block = torch.randn((cols, cols), device=device)
    q_cpu, r_cpu = torch.linalg.qr(gaussian_block.cpu(), mode="reduced")
    q = q_cpu.to(device)
    r = r_cpu.to(device)

    if qr_uniform_q:
        q *= torch.diag(r, 0).sign()
    return q.t()
| |
| |
def exists(val):
    """Return True unless ``val`` is None."""
    return not (val is None)
| |
| |
def empty(tensor):
    """Return True when ``tensor`` holds no elements."""
    element_count = tensor.numel()
    return element_count == 0
| |
| |
def default(val, d):
    """Return ``val`` unless it is None, in which case return ``d``."""
    if val is None:
        return d
    return val
| |
| |
def cast_tuple(val):
    """Return ``val`` unchanged if it is a tuple, else wrap it in a 1-tuple."""
    if isinstance(val, tuple):
        return val
    return (val,)
| |
| |
class PCmer(nn.Module):
    """Stack of ``num_layers`` ``_EncoderLayer`` modules applied in sequence.

    The constructor arguments are stored as attributes; each layer receives
    this module and reads its configuration from it.
    """

    def __init__(
        self,
        num_layers,
        num_heads,
        dim_model,
        dim_keys,
        dim_values,
        residual_dropout,
        attention_dropout,
    ):
        super().__init__()
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dim_model = dim_model
        self.dim_values = dim_values
        self.dim_keys = dim_keys
        self.residual_dropout = residual_dropout
        self.attention_dropout = attention_dropout

        encoder_layers = []
        for _ in range(num_layers):
            encoder_layers.append(_EncoderLayer(self))
        self._layers = nn.ModuleList(encoder_layers)

    def forward(self, phone, mask=None):
        """Pass ``phone`` through every layer in order, forwarding ``mask``."""
        hidden = phone
        for encoder_layer in self._layers:
            hidden = encoder_layer(hidden, mask)
        return hidden
| |
| |
class _EncoderLayer(nn.Module):
    """One ``PCmer`` layer: pre-normed self-attention, then a conformer conv module.

    Both sub-blocks are added to their input as residuals. A dropout module
    is created from ``parent.residual_dropout`` but ``forward`` does not
    call it.
    """

    def __init__(self, parent: PCmer):
        super().__init__()
        width = parent.dim_model
        self.conformer = ConformerConvModule(width)
        self.norm = nn.LayerNorm(width)
        self.dropout = nn.Dropout(parent.residual_dropout)
        self.attn = SelfAttention(dim=width, heads=parent.num_heads, causal=False)

    def forward(self, phone, mask=None):
        """Apply the attention residual followed by the conformer residual."""
        attended = self.attn(self.norm(phone), mask=mask)
        phone = phone + attended
        return phone + self.conformer(phone)
| |
| |
def calc_same_padding(kernel_size):
    """Return ``(left, right)`` padding for a kernel of the given size.

    The left pad is ``kernel_size // 2``; the right pad is one less than that
    for even kernel sizes.
    """
    left = kernel_size // 2
    right = left - (kernel_size + 1) % 2
    return left, right
| |
| |
class Swish(nn.Module):
    """Activation computing ``x * sigmoid(x)``."""

    def forward(self, x):
        gate = torch.sigmoid(x)
        return x * gate
| |
| |
class Transpose(nn.Module):
    """Module wrapper around ``Tensor.transpose`` for a fixed pair of axes."""

    def __init__(self, dims):
        super().__init__()
        assert len(dims) == 2, "dims must be a tuple of two dimensions"
        self.dims = dims

    def forward(self, x):
        first_axis, second_axis = self.dims
        return x.transpose(first_axis, second_axis)
| |
| |
class GLU(nn.Module):
    """Gated linear unit: split the input in two along ``dim`` and gate the first half."""

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        value_half, gate_half = x.chunk(2, dim=self.dim)
        return value_half * torch.sigmoid(gate_half)
| |
| |
class DepthWiseConv1d(nn.Module):
    """``nn.Conv1d`` with ``groups=chan_in``, preceded by explicit padding.

    Args:
        chan_in: Input channels (also the number of groups).
        chan_out: Output channels.
        kernel_size: Convolution kernel size.
        padding: Padding spec handed to ``F.pad`` before the convolution.
    """

    def __init__(self, chan_in, chan_out, kernel_size, padding):
        super().__init__()
        self.padding = padding
        self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in)

    def forward(self, x):
        padded = F.pad(x, self.padding)
        return self.conv(padded)
| |
| |
class ConformerConvModule(nn.Module):
    """Convolution module used inside each encoder layer.

    The pipeline is LayerNorm, transpose of axes 1 and 2, pointwise conv to
    ``2 * inner_dim`` channels, GLU, depthwise conv, Swish, pointwise conv
    back to ``dim``, transpose back, and dropout.

    Args:
        dim: Feature size of the input and output.
        causal: Pad only on the left (``kernel_size - 1``) when true,
            otherwise use ``calc_same_padding``.
        expansion_factor: Ratio between the inner and outer channel counts.
        kernel_size: Depthwise convolution kernel size.
        dropout: Probability for the final dropout.
    """

    def __init__(
        self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0
    ):
        super().__init__()

        inner_dim = dim * expansion_factor
        if causal:
            padding = (kernel_size - 1, 0)
        else:
            padding = calc_same_padding(kernel_size)

        stages = [
            nn.LayerNorm(dim),
            Transpose((1, 2)),
            nn.Conv1d(dim, inner_dim * 2, 1),
            GLU(dim=1),
            DepthWiseConv1d(
                inner_dim, inner_dim, kernel_size=kernel_size, padding=padding
            ),
            Swish(),
            nn.Conv1d(inner_dim, dim, 1),
            Transpose((1, 2)),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        return self.net(x)
| |
| |
def linear_attention(q, k, v):
    """Non-causal linear attention on feature-mapped queries and keys.

    With ``v`` set to None the raw query/key products are returned. Otherwise
    the keys are summed over the sequence axis to build a per-query
    normaliser, and the normalised ``(k^T v)`` context is applied to each
    query.
    """
    if v is None:
        return torch.einsum("...ed,...nd->...ne", k, q)

    key_sum = k.sum(dim=-2)
    normalizer = torch.einsum("...nd,...d->...n", q, key_sum.type_as(q)) + 1e-8
    inv_normalizer = 1.0 / normalizer
    context = torch.einsum("...nd,...ne->...de", k, v)
    return torch.einsum("...de,...nd,...n->...ne", context, q, inv_normalizer)
| |
| |
def gaussian_orthogonal_random_matrix(
    nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None
):
    """Build an ``nb_rows x nb_columns`` matrix from stacked orthogonal blocks.

    Args:
        nb_rows: Number of rows of the result.
        nb_columns: Number of columns; also the size of each square block.
        scaling: 0 scales each row by the norm of a fresh Gaussian row; 1
            scales every row by ``sqrt(nb_columns)``.
        qr_uniform_q: Forwarded to ``orthogonal_matrix_chunk``.
        device: Device for the generated tensors.

    Raises:
        ValueError: If ``scaling`` is neither 0 nor 1.
    """
    full_block_count = int(nb_rows / nb_columns)
    blocks = [
        orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)
        for _ in range(full_block_count)
    ]

    # A final partial block supplies the rows that do not fill a whole block.
    leftover_rows = nb_rows - full_block_count * nb_columns
    if leftover_rows > 0:
        partial_block = orthogonal_matrix_chunk(
            nb_columns, qr_uniform_q=qr_uniform_q, device=device
        )
        blocks.append(partial_block[:leftover_rows])

    stacked = torch.cat(blocks)

    if scaling == 0:
        row_scale = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1)
    elif scaling == 1:
        row_scale = math.sqrt((float(nb_columns))) * torch.ones(
            (nb_rows,), device=device
        )
    else:
        raise ValueError(f"Invalid scaling {scaling}")

    return torch.diag(row_scale) @ stacked
| |
| |
class FastAttention(nn.Module):
    """Linear attention over random-feature projections of queries and keys.

    Args:
        dim_heads: Per-head feature size.
        nb_features: Number of random features; defaults to
            ``int(dim_heads * log(dim_heads))``.
        ortho_scaling: ``scaling`` argument for the projection generator.
        causal: Selects ``self.causal_linear_fn`` instead of
            ``linear_attention`` in ``forward``.
        generalized_attention: Stored on the instance.
        kernel_fn: Stored on the instance.
        qr_uniform_q: Forwarded to the projection generator.
        no_projection: Use plain softmax/exp on q and k instead of the
            random-feature kernel.
    """

    def __init__(
        self,
        dim_heads,
        nb_features=None,
        ortho_scaling=0,
        causal=False,
        generalized_attention=False,
        kernel_fn=nn.ReLU(),
        qr_uniform_q=False,
        no_projection=False,
    ):
        super().__init__()
        nb_features = default(nb_features, int(dim_heads * math.log(dim_heads)))

        self.dim_heads = dim_heads
        self.nb_features = nb_features
        self.ortho_scaling = ortho_scaling

        # Keep the generator around so the projection can be redrawn later.
        self.create_projection = partial(
            gaussian_orthogonal_random_matrix,
            nb_rows=self.nb_features,
            nb_columns=dim_heads,
            scaling=ortho_scaling,
            qr_uniform_q=qr_uniform_q,
        )
        self.register_buffer("projection_matrix", self.create_projection())

        self.generalized_attention = generalized_attention
        self.kernel_fn = kernel_fn
        self.no_projection = no_projection
        self.causal = causal

    @torch.no_grad()
    def redraw_projection_matrix(self):
        """Overwrite the projection buffer in place with a freshly drawn matrix."""
        fresh_projection = self.create_projection()
        self.projection_matrix.copy_(fresh_projection)
        del fresh_projection

    def forward(self, q, k, v):
        """Feature-map ``q`` and ``k`` and apply the selected attention function."""
        device = q.device

        if self.no_projection:
            q = q.softmax(dim=-1)
            k = torch.exp(k) if self.causal else k.softmax(dim=-2)
        else:
            q = softmax_kernel(
                q,
                projection_matrix=self.projection_matrix,
                device=device,
                is_query=True,
            )
            k = softmax_kernel(
                k,
                projection_matrix=self.projection_matrix,
                device=device,
                is_query=False,
            )

        attn_fn = self.causal_linear_fn if self.causal else linear_attention
        # ``v`` may be None; it is forwarded unchanged in that case.
        return attn_fn(q, k, v)
| |
| |
class SelfAttention(nn.Module):
    """Multi-head attention mixing ``FastAttention`` heads with optional local heads.

    The first ``heads - local_heads`` heads go through ``FastAttention``; the
    remaining ones go through ``LocalAttention`` when ``local_heads > 0``.
    ``feature_redraw_interval`` is accepted but not used in this class.
    """

    def __init__(
        self,
        dim,
        causal=False,
        heads=8,
        dim_head=64,
        local_heads=0,
        local_window_size=256,
        nb_features=None,
        feature_redraw_interval=1000,
        generalized_attention=False,
        kernel_fn=nn.ReLU(),
        qr_uniform_q=False,
        dropout=0.0,
        no_projection=False,
    ):
        super().__init__()
        assert dim % heads == 0, "dimension must be divisible by number of heads"
        dim_head = default(dim_head, dim // heads)
        inner_dim = dim_head * heads
        self.fast_attention = FastAttention(
            dim_head,
            nb_features,
            causal=causal,
            generalized_attention=generalized_attention,
            kernel_fn=kernel_fn,
            qr_uniform_q=qr_uniform_q,
            no_projection=no_projection,
        )

        self.heads = heads
        self.global_heads = heads - local_heads
        if local_heads > 0:
            self.local_attn = LocalAttention(
                window_size=local_window_size,
                causal=causal,
                autopad=True,
                dropout=dropout,
                look_forward=int(not causal),
                rel_pos_emb_config=(dim_head, local_heads),
            )
        else:
            self.local_attn = None

        self.to_q = nn.Linear(dim, inner_dim)
        self.to_k = nn.Linear(dim, inner_dim)
        self.to_v = nn.Linear(dim, inner_dim)
        self.to_out = nn.Linear(inner_dim, dim)
        self.dropout = nn.Dropout(dropout)

    @torch.no_grad()
    def redraw_projection_matrix(self):
        """Redraw the random projection of the wrapped ``FastAttention``."""
        self.fast_attention.redraw_projection_matrix()

    def forward(
        self,
        x,
        context=None,
        mask=None,
        context_mask=None,
        name=None,
        inference=False,
        **kwargs,
    ):
        """Attend over ``x`` (or over ``context`` when given).

        ``x`` must have exactly three axes. ``name``, ``inference`` and extra
        keyword arguments are accepted but not used.
        """
        _, _, _ = x.shape  # unpacking enforces a 3-axis input
        h, gh = self.heads, self.global_heads

        cross_attend = exists(context)
        context = default(context, x)
        if not cross_attend:
            context_mask = default(context_mask, mask)

        q = self.to_q(x)
        k = self.to_k(context)
        v = self.to_v(context)

        q, k, v = [rearrange(t, "b n (h d) -> b h n d", h=h) for t in (q, k, v)]
        # Split every projection into its global and local head groups.
        (q, lq), (k, lk), (v, lv) = [(t[:, :gh], t[:, gh:]) for t in (q, k, v)]

        attn_outs = []
        if not empty(q):
            if exists(context_mask):
                global_mask = context_mask[:, None, :, None]
                v.masked_fill_(~global_mask, 0.0)
            # NOTE(review): with cross attention nothing assigns ``out``
            # before the append below.
            if not cross_attend:
                out = self.fast_attention(q, k, v)
            attn_outs.append(out)

        if not empty(lq):
            assert (
                not cross_attend
            ), "local attention is not compatible with cross attention"
            out = self.local_attn(lq, lk, lv, input_mask=mask)
            attn_outs.append(out)

        merged = torch.cat(attn_outs, dim=1)
        merged = rearrange(merged, "b h n d -> b n (h d)")
        return self.dropout(self.to_out(merged))
| |
| |
def l2_regularization(model, l2_alpha):
    """Return ``l2_alpha`` times the summed half squared weights of exact ``nn.Conv2d`` modules.

    Subclasses of ``nn.Conv2d`` are not counted because the check compares
    the exact type.
    """
    penalties = [
        (module.weight**2).sum() / 2.0
        for module in model.modules()
        if type(module) is nn.Conv2d
    ]
    return l2_alpha * sum(penalties)
| |
| |
| class FCPE(nn.Module): |
| def __init__( |
| self, |
| input_channel=128, |
| out_dims=360, |
| n_layers=12, |
| n_chans=512, |
| use_siren=False, |
| use_full=False, |
| loss_mse_scale=10, |
| loss_l2_regularization=False, |
| loss_l2_regularization_scale=1, |
| loss_grad1_mse=False, |
| loss_grad1_mse_scale=1, |
| f0_max=1975.5, |
| f0_min=32.70, |
| confidence=False, |
| threshold=0.05, |
| use_input_conv=True, |
| ): |
| super().__init__() |
| if use_siren is True: |
| raise ValueError("Siren is not supported yet.") |
| if use_full is True: |
| raise ValueError("Full model is not supported yet.") |
| |
| self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10 |
| self.loss_l2_regularization = ( |
| loss_l2_regularization if (loss_l2_regularization is not None) else False |
| ) |
| self.loss_l2_regularization_scale = ( |
| loss_l2_regularization_scale |
| if (loss_l2_regularization_scale is not None) |
| else 1 |
| ) |
| self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False |
| self.loss_grad1_mse_scale = ( |
| loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1 |
| ) |
| self.f0_max = f0_max if (f0_max is not None) else 1975.5 |
| self.f0_min = f0_min if (f0_min is not None) else 32.70 |
| self.confidence = confidence if (confidence is not None) else False |
| self.threshold = threshold if (threshold is not None) else 0.05 |
| self.use_input_conv = use_input_conv if (use_input_conv is not None) else True |
| |
| self.cent_table_b = torch.Tensor( |
| np.linspace( |
| self.f0_to_cent(torch.Tensor([f0_min]))[0], |
| self.f0_to_cent(torch.Tensor([f0_max]))[0], |
| out_dims, |
| ) |
| ) |
| self.register_buffer("cent_table", self.cent_table_b) |
| |
| _leaky = nn.LeakyReLU() |
| self.stack = nn.Sequential( |
| nn.Conv1d(input_channel, n_chans, 3, 1, 1), |
| nn.GroupNorm(4, n_chans), |
| _leaky, |
| nn.Conv1d(n_chans, n_chans, 3, 1, 1), |
| ) |
| |
| self.decoder = PCmer( |
| num_layers=n_layers, |
| num_heads=8, |
| dim_model=n_chans, |
| dim_keys=n_chans, |
| dim_values=n_chans, |
| residual_dropout=0.1, |
| attention_dropout=0.1, |
| ) |
| self.norm = nn.LayerNorm(n_chans) |
| |
| self.n_out = out_dims |
| self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out)) |
| |
| def forward( |
| self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax" |
| ): |
| if cdecoder == "argmax": |
| self.cdecoder = self.cents_decoder |
| elif cdecoder == "local_argmax": |
| self.cdecoder = self.cents_local_decoder |
| |
| x = ( |
| self.stack(mel.transpose(1, 2)).transpose(1, 2) |
| if self.use_input_conv |
| else mel |
| ) |
| x = self.decoder(x) |
| x = self.norm(x) |
| x = self.dense_out(x) |
| x = torch.sigmoid(x) |
| |
| if not infer: |
| gt_cent_f0 = self.f0_to_cent(gt_f0) |
| gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0) |
| loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0) |
| if self.loss_l2_regularization: |
| loss_all = loss_all + l2_regularization( |
| model=self, l2_alpha=self.loss_l2_regularization_scale |
| ) |
| x = loss_all |
| if infer: |
| x = self.cdecoder(x) |
| x = self.cent_to_f0(x) |
| x = (1 + x / 700).log() if not return_hz_f0 else x |
| |
| return x |
| |
| def cents_decoder(self, y, mask=True): |
| B, N, _ = y.size() |
| ci = self.cent_table[None, None, :].expand(B, N, -1) |
| rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True) |
| if mask: |
| confident = torch.max(y, dim=-1, keepdim=True)[0] |
| confident_mask = torch.ones_like(confident) |
| confident_mask[confident <= self.threshold] = float("-INF") |
| rtn = rtn * confident_mask |
| return (rtn, confident) if self.confidence else rtn |
| |
| def cents_local_decoder(self, y, mask=True): |
| B, N, _ = y.size() |
| ci = self.cent_table[None, None, :].expand(B, N, -1) |
| confident, max_index = torch.max(y, dim=-1, keepdim=True) |
| local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4) |
| local_argmax_index = torch.clamp(local_argmax_index, 0, self.n_out - 1) |
| ci_l = torch.gather(ci, -1, local_argmax_index) |
| y_l = torch.gather(y, -1, local_argmax_index) |
| rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum( |
| y_l, dim=-1, keepdim=True |
| ) |
| if mask: |
| confident_mask = torch.ones_like(confident) |
| confident_mask[confident <= self.threshold] = float("-INF") |
| rtn = rtn * confident_mask |
| return (rtn, confident) if self.confidence else rtn |
| |
| def cent_to_f0(self, cent): |
| return 10.0 * 2 ** (cent / 1200.0) |
| |
| def f0_to_cent(self, f0): |
| return 1200.0 * torch.log2(f0 / 10.0) |
| |
| def gaussian_blurred_cent(self, cents): |
| mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0))) |
| B, N, _ = cents.size() |
| ci = self.cent_table[None, None, :].expand(B, N, -1) |
| return torch.exp(-torch.square(ci - cents) / 1250) * mask.float() |
| |
| |
class FCPEInfer:
    """Load an ``FCPE`` checkpoint and run f0 inference on raw audio.

    The checkpoint is expected to hold a ``"config"`` entry (wrapped in
    ``DotDict``) and a ``"model"`` state dict.
    """

    def __init__(self, model_path, device=None, dtype=torch.float32):
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device
        ckpt = torch.load(model_path, map_location=torch.device(self.device))
        self.args = DotDict(ckpt["config"])
        self.dtype = dtype

        model_cfg = self.args.model
        loss_cfg = self.args.loss
        model = FCPE(
            input_channel=model_cfg.input_channel,
            out_dims=model_cfg.out_dims,
            n_layers=model_cfg.n_layers,
            n_chans=model_cfg.n_chans,
            use_siren=model_cfg.use_siren,
            use_full=model_cfg.use_full,
            loss_mse_scale=loss_cfg.loss_mse_scale,
            loss_l2_regularization=loss_cfg.loss_l2_regularization,
            loss_l2_regularization_scale=loss_cfg.loss_l2_regularization_scale,
            loss_grad1_mse=loss_cfg.loss_grad1_mse,
            loss_grad1_mse_scale=loss_cfg.loss_grad1_mse_scale,
            f0_max=model_cfg.f0_max,
            f0_min=model_cfg.f0_min,
            confidence=model_cfg.confidence,
        )
        model.to(self.device).to(self.dtype)
        model.load_state_dict(ckpt["model"])
        model.eval()
        self.model = model
        self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device)

    @torch.no_grad()
    def __call__(self, audio, sr, threshold=0.05):
        """Return the model's f0 output for ``audio`` sampled at ``sr``.

        ``threshold`` is written onto the model before inference. A leading
        axis is added to ``audio`` before mel extraction.
        """
        self.model.threshold = threshold
        batched_audio = audio[None, :]
        mel = self.wav2mel(audio=batched_audio, sample_rate=sr).to(self.dtype)
        return self.model(mel=mel, infer=True, return_hz_f0=True)
| |
| |
class Wav2Mel:
    """Turn audio tensors into mel frames using the settings in ``args.mel``.

    Resampling modules are cached per source sample rate.
    """

    def __init__(self, args, device=None, dtype=torch.float32):
        mel_cfg = args.mel
        self.sample_rate = mel_cfg.sampling_rate
        self.hop_size = mel_cfg.hop_size
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device
        self.dtype = dtype
        self.stft = STFT(
            mel_cfg.sampling_rate,
            mel_cfg.num_mels,
            mel_cfg.n_fft,
            mel_cfg.win_size,
            mel_cfg.hop_size,
            mel_cfg.fmin,
            mel_cfg.fmax,
        )
        self.resample_kernel = {}

    def extract_nvstft(self, audio, keyshift=0, train=False):
        """Return ``self.stft.get_mel(...)`` with axes 1 and 2 swapped."""
        spectrogram = self.stft.get_mel(audio, keyshift=keyshift, train=train)
        return spectrogram.transpose(1, 2)

    def extract_mel(self, audio, sample_rate, keyshift=0, train=False):
        """Resample if needed, extract the mel, and fit it to the expected frame count.

        The expected count is ``audio.shape[1] // hop_size + 1``, measured on
        the input before resampling. A shorter mel gets its last frame
        repeated once; a longer one is truncated.
        """
        audio = audio.to(self.dtype).to(self.device)
        if sample_rate != self.sample_rate:
            cache_key = str(sample_rate)
            if cache_key not in self.resample_kernel:
                self.resample_kernel[cache_key] = Resample(
                    sample_rate, self.sample_rate, lowpass_filter_width=128
                )
            self.resample_kernel[cache_key] = (
                self.resample_kernel[cache_key].to(self.dtype).to(self.device)
            )
            audio_res = self.resample_kernel[cache_key](audio)
        else:
            audio_res = audio

        mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train)
        n_frames = int(audio.shape[1] // self.hop_size) + 1
        if n_frames > int(mel.shape[1]):
            mel = torch.cat((mel, mel[:, -1:, :]), 1)
        if n_frames < int(mel.shape[1]):
            mel = mel[:, :n_frames, :]
        return mel

    def __call__(self, audio, sample_rate, keyshift=0, train=False):
        return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train)
| |
| |
class DotDict(dict):
    """Dictionary whose keys can be read, set, and deleted as attributes.

    Reading a missing key yields None. A value whose exact type is ``dict``
    is wrapped in a new ``DotDict`` on access.
    """

    def __getattr__(self, name, *default):
        value = dict.get(self, name, *default)
        if type(value) is dict:
            return DotDict(value)
        return value

    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
| |
| |
class F0Predictor(object):
    """Interface for f0 predictors; both methods are no-ops here."""

    def compute_f0(self, wav, p_len):
        pass

    def compute_f0_uv(self, wav, p_len):
        pass


class FCPEF0Predictor(F0Predictor):
    """f0 predictor backed by ``FCPEInfer``.

    Args:
        model_path: Checkpoint path handed to ``FCPEInfer``.
        hop_length: Hop used to derive the default output length and the
            frame times.
        f0_min: Stored on the instance.
        f0_max: Stored on the instance.
        dtype: Tensor dtype for the input and the model.
        device: Device; defaults to ``"cuda"`` when available, else ``"cpu"``.
        sample_rate: Sample rate of the audio given to ``compute_f0``.
        threshold: Threshold forwarded to the model.
    """

    def __init__(
        self,
        model_path,
        hop_length=512,
        f0_min=50,
        f0_max=1100,
        dtype=torch.float32,
        device=None,
        sample_rate=44100,
        threshold=0.05,
    ):
        self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype)
        self.hop_length = hop_length
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.dtype = dtype
        self.name = "fcpe"

    def repeat_expand(
        self,
        content: Union[torch.Tensor, np.ndarray],
        target_len: int,
        mode: str = "nearest",
    ):
        """Resize the last axis of ``content`` to ``target_len`` by interpolation.

        Accepts 1-D, 2-D or 3-D input (tensor or NumPy array) and returns the
        same kind of object with the same number of axes.
        """
        ndim = content.ndim
        content = (
            content[None, None] if ndim == 1 else content[None] if ndim == 2 else content
        )
        assert content.ndim == 3
        is_np = isinstance(content, np.ndarray)
        content = torch.from_numpy(content) if is_np else content
        results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)
        results = results.numpy() if is_np else results
        return results[0, 0] if ndim == 1 else results[0] if ndim == 2 else results

    def post_process(self, x, sample_rate, f0, pad_to):
        """Resize ``f0`` to ``pad_to`` frames and fill unvoiced frames by interpolation.

        Returns:
            ``(f0, vuv)`` as NumPy arrays of length ``pad_to``; ``vuv`` is 1
            where the resized f0 is positive and 0 elsewhere.
        """
        f0 = (
            torch.from_numpy(f0).float().to(x.device)
            if isinstance(f0, np.ndarray)
            else f0
        )
        f0 = self.repeat_expand(f0, pad_to) if pad_to is not None else f0

        vuv_vector = torch.zeros_like(f0)
        vuv_vector[f0 > 0.0] = 1.0
        vuv_vector[f0 <= 0.0] = 0.0

        # Keep only the non-zero frames and remember their times so the gaps
        # can be interpolated below.
        nzindex = torch.nonzero(f0).squeeze()
        f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()
        time_org = self.hop_length / sample_rate * nzindex.cpu().numpy()
        time_frame = np.arange(pad_to) * self.hop_length / sample_rate

        vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0]

        if f0.shape[0] <= 0:
            return np.zeros(pad_to), vuv_vector.cpu().numpy()
        if f0.shape[0] == 1:
            return np.ones(pad_to) * f0[0], vuv_vector.cpu().numpy()

        f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
        return f0, vuv_vector.cpu().numpy()

    def _predict_raw(self, wav, p_len):
        """Run the model; return the input tensor, the resolved length, and the raw f0."""
        x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
        p_len = x.shape[0] // self.hop_length if p_len is None else p_len
        f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]
        return x, p_len, f0

    def compute_f0(self, wav, p_len=None):
        """Return the f0 curve of ``wav`` as a NumPy array of length ``p_len``.

        ``p_len`` defaults to ``len(wav) // hop_length``. When the model
        reports no pitch at all, an all-zero array is returned.
        """
        x, p_len, f0 = self._predict_raw(wav, p_len)
        if torch.all(f0 == 0):
            # Return a single array here, matching the voiced path.
            return np.zeros(p_len)
        return self.post_process(x, self.sample_rate, f0, p_len)[0]

    def compute_f0_uv(self, wav, p_len=None):
        """Return ``(f0, vuv)`` for ``wav``; both are all-zero when no pitch is found."""
        x, p_len, f0 = self._predict_raw(wav, p_len)
        if torch.all(f0 == 0):
            return np.zeros(p_len), np.zeros(p_len)
        return self.post_process(x, self.sample_rate, f0, p_len)
| |
| ''' |
|
|
# Write the generated module as UTF-8 explicitly so the file content does not
# depend on the platform's default text encoding.
with open(os.sep.join([current_dir, dirs[6], "FCPE.py"]), 'w', encoding="utf-8") as f:
    f.write(FCPE)
| |
|
|
| VBACH_CLI = ''' |
| import gc |
| import os |
| import datetime |
| import gradio as gr |
| import torch |
| import librosa |
| import tempfile |
| from datetime import datetime |
| import argparse |
| from vbach.infer.infer import Config, load_hubert, get_vc, rvc_infer |
| |
| # Константы |
| |
| RVC_MODELS_DIR = os.path.join(os.getcwd(), "voice_models") |
| HUBERT_MODEL_PATH = os.path.join( |
| os.getcwd(), "vbach", "models", "embedders", "hubert_base.pt" |
| ) |
| OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"] |
| |
| audio_extensions = {".mp3", ".wav", ".flac", ".aiff", ".m4a", ".aac", ".ogg", ".opus"} |
| |
| |
| # Важные функции |
| |
def load_rvc_model(voice_model):
    """Locate the weight and index files of a voice model.

    Looks inside ``RVC_MODELS_DIR/<voice_model>`` and picks the first entry
    ending in ``.pth`` and the first ending in ``.index``.

    Returns:
        ``(model_path, index_path)``; ``index_path`` is None when the folder
        has no ``.index`` file.

    Raises:
        ValueError: If the folder contains no ``.pth`` file.
    """
    model_dir = os.path.join(RVC_MODELS_DIR, voice_model)

    rvc_model_path = None
    rvc_index_path = None
    for entry in os.listdir(model_dir):
        if rvc_model_path is None and entry.endswith(".pth"):
            rvc_model_path = os.path.join(model_dir, entry)
        if rvc_index_path is None and entry.endswith(".index"):
            rvc_index_path = os.path.join(model_dir, entry)

    if not rvc_model_path:
        raise ValueError(
            f"\033[91mМодели {voice_model} не существует. "
            "Возможно, вы неправильно ввели имя.\033[0m"
        )

    return rvc_model_path, rvc_index_path
| |
def voice_conversion(
    voice_model,
    vocals_path,
    output_path,
    pitch,
    f0_method,
    index_rate,
    filter_radius,
    volume_envelope,
    protect,
    hop_length,
    f0_min,
    f0_max,
    format_output,
    output_bitrate,
    stereo_mode
):
    """Convert ``vocals_path`` with the named voice model and return the result of ``rvc_infer``.

    The model files are resolved with ``load_rvc_model``, the HuBERT and
    voice models are loaded for the device chosen by ``Config``, and all
    remaining arguments are forwarded to ``rvc_infer``. The loaded models are
    released and the CUDA cache is emptied before returning.
    """
    model_file, index_file = load_rvc_model(voice_model)

    config = Config()
    device, is_half = config.device, config.is_half
    hubert_model = load_hubert(device, is_half, HUBERT_MODEL_PATH)
    cpt, version, net_g, tgt_sr, vc = get_vc(device, is_half, config, model_file)

    converted_audio = rvc_infer(
        index_file,
        index_rate,
        vocals_path,
        output_path,
        pitch,
        f0_method,
        cpt,
        version,
        net_g,
        filter_radius,
        tgt_sr,
        volume_envelope,
        protect,
        hop_length,
        vc,
        hubert_model,
        f0_min,
        f0_max,
        format_output,
        output_bitrate,
        stereo_mode
    )

    # Drop the references to the loaded models before collecting.
    del hubert_model, cpt, net_g, vc
    gc.collect()
    torch.cuda.empty_cache()
    return converted_audio
| |
| def cli_conversion(input_audios, template="NAME_MODEL_F0METHOD_PITCH", output_dir="output", model_name="", index_rate=0, output_format="wav", stereo_mode="mono", method_pitch="rmvpe+", pitch=0, hop_length=128, filter_radius=3, rms=0.25, protect=0.33, f0_min=50, f0_max=1100): |
| if not input_audios: |
| raise ValueError( |
| "Не удалось найти аудиофайл(ы). " |
| "Убедитесь, что файл загрузился или проверьте правильность пути к нему." |
| ) |
| if not model_name: |
| raise ValueError("Выберите модель голоса для преобразования.") |
| if not os.path.exists(input_audios): |
| raise ValueError(f"Файл {input_audios} не найден.") |
| |
| if not os.path.exists(input_audios): |
| raise FileNotFoundError(f"Ошибка: '{input_audios}' не существует.") |
| |
| os.makedirs(output_dir, exist_ok=True) |
| |
| if os.path.isfile(input_audios): |
| # Проверяем, является ли файл аудио |
| ext = os.path.splitext(input_audios)[1].lower() |
| if ext not in audio_extensions: |
| raise ValueError(f"Ошибка: '{input_audios}' не является аудиофайлом (допустимые расширения: {audio_extensions}).") |
| print(f"Найден аудиофайл: {input_audios}") |
| |
| try: |
| file_name = os.path.basename(input_audios) |
| namefile = os.path.splitext(file_name)[0] |
| time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S") |
| output_name = template |
| output_path = os.path.join(output_dir, f"{output_name}.{output_format}") |
| voice_conversion(model_name, input_audios, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode) |
| finally: |
| print("Вокал успешно преобразован") |
| |
| elif os.path.isdir(input_audios): |
| # Ищем аудиофайлы в папке |
| audio_files = [] |
| for file in os.listdir(input_audios): |
| ext = os.path.splitext(file)[1].lower() |
| if ext in audio_extensions: |
| audio_files.append(os.path.join(input_audios, file)) |
| |
| if not audio_files: |
| raise FileNotFoundError(f"Ошибка: в папке '{input_audios}' нет аудиофайлов (допустимые расширения: {audio_extensions}).") |
| |
| print(f"Найдены аудиофайлы: {audio_files}") |
| |
| try: |
| output_paths = [] |
| for file in audio_files: |
| file_name = os.path.basename(file) |
| namefile = os.path.splitext(file_name)[0] |
| time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S") |
| output_name = ( |
| template |
| .replace("DATETIME", time_create_file) |
| .replace("NAME", namefile) |
| .replace("MODEL", model_name) |
| .replace("F0METHOD", method_pitch) |
| .replace("PITCH", f"{pitch}") |
| ) |
| output_path = os.path.join(output_dir, f"{output_name}.{output_format}") |
| voice_conversion(model_name, file, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, 50, 1100, output_format, "320k", stereo_mode) |
| output_paths.append(output_path) |
| finally: |
| print("Вокалы успешно преобразованы") |
| else: |
| raise ValueError(f"Ошибка: '{input_audios}' не является ни файлом, ни папкой.") |
| |
| def setup_args(): |
| parser = argparse.ArgumentParser(description='Vbach CLI') |
| |
| # Обязательные аргументы |
| parser.add_argument( |
| 'input_audios', |
| type=str, |
| help='Путь к аудиофайлу или папке с аудиофайлами для обработки' |
| ) |
| parser.add_argument( |
| 'output_dir', |
| type=str, |
| help='Папка для сохранения результатов конвертации' |
| ) |
| parser.add_argument( |
| 'model_name', |
| type=str, |
| help='Название голосовой модели RVC для преобразования' |
| ) |
| |
| # Необязательные аргументы с значениями по умолчанию |
| parser.add_argument( |
| '--template', |
| type=str, |
| default="NAME_MODEL_F0METHOD_PITCH", |
| help='Шаблон имени выходного файла (доступные замены: DATETIME, NAME, MODEL, F0METHOD, PITCH)' |
| ) |
| parser.add_argument( |
| '--index_rate', |
| type=float, |
| default=0, |
| help='Интенсивность использования индексного файла (от 0.0 до 1.0)', |
| metavar='[0.0-1.0]' |
| ) |
| parser.add_argument( |
| '--output_format', |
| type=str, |
| default="wav", |
| choices=OUTPUT_FORMAT, |
| help='Формат выходного аудиофайла' |
| ) |
| parser.add_argument( |
| '--stereo_mode', |
| type=str, |
| default="mono", |
| choices=["mono", "left/right", "sim/dif"], |
| help='Режим каналов: моно или стерео' |
| ) |
| parser.add_argument( |
| '--method_pitch', |
| type=str, |
| default="rmvpe+", |
| help='Метод извлечения pitch (тона)' |
| ) |
| parser.add_argument( |
| '--pitch', |
| type=int, |
| default=0, |
| help='Корректировка тона в полутонах' |
| ) |
| parser.add_argument( |
| '--hop_length', |
| type=int, |
| default=128, |
| help='Длина hop (в семплах) для обработки' |
| ) |
| parser.add_argument( |
| '--filter_radius', |
| type=int, |
| default=3, |
| help='Радиус фильтра для сглаживания' |
| ) |
| parser.add_argument( |
| '--rms', |
| type=float, |
| default=0.25, |
| help='Масштабирование огибающей громкости (RMS)' |
| ) |
| parser.add_argument( |
| '--protect', |
| type=float, |
| default=0.33, |
| help='Защита для глухих согласных звуков' |
| ) |
| parser.add_argument( |
| '--f0_min', |
| type=int, |
| default=50, |
| help='Минимальная частота pitch (F0) в Hz' |
| ) |
| parser.add_argument( |
| '--f0_max', |
| type=int, |
| default=1100, |
| help='Максимальная частота pitch (F0) в Hz' |
| ) |
| |
| return parser.parse_args() |
| |
| # Пример использования: |
| if __name__ == "__main__": |
| args = setup_args() |
| cli_conversion( |
| input_audios=args.input_audios, |
| output_dir=args.output_dir, |
| model_name=args.model_name, |
| template=args.template, |
| index_rate=args.index_rate, |
| output_format=args.output_format, |
| stereo_mode=args.stereo_mode, |
| method_pitch=args.method_pitch, |
| pitch=args.pitch, |
| hop_length=args.hop_length, |
| filter_radius=args.filter_radius, |
| rms=args.rms, |
| protect=args.protect, |
| f0_min=args.f0_min, |
| f0_max=args.f0_max |
| ) |
| |
| |
| |
| ''' |
|
|
# Write the embedded CLI module to disk. Its source contains non-ASCII
# (Cyrillic) literals and Python reads source files as UTF-8, so the encoding
# is pinned here instead of relying on the platform default.
with open(os.path.join(current_dir, dirs[2], "vbach.py"), "w", encoding="utf-8") as f:
    f.write(VBACH_CLI)
|
|
def set_language(lang):
    """Store ``lang`` in the module-level ``CURRENT_LANG`` variable."""
    globals()["CURRENT_LANG"] = lang
|
|
|
|
def t(key, **kwargs):
    """Look up ``key`` in the translation table of the current language.

    A missing key falls back to the key itself. Dictionary entries are
    returned as they are; string entries are passed through ``str.format``
    only when keyword arguments were supplied.
    """
    table = TRANSLATIONS[CURRENT_LANG]
    entry = table.get(key, key)
    if isinstance(entry, dict) or not kwargs:
        return entry
    return entry.format(**kwargs)
|
|
def download_file(url, zip_name, progress):
    """Download the archive behind ``url`` to ``zip_name``.

    The hosting backend is chosen by looking for a known host name inside
    ``url``.

    Args:
        url: Link to the model archive.
        zip_name: Destination path of the downloaded archive.
        progress: Callable invoked as ``progress(fraction, desc=...)``.

    Raises:
        gr.Error: If the host is not supported (mega.nz included) or the
            download itself fails. The original exception is chained as the
            cause.
    """
    try:
        if "drive.google.com" in url:
            progress(0.5, desc=t('downloading_google'))
            download_from_google_drive(url, zip_name, progress)
        elif "huggingface.co" in url:
            progress(0.5, desc=t('downloading_huggingface'))
            download_from_huggingface(url, zip_name, progress)
        elif "pixeldrain.com" in url:
            progress(0.5, desc=t('downloading_pixeldrain'))
            download_from_pixeldrain(url, zip_name, progress)
        elif "mega.nz" in url:
            # Nothing is downloaded for mega.nz links. Fail here rather than
            # return normally and let the caller try to unpack an archive
            # that was never written.
            raise ValueError(t('mega_unsupported'))
        elif "disk.yandex.ru" in url or "yadi.sk" in url:
            progress(0.5, desc=t('downloading_yandex'))
            download_from_yandex(url, zip_name, progress)
        else:
            raise ValueError(t('unsupported_source', url=url))
    except Exception as e:
        raise gr.Error(t('download_error', error=str(e))) from e
|
|
def download_from_google_drive(url, zip_name, progress):
    """Download a Google Drive file to ``zip_name`` through ``gdown``.

    The file id is taken from a ``.../file/d/<id>/...`` link when that marker
    is present, otherwise from the ``id=<id>`` query parameter. ``progress``
    is accepted but not used.
    """
    if "file/d/" in url:
        after_marker = url.split("file/d/")[1]
        drive_id = after_marker.split("/")[0]
    else:
        after_marker = url.split("id=")[1]
        drive_id = after_marker.split("&")[0]
    gdown.download(id=drive_id, output=str(zip_name), quiet=False)
|
|
def download_from_huggingface(url, zip_name, progress):
    """Retrieve ``url`` into the file ``zip_name``; ``progress`` is not used."""
    urllib.request.urlretrieve(url, filename=zip_name)
|
|
def download_from_pixeldrain(url, zip_name, progress):
    """Download a Pixeldrain file to ``zip_name``.

    The file id is the part of ``url`` after ``pixeldrain.com/u/``.
    ``progress`` is accepted but not used.

    Raises:
        IndexError: If ``url`` does not contain ``pixeldrain.com/u/``.
        requests.HTTPError: If the API answers with an error status.
    """
    file_id = url.split("pixeldrain.com/u/")[1]
    api_url = f"https://pixeldrain.com/api/file/{file_id}"
    with requests.get(api_url, stream=True) as response:
        # An error response must not be saved as if it were the archive.
        response.raise_for_status()
        with open(zip_name, "wb") as f:
            # Stream to disk so the whole payload is never held in memory.
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
|
|
def download_from_yandex(url, zip_name, progress):
    """Download a publicly shared Yandex Disk resource to ``zip_name``.

    The public link is first exchanged for a direct download link through the
    Yandex Disk REST API, then that link is retrieved. ``progress`` is
    accepted but not used.

    Raises:
        gr.Error: If the API does not answer with HTTP 200.
    """
    yandex_api_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download"
    # Pass the public link as a query parameter so that requests
    # percent-encodes it; a link carrying its own "?", "&" or "#" would
    # otherwise corrupt the query string.
    response = requests.get(yandex_api_url, params={"public_key": url})
    if response.status_code != 200:
        raise gr.Error(t('yandex_api_error', status=response.status_code))
    download_link = response.json().get("href")
    urllib.request.urlretrieve(download_link, zip_name)
|
|
def extract_zip(extraction_folder, zip_name):
    """Unpack a model archive and flatten it into ``extraction_folder``.

    The archive is deleted once it has been extracted. The extracted tree is
    then searched for an ``.index`` file larger than 100 KiB and a ``.pth``
    file larger than 40 MiB, and both are handed to ``rename_and_cleanup``.

    If anything fails and the folder was created by this call, the folder is
    removed again so a later attempt under the same name is not blocked by
    leftovers.

    Args:
        extraction_folder: Directory the archive is unpacked into.
        zip_name: Path of the archive to unpack.

    Raises:
        gr.Error: If no sufficiently large ``.pth`` file is found.
    """
    created_here = not os.path.exists(extraction_folder)
    os.makedirs(extraction_folder, exist_ok=True)
    try:
        with zipfile.ZipFile(zip_name, "r") as zip_ref:
            zip_ref.extractall(extraction_folder)
        os.remove(zip_name)

        index_filepath, model_filepath = None, None
        for root, _, files in os.walk(extraction_folder):
            for name in files:
                file_path = os.path.join(root, name)
                if name.endswith(".index") and os.stat(file_path).st_size > 1024 * 100:
                    index_filepath = file_path
                if name.endswith(".pth") and os.stat(file_path).st_size > 1024 * 1024 * 40:
                    model_filepath = file_path

        if not model_filepath:
            raise gr.Error(t('pth_not_found', folder=extraction_folder))

        rename_and_cleanup(extraction_folder, model_filepath, index_filepath)
    except Exception:
        # Never delete a folder that existed before this call.
        if created_here:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise
|
|
def rename_and_cleanup(extraction_folder, model_filepath, index_filepath):
    """Move the model files to the top of ``extraction_folder`` and drop subfolders.

    ``model_filepath`` is always moved; ``index_filepath`` only when it is
    truthy. Afterwards every directory directly inside ``extraction_folder``
    is removed recursively, while plain files are kept.
    """
    def hoist(path):
        # Keep the file name, change only the containing directory.
        os.rename(path, os.path.join(extraction_folder, os.path.basename(path)))

    hoist(model_filepath)
    if index_filepath:
        hoist(index_filepath)

    leftovers = [
        os.path.join(extraction_folder, entry)
        for entry in os.listdir(extraction_folder)
    ]
    for candidate in leftovers:
        if os.path.isdir(candidate):
            shutil.rmtree(candidate)
|
|
def download_from_url(url, dir_name, progress=gr.Progress()):
    """Download a model archive from ``url`` and install it as ``dir_name``.

    The archive is saved as ``<dir_name>.zip`` inside the voice-models
    directory and unpacked into a folder named ``dir_name`` there.

    Returns:
        The translated "model uploaded" message.

    Raises:
        gr.Error: For any failure. This includes the case of an already
            existing model folder, whose error is raised inside the ``try``
            and therefore wrapped like every other exception.
    """
    try:
        progress(0, desc=t('downloading_model', dir_name=dir_name))
        models_root = dirs[0]
        archive_path = os.path.join(models_root, dir_name + ".zip")
        target_dir = os.path.join(current_dir, models_root, dir_name)

        if os.path.exists(target_dir):
            already_there = t('model_exists', dir_name=dir_name)
            raise gr.Error(already_there)

        download_file(url, archive_path, progress)
        progress(0.8, desc=t('unpacking_zip'))
        extract_zip(target_dir, archive_path)
        done_message = t('model_uploaded', dir_name=dir_name)
        return done_message
    except Exception as failure:
        wrapped = t('model_load_error', error=str(failure))
        raise gr.Error(wrapped)
|
|
def upload_zip_file(zip_path, dir_name, progress=gr.Progress()):
    """Install a model from an uploaded zip archive under the name ``dir_name``.

    Args:
        zip_path: Uploaded file object; only its ``name`` attribute is read
            and used as the path of the archive.
        dir_name: Name of the model folder to create.
        progress: Progress callback.

    Returns:
        The translated "model uploaded" message.

    Raises:
        gr.Error: For any failure. This includes the case of an already
            existing model folder, whose error is raised inside the ``try``
            and therefore wrapped like every other exception.
    """
    try:
        target_dir = os.path.join(current_dir, dirs[0], dir_name)
        if os.path.exists(target_dir):
            already_there = t('model_exists', dir_name=dir_name)
            raise gr.Error(already_there)

        archive = zip_path.name
        progress(0.8, desc=t('unpacking_zip'))
        extract_zip(target_dir, archive)
        done_message = t('model_uploaded', dir_name=dir_name)
        return done_message
    except Exception as failure:
        wrapped = t('model_load_error', error=str(failure))
        raise gr.Error(wrapped)
|
|
def upload_separate_files(pth_file, index_file, dir_name, progress=gr.Progress()):
    """Install a model from separately uploaded ``.pth`` and ``.index`` files.

    Args:
        pth_file: Uploaded weights file; its ``name`` attribute is used as
            the source path. Required.
        index_file: Uploaded index file, handled the same way. Optional.
        dir_name: Name of the model folder to create.
        progress: Progress callback (not used by this function).

    Returns:
        The translated "model uploaded" message.

    Raises:
        gr.Error: If the model folder already exists, no ``.pth`` file was
            supplied, or copying fails.
    """
    try:
        extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
        if os.path.exists(extraction_folder):
            raise gr.Error(t('model_exists', dir_name=dir_name))

        # Refuse before anything is created: a folder without a .pth file
        # cannot be used as a model.
        if not pth_file:
            raise gr.Error(t('pth_not_found', folder=extraction_folder))

        os.makedirs(extraction_folder, exist_ok=True)
        try:
            for uploaded in (pth_file, index_file):
                if uploaded:
                    destination = os.path.join(
                        extraction_folder, os.path.basename(uploaded.name)
                    )
                    shutil.copyfile(uploaded.name, destination)
        except Exception:
            # Do not leave a half-copied model behind; it would also block a
            # retry under the same name.
            shutil.rmtree(extraction_folder, ignore_errors=True)
            raise

        return t('model_uploaded', dir_name=dir_name)
    except Exception as e:
        raise gr.Error(t('model_load_error', error=str(e))) from e
|
|
def delete_model_name(dir_name):
    """Delete the model folder ``dir_name`` from the voice-models directory.

    Args:
        dir_name: Name of the model folder to remove.

    Returns:
        The translated "model deleted" message, or the "model not found"
        message when ``dir_name`` is empty or does not name a directory.

    Raises:
        gr.Error: If the folder cannot be removed.
    """
    # An empty name would resolve to the models root itself, and removing
    # that would wipe every installed model.
    if not dir_name:
        return t('model_not_found', dir_name=dir_name)

    model_dir = os.path.join(current_dir, dirs[0], dir_name)
    if not os.path.isdir(model_dir):
        return t('model_not_found', dir_name=dir_name)

    try:
        shutil.rmtree(model_dir)
    except OSError as e:
        raise gr.Error(t('model_delete_error', error=str(e))) from e
    return t('model_deleted', dir_name=dir_name)
|
|
| from vbach.cli.vbach import voice_conversion |
|
|
def _vbach_render_output_name(template, namefile, model_name, method_pitch, pitch):
    """Expand the placeholders of ``template`` into an output file stem."""
    import re  # local import: this helper is the only user

    values = {
        "DATETIME": datetime.now().strftime("%Y%m%d_%H%M%S"),
        "NAME": namefile,
        "MODEL": model_name,
        # Both spellings are accepted: the UI help text advertises F0_METHOD,
        # while the default template uses F0METHOD.
        "F0_METHOD": method_pitch,
        "F0METHOD": method_pitch,
        "PITCH": f"{pitch}",
    }
    # A single left-to-right pass never rescans substituted text, so a file
    # or model name that contains a placeholder word stays intact.
    pattern = re.compile("DATETIME|F0_METHOD|F0METHOD|NAME|MODEL|PITCH")
    return pattern.sub(lambda match: values[match.group(0)], template)


def process_audio(
    input_file: str = None,
    input_list: str = None,
    template: str = "NAME_MODEL_F0METHOD_PITCH",
    model_name: str = "",
    index_rate: float = 0,
    output_format: str = "wav",
    output_bitrate: int = 320,
    stereo_mode: str = "mono",
    method_pitch: str = "rmvpe+",
    pitch: float = 0,
    hop_length: int = 128,
    filter_radius: int = 3,
    rms: float = 0.25,
    protect: float = 0.33,
    f0_min: int = 50,
    f0_max: int = 1100
):
    """Convert one audio file, or a batch of files, with a voice model.

    Exactly one of ``input_file`` / ``input_list`` is expected. Results are
    written into a fresh temporary directory.

    Args:
        input_file: Path of a single audio file (single mode).
        input_list: A list of paths, or the text of a Python list literal
            such as ``"['a.wav', 'b.wav']"`` (batch mode).
        template: Output name pattern. The placeholders DATETIME, NAME,
            MODEL, F0_METHOD / F0METHOD and PITCH are substituted; a template
            without any placeholder is replaced by
            ``DATETIME_Vbach_F0METHOD_PITCH``.
        model_name: Name of the voice model; required.
        output_format: File extension of the result.
        output_bitrate: Bitrate in kbit/s; passed on as ``"<n>k"``.
        index_rate, stereo_mode, method_pitch, pitch, hop_length,
        filter_radius, rms, protect, f0_min, f0_max: Forwarded unchanged to
            ``voice_conversion``.

    Returns:
        Single mode: the value returned by ``voice_conversion``.
        Batch mode: a list of those values for the files that converted.
        ``None`` when the input was rejected with a warning.

    Raises:
        gr.Error: If no audio or no model was given, or the single-file
            conversion fails.
    """
    placeholders = ("NAME", "PITCH", "F0_METHOD", "F0METHOD", "DATETIME", "MODEL")
    if not any(key in template for key in placeholders):
        template = "DATETIME_Vbach_F0METHOD_PITCH"

    # Everything ast.literal_eval is documented to raise on malformed input.
    literal_errors = (ValueError, TypeError, SyntaxError, MemoryError, RecursionError)

    if not input_file and not isinstance(input_list, list):
        if not input_list:
            raise gr.Error(t("error_no_audio"))
        # Batch mode receives the textbox content, i.e. the repr of a list.
        print(input_list)
        try:
            input_list = ast.literal_eval(input_list)
        except literal_errors as e:
            print(e)
            input_list = None
        if isinstance(input_list, tuple):
            input_list = list(input_list)
        if not isinstance(input_list, list) or not all(
            isinstance(path, str) for path in input_list
        ):
            gr.Warning(t("error_strlist_is_not_list"))
            return None

    if input_file is not None:
        print(input_file)
        try:
            parsed = ast.literal_eval(input_file)
        except literal_errors:
            parsed = None  # an ordinary path is not a Python literal
        # A list pasted into the single-file field belongs to batch mode.
        if isinstance(parsed, (list, tuple)):
            gr.Warning(t("error_path_is_list"))
            return None

    output_bitrate = f"{output_bitrate}k"
    if not input_file and not input_list:
        raise gr.Error(t("error_no_audio"))
    if not model_name:
        raise gr.Error(t("error_no_model"))

    def convert(source_path, output_dir):
        # Build the target path for one file and run the conversion.
        namefile = os.path.splitext(os.path.basename(source_path))[0]
        output_name = _vbach_render_output_name(
            template, namefile, model_name, method_pitch, pitch
        )
        output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
        return voice_conversion(
            model_name,
            source_path,
            output_path,
            pitch,
            method_pitch,
            index_rate,
            filter_radius,
            rms,
            protect,
            hop_length,
            f0_min,
            f0_max,
            output_format,
            output_bitrate,
            stereo_mode
        )

    if isinstance(input_file, str) and input_list is None:
        if not os.path.exists(input_file):
            gr.Warning(t("warning_file_not_found", file=input_file))
            return None

        output_dir = tempfile.mkdtemp(prefix="converted_voice_")
        print(output_dir)
        try:
            output_path = convert(input_file, output_dir)
        except Exception as e:
            # Surface the failure instead of reporting success and returning
            # a path that was never written.
            print(e)
            raise gr.Error(str(e)) from e
        print(t("success_single"))
        return output_path

    if input_file is None and isinstance(input_list, list):
        output_dir = tempfile.mkdtemp(prefix="converted_voice_")
        print(output_dir)
        output_paths = []
        progress = gr.Progress()
        total_steps = len(input_list)
        for i, file in enumerate(input_list):
            if not os.path.exists(file):
                gr.Warning(t("warning_file_not_found", file=file))
                continue

            namefile = os.path.splitext(os.path.basename(file))[0]
            progress(
                (i+1, total_steps),
                desc=t("processing", namefile=namefile),
                unit=t("files")
            )
            try:
                converted = convert(file, output_dir)
            except Exception as e:
                # Best effort: one broken file must not abort the batch, and
                # its missing output must not be listed as a result.
                print(e)
                gr.Warning(f"{file}: {e}")
                continue
            output_paths.append(converted)

        if output_paths:
            print(t("success_batch"))
        return output_paths

    return None
|
|
def vbach_plugin_name():
    """Return the plugin's name, the constant string ``VBach``."""
    plugin_title = "VBach"
    return plugin_title
|
|
# Build the plugin's Gradio UI: an inference tab and a model-manager tab.
# `lang` is stored via set_language() first, so every t(...) label below is
# resolved in that language.
# NOTE(review): the original indentation of this block is lost, so the exact
# nesting of the layout contexts cannot be read from the text.
| def vbach_plugin(lang="ru"): |
| set_language(lang) |
|
|
# --- Inference tab: input audio, model and pitch options ---
| with gr.TabItem(t("inference")): |
| with gr.Column(): |
| with gr.Column(scale=3) as input_voice_group: |
# Single-file input group (shown initially) and batch input group (created hidden).
| with gr.Group() as single_voice_file: |
| input_voice = gr.Audio(label=t("select_file"), interactive=True, type="filepath") |
| batch_upload_btn = gr.Button(t("batch_upload")) |
| with gr.Group(visible=False) as batch_voice_file: |
| input_voices = gr.Files(type="filepath", interactive=True, show_label=False) |
| single_upload_btn = gr.Button(t("single_upload")) |
# The textbox mirrors the upload: a single path, or str() of the list of paths.
| input_voice_path = gr.Textbox(label=t("audio_path"), info=t("audio_path_info"), interactive=True) |
| input_voice.upload(fn=(lambda x: gr.update(value=x)), inputs=input_voice, outputs=input_voice_path) |
| input_voices.upload(fn=(lambda x: gr.update(value=str(x))), inputs=input_voices, outputs=input_voice_path) |
| with gr.Column(): |
| with gr.Row(equal_height=True): |
# Model choices are the subdirectories of the voice-models directory; the update button re-scans it.
| model_name = gr.Dropdown(label=t("model_name"), choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))], interactive=True, filterable=False, scale=6) |
| model_update_btn = gr.Button(t("update_button"), variant="primary", scale=3, size="lg") |
| model_update_btn.click(fn=(lambda : gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))])), inputs=None, outputs=model_name) |
| with gr.Row(): |
| method_pitch = gr.Dropdown(label=t("pitch_method"), choices=["mangio-crepe", "rmvpe+", "fcpe"], value="rmvpe+", interactive=True, filterable=False) |
# Created hidden; the method_pitch.change handler near the end shows it only for "mangio-crepe".
| hop_length = gr.Slider(minimum=2, maximum=512, step=1, value=128, label=t("hop_length"), interactive=True, visible=False) |
| with gr.Row(): |
| pitch = gr.Slider(minimum=-48, maximum=48, step=12, value=0, label=t("pitch"), interactive=True) |
| with gr.Row(): |
| f0_min = gr.Slider(minimum=50, maximum=3500, step=1, value=50, label=t("f0_min"), interactive=True) |
| f0_max = gr.Slider(minimum=500, maximum=3500, step=1, value=1100, label=t("f0_max"), interactive=True) |
|
|
# --- Output settings, result widgets and the two convert buttons ---
| with gr.Column(variant="panel"): |
| with gr.Group(): |
| with gr.Row(equal_height=True): |
| with gr.Column(scale=3): |
# t("stereo_modes") is used as a dict here; its keys are the dropdown choices.
| stereo_mode = gr.Dropdown( |
| label=t("audio_processing"), |
| choices=list(t("stereo_modes").keys()), |
| value="mono", |
| interactive=True, |
| filterable=False |
| ) |
| output_format = gr.Dropdown(label=t("output_format"), choices=OUTPUT_FORMAT) |
| output_bitrate = gr.Slider(32, 320, step=1, label=t("bitrate"), value=320, interactive=True) |
| with gr.Column(scale=6) as single_output_group: |
| converted_voice = gr.Audio(label=t("converted_voice"), type="filepath", interactive=False, show_download_button=True, elem_classes="fixed-height") |
| with gr.Column(scale=6, visible=False) as batch_output_group: |
| converted_voices = gr.Files(label=t("converted_voices"), type="filepath", interactive=False, height="100%", elem_classes="fixed-height") |
| convert_btn = gr.Button(t("convert_single"), variant="primary", scale=3) |
| convert_batch_btn = gr.Button(t("convert_batch"), variant="primary", visible=False, scale=3) |
|
|
|
|
# --- Output name template and advanced conversion settings ---
| with gr.Column(): |
| with gr.Tab(t("name_format")): |
| template_info = gr.Markdown(t("name_format_info"), line_breaks=True) |
| template = gr.Text(label=t("name_format"), value="NAME_MODEL_F0METHOD_PITCH", interactive=True) |
| 
| with gr.Tab(t("advanced_settings")): |
| with gr.Row(): |
| with gr.Column(scale=3): |
| filter_radius = gr.Slider(minimum=0, maximum=7, step=1, value=3, label=t("filter_radius"), interactive=True) |
| index_rate = gr.Slider(minimum=0, maximum=1, step=0.01, value=0, label=t("index_rate"), interactive=True) |
| rms = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.25, label=t("rms"), interactive=True) |
| protect = gr.Slider(minimum=0, maximum=0.5, step=0.01, value=0.33, label=t("protect"), interactive=True) |
| 
| 
# --- Model manager tab: install from URL, from zip, from separate files; delete ---
| with gr.TabItem(t("model_manager")): |
# Install from a download link; handled by download_from_url.
| with gr.TabItem(t("download_url")): |
| with gr.Row(): |
| with gr.Column(variant="panel"): |
| gr.HTML(f"<center><h3>{t('download_link')}</h3></center>") |
| model_zip_link = gr.Text(label=t("download_link")) |
| with gr.Group(): |
| zip_model_name = gr.Text( |
| label=t("model_name"), |
| info=t("unique_name"), |
| ) |
| download_btn = gr.Button(t("download_button"), variant="primary") |
|
|
| gr.HTML( |
| f"<h3>{t('supported_sites')}: " |
| "<a href='https://huggingface.co/' target='_blank'>HuggingFace</a>, " |
| "<a href='https://pixeldrain.com/' target='_blank'>Pixeldrain</a>, " |
| "<a href='https://drive.google.com/' target='_blank'>Google Drive</a>, " |
| "<a href='https://disk.yandex.ru/' target='_blank'>Яндекс Диск</a>" |
| "</h3>" |
| ) |
|
|
| dl_output_message = gr.Text(label=t("output_message"), interactive=False) |
| download_btn.click( |
| download_from_url, |
| inputs=[model_zip_link, zip_model_name], |
| outputs=dl_output_message, |
| ) |
|
|
# Install from an uploaded zip archive; handled by upload_zip_file.
| with gr.Tab(t("download_zip")): |
| with gr.Row(): |
| with gr.Column(): |
| zip_file = gr.File( |
| label=t("zip_file"), file_types=[".zip"], file_count="single" |
| ) |
| with gr.Column(variant="panel"): |
| gr.HTML(t("upload_steps")) |
| with gr.Group(): |
| local_model_name = gr.Text( |
| label=t("model_name"), |
| info=t("unique_name"), |
| ) |
| model_upload_button = gr.Button(t("download_button"), variant="primary") |
|
|
| local_upload_output_message = gr.Text(label=t("output_message"), interactive=False) |
| model_upload_button.click( |
| upload_zip_file, |
| inputs=[zip_file, local_model_name], |
| outputs=local_upload_output_message, |
| ) |
|
|
# Install from separately uploaded .pth and .index files; handled by upload_separate_files.
| with gr.TabItem(t("download_files")): |
| with gr.Group(): |
| with gr.Row(): |
| pth_file = gr.File( |
| label=t("pth_file"), file_types=[".pth"], file_count="single" |
| ) |
| index_file = gr.File( |
| label=t("index_file"), file_types=[".index"], file_count="single" |
| ) |
| with gr.Column(variant="panel"): |
| with gr.Group(): |
| separate_model_name = gr.Text( |
| label=t("model_name"), |
| info=t("unique_name"), |
| ) |
| separate_upload_button = gr.Button(t("download_button"), variant="primary") |
|
|
| separate_upload_output_message = gr.Text( |
| label=t("output_message"), interactive=False |
| ) |
| separate_upload_button.click( |
| upload_separate_files, |
| inputs=[pth_file, index_file, separate_model_name], |
| outputs=separate_upload_output_message, |
| ) |
|
|
# Delete an installed model; the dropdown lists the same subdirectories as the inference tab.
| with gr.TabItem(t("delete_model")): |
| with gr.Column(variant="panel"): |
| with gr.Group(): |
| delete_voicemodel_name = gr.Dropdown( |
| label=t("model_name"), |
| info=t("delete_info"), |
| choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))], |
| interactive=True, |
| filterable=False |
| ) |
| refresh_delete_btn = gr.Button(t("refresh_button")) |
| refresh_delete_btn.click(fn=(lambda : gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))])), inputs=None, outputs=delete_voicemodel_name) |
| delete_model_output_message = gr.Text( |
| label=t("output_message"), interactive=False |
| ) |
| delete_model_btn = gr.Button(t("delete_button")) |
| delete_model_btn.click( |
| fn=delete_model_name, |
| inputs=delete_voicemodel_name, |
| outputs=delete_model_output_message |
| ) |
|
|
|
|
# --- Event wiring for the inference tab ---
# The two upload buttons swap the visibility of the single/batch input groups,
# output groups and convert buttons.
# convert_btn passes the textbox as input_file (input_list is a None state);
# convert_batch_btn passes it as input_list (input_file is a None state).
| method_pitch.change(fn=lambda x: gr.update(visible=True if x == "mangio-crepe" else False), inputs=method_pitch, outputs=hop_length) |
| batch_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[single_voice_file, batch_voice_file, single_output_group, batch_output_group, convert_btn, convert_batch_btn]) |
| single_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[batch_voice_file, single_voice_file, batch_output_group, single_output_group, convert_batch_btn, convert_btn]) |
| convert_btn.click(fn=process_audio, inputs=[input_voice_path, gr.State(None), template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voice) |
| convert_batch_btn.click(fn=process_audio, inputs=[gr.State(None), input_voice_path, template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voices) |