import os import sys import requests import urllib.request import zipfile import shutil import gradio as gr import tempfile from datetime import datetime import logging logging.basicConfig(level=logging.WARNING) # Устанавливаем уровень логирования на WARNING или выше OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"] current_dir = os.getcwd() add_requirements = [ "torchcrepe==0.0.23", "praat-parselmouth==0.4.5", "faiss-cpu==1.11", "https://github.com/noblebarkrr/mvsepless/raw/refs/heads/beta/fixed/fairseq-0.12.3-cp312-cp312-linux_x86_64.whl", "local-attention==1.11.1", "tenacity==9.1.2", "gdown" ] CURRENT_LANG = "ru" RMVPE_PATH = os.path.join(current_dir, "vbach", "models", "predictors", "rmvpe.pt") FCPE_PATH = os.path.join(current_dir, "vbach", "models", "predictors", "fcpe.pt") RVC_MODELS_DIR = os.path.join(current_dir, "voice_models") HUBERT_MODEL_PATH = os.path.join( current_dir, "vbach", "models", "embedders", "hubert_base.pt" ) TRANSLATIONS = { "ru": { "app_title": "VBach", "inference": "Инференс", "select_file": "Выберите файл", "audio_path": "Путь к файлу", "audio_path_info": "Здесь можно ввести путь к файлу/список путей к файлам , либо загрузить его/их выше и получить путь к нему/их список", "audio_processing": "Режим обработки аудио", "output_format": "Формат вывода", "name_format": "Шаблон", "name_format_info": """ > Доступные ключи для формата имени вывода: > (изменить формат имени вывода можно здесь) > * **NAME** - Имя входного файла > * **DATETIME** - Дата и время создания результатов > * **MODEL** - Имя голосовой модели > * **F0METHOD** - Метод извлечения тона > * **PITCH** - Высота тона > Пример: > * **Шаблон:** NAME_MODEL_F0METHOD_PITCH > * **Результат:** name_your-model_rmvpe+_12
Используйте ТОЛЬКО указанные ключи (NAME, DATETIME, MODEL, F0METHOD, PITCH) для избежания повреждения файла. НЕ добавляйте дополнительный текст или символы вне этих ключей, либо делайте это с осторожностью.
""", "convert_single": "Конвертировать один", "convert_batch": "Конвертировать несколько", "model_name": "Имя модели", "pitch_method": "Метод извлечения тона", "pitch": "Высота тона", "pitch_step": "Менять только октавы", "hop_length": "Длина шага", "hop_length_info": "Меньшие значения приводят к более длительным преобразованиям, что увеличивает риск появления артефактов в голосе, однако при этом достигается более точная передача тона.", "bitrate": "Битрейт (Кбит/сек)", "f0_min": "Минимальный диапазон тона", "f0_min_info": "Определяет нижнюю границу диапазона тона, который алгоритм будет использовать для определения основной частоты (F0) в аудиосигнале.", "f0_max": "Максимальный диапазон тона", "f0_max_info": "Определяет верхнюю границу диапазона тона, который алгоритм будет использовать для определения основной частоты (F0) в аудиосигнале.", "conversion_settings": "Настройки преобразования", "standart_settings": "Стандартные настройки", "advanced_settings": "Дополнительные настройки", "export_settings": "Настройки экспорта", "filter_radius": "Радиус фильтра", "filter_radius_info": "Если это число больше или равно трем, использование медианной фильтрации по собранным результатам тона может привести к снижению дыхания..", "index_rate": "Влияние индекса", "index_info": "Влияние, оказываемое индексным файлом;\nЧем выше значение, тем больше влияние.\nОднако выбор более низких значений может помочь смягчить артефакты, присутствующие в аудио.", "rms": "Огибающая громкости", "rms_info": "Заменить или смешать с огибающей громкости выходного сигнала.\nЧем ближе значение к 1, тем больше используется огибающая выходного сигнала.", "protect": "Защита согласных", "protect_info": "Защитить согласные и звуки дыхания, чтобы избежать электроакустических разрывов и артефактов.\nМаксимальное значение параметра 0.5 обеспечивает полную защиту.\nУменьшение этого значения может снизить защиту, но уменьшить эффект индексирования.", "model_manager": "Менеджер моделей", "download_url": "Загрузить архив по ссылке", "download_zip": "Загрузить ZIP архивом", "download_files": "Загрузить файлами", "delete_model": "Удалить модель", "download_link": "Ссылка на загрузку модели", "unique_name": "Дайте вашей загружаемой модели уникальное имя, отличное от других голосовых моделей.", "download_button": "Загрузить модель", "download_tab": "Загрузить по ссылке", "download_pth_link": "Ссылка на *.pth файл", "download_index_link": "Ссылка на *.index файл", "download_files_link": "Ссылка на загрузку файлов модели", "download_url_pth": "Загрузить файлы по ссылке", "supported_sites": "Поддерживаемые сайты", "output_message": "Сообщение вывода", "zip_file": "Zip-файл", "upload_steps": "

1. Найдите и скачайте файлы: .pth и необязательный файл .index

2. Закиньте файл(-ы) в ZIP-архив и поместите его в область загрузки

3. Дождитесь полной загрузки ZIP-архива в интерфейс

", "pth_file": "pth-файл", "index_file": "index-файл", "delete_info": "Выберите модель, которую надо удалить", "refresh_button": "Обновить список моделей", "delete_button": "Удалить модель", "batch_upload": "Пакетная загрузка", "single_upload": "Одиночная загрузка", "converted_voice": "Преобразованный вокал", "converted_voices": "Преобразованные вокалы", "update_button": "Обновить", "processing": "Сейчас обрабатывается - {namefile}", "files": "файлов", "error_no_audio": "Не удалось найти аудиофайл(ы). Убедитесь, что файл загрузился или проверьте правильность пути к нему.", "error_no_model": "Выберите модель голоса для преобразования голоса", "warning_file_not_found": "Файл {file} не найден.", "success_single": "Вокал успешно преобразован", "success_batch": "Вокалы успешно преобразованы", "language": "Язык", "stereo_modes": { "mono": "Моно", "left/right": "Левый/Правый", "sim/dif": "Сходство/Различия" }, "stereo_mode_info": "mono - монофоническая обработка аудио, \nleft/right - обработка левого и правого каналов отдельно, \nsim/dif - обработка фантомного центра и стерео-базы, разделенную на левый и правый каналы", 'downloading_google': "[~] Загрузка модели с Google Drive...", 'downloading_huggingface': "[~] Загрузка модели с HuggingFace...", 'downloading_pixeldrain': "[~] Загрузка модели с Pixeldrain...", 'downloading_yandex': "[~] Загрузка модели с Яндекс Диска...", 'downloading_model': "[~] Загрузка голосовой модели {dir_name}...", 'unpacking_zip': "[~] Распаковка zip-файла...", # Уведомления об ошибках 'unsupported_source': "Неподдерживаемый источник: {url}", 'download_error': "Ошибка при скачивании: {error}", 'yandex_api_error': "Ошибка при получении ссылки с Яндекс Диска: {status}", 'pth_not_found': "Не найден файл модели .pth в распакованном zip-файле. Проверьте содержимое в {folder}.", 'model_exists': "Директория голосовой модели {dir_name} уже существует! Выберите другое имя.", 'model_load_error': "Ошибка при загрузке модели: {error}", 'model_delete_error': "Ошибка при удалении модели: {error}", # Статус операции 'mega_unsupported': "Mega не поддерживается!", 'model_uploaded': "[+] Модель {dir_name} успешно загружена!", 'model_deleted': "[-] Модель {dir_name} успешно удалена!", 'model_not_found': "[-] Модели {dir_name} не существует", "error_strlist_is_not_list": "Эта строка не является списком файлов", "error_path_is_list": "Путь к файлу является списком" }, "en": { "app_title": "VBach", "inference": "Inference", "select_file": "Select File", "audio_path": "Audio path", "audio_path_info": "You can enter a file path or a list of file paths here, or upload the file(s) above to obtain their path(s)", "audio_processing": "Audio Processing Mode", "output_format": "Output Format", "name_format": "Template", "name_format_info": """ > Available keys for the output name format: > (you can change the output name format here) > * **NAME** - Name of the input file > * **DATETIME** - Date and time of result creation > * **MODEL** - Name of the voice model > * **F0METHOD** - Pitch extraction method > * **PITCH** - Pitch value > Example: > * **Template:** NAME_MODEL_F0METHOD_PITCH > * **Result:** name_your-model_rmvpe+_12
Use ONLY the specified keys (NAME, DATETIME, MODEL, F0METHOD, PITCH) to avoid file corruption. Do NOT add additional text or characters outside of these keys, or do so with caution.
""", "convert_single": "Convert Single", "convert_batch": "Convert Batch", "model_name": "Model Name", "pitch_method": "Pitch Extraction Method", "pitch": "Pitch", "pitch_step": "Change Only Octaves", "hop_length": "Hop Length", "hop_length_info": "Smaller values lead to longer conversion, which increases the risk of artifacts in the voice, but achieves more accurate tone transmission.", "bitrate": "Bitrate (Kbit/sec)", "f0_min": "F0 Min", "f0_min_info": "Defines the lower limit of the pitch range that the algorithm will use to determine the fundamental frequency (F0) in the audio signal.", "f0_max": "F0 Max", "f0_max_info": "Defines the upper limit of the pitch range that the algorithm will use to determine the fundamental frequency (F0) in the audio signal.", "conversion_settings": "Conversion Settings", "standart_settings": "Standart Settings", "advanced_settings": "Advanced Settings", "export_settings": "Export Settings", "filter_radius": "Filter Radius", "filter_radius_info": "If this number is greater than or equal to three, using median filtering on the collected pitch results may lead to reduced breathiness.", "index_rate": "Index Rate", "index_info": "Controls the influence of the index file on the result;\nThe higher the value, the greater the influence.\nHowever, choosing lower values may help mitigate artifacts present in the audio.", "rms": "RMS Envelope", "rms_info": "Replace or mix with the output signal's RMS envelope.\nThe closer the value is to 1, the more the output signal's envelope is used.", "protect": "Consonant Protection", "protect_info": "Protect consonants and breath sounds to avoid electroacoustic breaks and artifacts.\nA maximum value of 0.5 provides full protection.\nReducing this value may decrease protection but reduce the indexing effect.", "model_manager": "Model Manager", "download_url": "Download ZIP", "download_zip": "Upload ZIP Archive", "download_files": "Upload Files", "delete_model": "Delete Model", "download_link": "Model Download Link", "unique_name": "Give your model a unique name different from other voice models.", "download_button": "Download Model", "download_tab": "Download by URL", "download_pth_link": "*.pth File Link", "download_index_link": "*.index File Link", "download_files_link": "Model Files Download link", "download_url_pth": "Download files", "supported_sites": "Supported Sites", "output_message": "Output Message", "zip_file": "Zip File", "upload_steps": "

1. Find and download files: .pth and optional .index

2. Put file(s) in a ZIP archive and upload it

3. Wait for the ZIP archive to be fully uploaded

", "pth_file": "PTH File", "index_file": "Index File", "delete_info": "Select the model to delete", "refresh_button": "Refresh Model List", "delete_button": "Delete Model", "batch_upload": "Batch Upload", "single_upload": "Single Upload", "converted_voice": "Converted Voice", "converted_voices": "Converted Voices", "update_button": "Refresh", "processing": "Processing - {namefile}", "files": "files", "error_no_audio": "Could not find audio file(s). Make sure the file is uploaded or check the file path.", "error_no_model": "Select a voice model for voice conversion", "warning_file_not_found": "File {file} not found.", "success_single": "Voice successfully converted", "success_batch": "Voices successfully converted", "language": "Language", "stereo_modes": { "mono": "Mono", "left/right": "Left/Right", "sim/dif": "Similarity/Difference" }, "stereo_mode_info": "mono - mono audio processing, \nleft/right - separate processing of left and right channels, \nsim/dif - processing of phantom center and stereo base, split into left and right channels", 'downloading_google': "[~] Downloading model from Google Drive...", 'downloading_huggingface': "[~] Downloading model from HuggingFace...", 'downloading_pixeldrain': "[~] Downloading model from Pixeldrain...", 'downloading_yandex': "[~] Downloading model from Yandex Disk...", 'downloading_model': "[~] Downloading voice model {dir_name}...", 'unpacking_zip': "[~] Unpacking zip file...", # Error messages 'unsupported_source': "Unsupported source: {url}", 'download_error': "Download error: {error}", 'yandex_api_error': "Yandex Disk API error: {status}", 'pth_not_found': "Model .pth file not found in unzipped archive. Check contents in {folder}.", 'model_exists': "Voice model directory {dir_name} already exists! Choose another name.", 'model_load_error': "Error loading model: {error}", 'model_delete_error': "Error deleting model: {error}", # Operation status 'mega_unsupported': "Mega is not supported!", 'model_uploaded': "[+] Model {dir_name} uploaded successfully!", 'model_deleted': "[-] Model {dir_name} deleted successfully!", 'model_not_found': "[-] Model {dir_name} does not exist", "error_strlist_is_not_list": "This string is not a file list", "error_path_is_list": "The file path is a list" } } def set_language(lang): global CURRENT_LANG CURRENT_LANG = lang def t(key, **kwargs): translation = TRANSLATIONS[CURRENT_LANG].get(key, key) if isinstance(translation, dict): return translation return translation.format(**kwargs) if kwargs else translation downloadable_model_paths = [["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/rmvpe.pt", RMVPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/fcpe.pt", FCPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/embedders/hubert_base.pt", HUBERT_MODEL_PATH]] def normalize_path(path): '''Приводит путь к кросс-платформенному формату''' return path.replace('\\', '/') def recreate_structure(base_dir='.'): # Нормализуем базовый путь base_dir = normalize_path(base_dir) # Создаем папки folders = ['vbach', 'vbach/cli', 'vbach/infer', 'vbach/lib', 'vbach/lib/algorithm', 'vbach/lib/predictors', 'vbach/models', 'vbach/models/embedders', 'vbach/models/predictors', 'vbach/utils', 'voice_models'] for folder in folders: # Нормализуем путь и объединяем с базовым folder = normalize_path(folder) path = os.path.join(base_dir, folder) # Заменяем оставшиесе разделители на системные path = os.path.normpath(path) os.makedirs(path, exist_ok=True) print(f"Created directory: {path}") # Создаем файлы files = {'vbach/cli/vbach.py': '\nimport gc\nimport os\nimport datetime\nimport gradio as gr\nimport torch\nimport librosa\nimport tempfile\nfrom datetime import datetime\nimport argparse\nfrom vbach.infer.infer import Config, load_hubert, get_vc, rvc_infer\n\n# Константы\n\nRVC_MODELS_DIR = os.path.join(os.getcwd(), "voice_models")\nHUBERT_MODEL_PATH = os.path.join(\n os.getcwd(), "vbach", "models", "embedders", "hubert_base.pt"\n)\nOUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]\n\naudio_extensions = {".mp3", ".wav", ".flac", ".aiff", ".m4a", ".aac", ".ogg", ".opus"}\n \n\n# Важные функции\n\ndef load_rvc_model(voice_model):\n model_dir = os.path.join(RVC_MODELS_DIR, voice_model)\n model_files = os.listdir(model_dir)\n rvc_model_path = next(\n (os.path.join(model_dir, f) for f in model_files if f.endswith(".pth")), None\n )\n rvc_index_path = next(\n (os.path.join(model_dir, f) for f in model_files if f.endswith(".index")), None\n )\n\n if not rvc_model_path:\n raise ValueError(\n f"\x1b[91mМодели {voice_model} не существует. "\n "Возможно, вы неправильно ввели имя.\x1b[0m"\n )\n\n return rvc_model_path, rvc_index_path\n\ndef voice_conversion(\n voice_model,\n vocals_path,\n output_path,\n pitch,\n f0_method,\n index_rate,\n filter_radius,\n volume_envelope,\n protect,\n hop_length,\n f0_min,\n f0_max,\n format_output,\n output_bitrate,\n stereo_mode\n):\n rvc_model_path, rvc_index_path = load_rvc_model(voice_model)\n\n config = Config()\n hubert_model = load_hubert(config.device, config.is_half, HUBERT_MODEL_PATH)\n cpt, version, net_g, tgt_sr, vc = get_vc(\n config.device, config.is_half, config, rvc_model_path\n )\n\n output_audio = rvc_infer(\n rvc_index_path,\n index_rate,\n vocals_path,\n output_path,\n pitch,\n f0_method,\n cpt,\n version,\n net_g,\n filter_radius,\n tgt_sr,\n volume_envelope,\n protect,\n hop_length,\n vc,\n hubert_model,\n f0_min,\n f0_max,\n format_output,\n output_bitrate,\n stereo_mode\n )\n\n del hubert_model, cpt, net_g, vc\n gc.collect()\n torch.cuda.empty_cache()\n return output_audio\n\ndef cli_conversion(input_audios, template="NAME_MODEL_F0METHOD_PITCH", output_dir="output", model_name="", index_rate=0, output_format="wav", stereo_mode="mono", method_pitch="rmvpe+", pitch=0, hop_length=128, filter_radius=3, rms=0.25, protect=0.33, f0_min=50, f0_max=1100):\n if not input_audios:\n raise ValueError(\n "Не удалось найти аудиофайл(ы). "\n "Убедитесь, что файл загрузился или проверьте правильность пути к нему."\n )\n if not model_name:\n raise ValueError("Выберите модель голоса для преобразования.")\n if not os.path.exists(input_audios):\n raise ValueError(f"Файл {input_audios} не найден.")\n\n if not os.path.exists(input_audios):\n raise FileNotFoundError(f"Ошибка: \'{input_audios}\' не существует.")\n\n os.makedirs(output_dir, exist_ok=True)\n\n if os.path.isfile(input_audios):\n # Проверяем, является ли файл аудио\n ext = os.path.splitext(input_audios)[1].lower()\n if ext not in audio_extensions:\n raise ValueError(f"Ошибка: \'{input_audios}\' не является аудиофайлом (допустимые расширения: {audio_extensions}).")\n print(f"Найден аудиофайл: {input_audios}")\n\n try:\n file_name = os.path.basename(input_audios)\n namefile = os.path.splitext(file_name)[0]\n time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")\n output_name = template\n output_path = os.path.join(output_dir, f"{output_name}.{output_format}")\n voice_conversion(model_name, input_audios, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)\n finally:\n print("Вокал успешно преобразован") \n \n elif os.path.isdir(input_audios):\n # Ищем аудиофайлы в папке\n audio_files = []\n for file in os.listdir(input_audios):\n ext = os.path.splitext(file)[1].lower()\n if ext in audio_extensions:\n audio_files.append(os.path.join(input_audios, file))\n\n if not audio_files:\n raise FileNotFoundError(f"Ошибка: в папке \'{input_audios}\' нет аудиофайлов (допустимые расширения: {audio_extensions}).")\n\n print(f"Найдены аудиофайлы: {audio_files}")\n \n try:\n output_paths = []\n for file in audio_files:\n file_name = os.path.basename(file)\n namefile = os.path.splitext(file_name)[0]\n time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")\n output_name = (\n template\n .replace("DATETIME", time_create_file)\n .replace("NAME", namefile)\n .replace("MODEL", model_name)\n .replace("F0METHOD", method_pitch)\n .replace("PITCH", f"{pitch}")\n )\n output_path = os.path.join(output_dir, f"{output_name}.{output_format}")\n voice_conversion(model_name, file, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, 50, 1100, output_format, "320k", stereo_mode)\n output_paths.append(output_path)\n finally:\n print("Вокалы успешно преобразованы") \n else:\n raise ValueError(f"Ошибка: \'{input_audios}\' не является ни файлом, ни папкой.")\n\ndef setup_args():\n parser = argparse.ArgumentParser(description=\'Vbach CLI\')\n \n # Обязательные аргументы\n parser.add_argument(\n \'input_audios\',\n type=str,\n help=\'Путь к аудиофайлу или папке с аудиофайлами для обработки\'\n )\n parser.add_argument(\n \'output_dir\',\n type=str,\n help=\'Папка для сохранения результатов конвертации\'\n )\n parser.add_argument(\n \'model_name\',\n type=str,\n help=\'Название голосовой модели RVC для преобразования\'\n )\n \n # Необязательные аргументы с значениями по умолчанию\n parser.add_argument(\n \'--template\',\n type=str,\n default="NAME_MODEL_F0METHOD_PITCH",\n help=\'Шаблон имени выходного файла (доступные замены: DATETIME, NAME, MODEL, F0METHOD, PITCH)\'\n )\n parser.add_argument(\n \'--index_rate\',\n type=float,\n default=0,\n help=\'Интенсивность использования индексного файла (от 0.0 до 1.0)\',\n metavar=\'[0.0-1.0]\'\n )\n parser.add_argument(\n \'--output_format\',\n type=str,\n default="wav",\n choices=OUTPUT_FORMAT,\n help=\'Формат выходного аудиофайла\'\n )\n parser.add_argument(\n \'--stereo_mode\',\n type=str,\n default="mono",\n choices=["mono", "left/right", "sim/dif"],\n help=\'Режим каналов: моно или стерео\'\n )\n parser.add_argument(\n \'--method_pitch\',\n type=str,\n default="rmvpe+",\n help=\'Метод извлечения pitch (тона)\'\n )\n parser.add_argument(\n \'--pitch\',\n type=int,\n default=0,\n help=\'Корректировка тона в полутонах\'\n )\n parser.add_argument(\n \'--hop_length\',\n type=int,\n default=128,\n help=\'Длина hop (в семплах) для обработки\'\n )\n parser.add_argument(\n \'--filter_radius\',\n type=int,\n default=3,\n help=\'Радиус фильтра для сглаживания\'\n )\n parser.add_argument(\n \'--rms\',\n type=float,\n default=0.25,\n help=\'Масштабирование огибающей громкости (RMS)\'\n )\n parser.add_argument(\n \'--protect\',\n type=float,\n default=0.33,\n help=\'Защита для глухих согласных звуков\'\n )\n parser.add_argument(\n \'--f0_min\',\n type=int,\n default=50,\n help=\'Минимальная частота pitch (F0) в Hz\'\n )\n parser.add_argument(\n \'--f0_max\',\n type=int,\n default=1100,\n help=\'Максимальная частота pitch (F0) в Hz\'\n )\n \n return parser.parse_args()\n\n# Пример использования:\nif __name__ == "__main__":\n args = setup_args()\n cli_conversion(\n input_audios=args.input_audios,\n output_dir=args.output_dir,\n model_name=args.model_name,\n template=args.template,\n index_rate=args.index_rate,\n output_format=args.output_format,\n stereo_mode=args.stereo_mode,\n method_pitch=args.method_pitch,\n pitch=args.pitch,\n hop_length=args.hop_length,\n filter_radius=args.filter_radius,\n rms=args.rms,\n protect=args.protect,\n f0_min=args.f0_min,\n f0_max=args.f0_max\n )\n\n\n\n', 'vbach/infer/infer.py': '\nimport torch\nimport numpy as np\nimport librosa\nfrom multiprocessing import cpu_count\nfrom fairseq import checkpoint_utils\n\nfrom vbach.lib.algorithm.synthesizers import Synthesizer\nfrom .pipeline import VC\n\nfrom separator.audio_writer import write_audio_file\n\nfrom vbach.utils.remove_center import remove_center\n\ndef overlay_mono_on_stereo(mono_audio, stereo_audio, gain=0.5):\n if mono_audio is None or stereo_audio is None:\n raise ValueError("Input audio arrays cannot be None")\n \n # Ensure float32 for processing\n mono_audio = mono_audio.astype(np.float32)\n stereo_audio = stereo_audio.astype(np.float32)\n\n # Convert mono to stereo if needed\n if mono_audio.ndim == 1:\n mono_audio = np.vstack([mono_audio, mono_audio])\n elif mono_audio.shape[0] == 1:\n mono_audio = np.vstack([mono_audio[0], mono_audio[0]])\n\n if mono_audio.shape[0] != 2 or stereo_audio.shape[0] != 2:\n raise ValueError("Shapes must be (2, N)")\n\n min_len = min(mono_audio.shape[1], stereo_audio.shape[1])\n if min_len == 0:\n raise ValueError("Audio arrays cannot be empty")\n\n mono_audio = mono_audio[:, :min_len]\n stereo_audio = stereo_audio[:, :min_len]\n \n result = stereo_audio + mono_audio * gain\n\n # Normalize to prevent clipping\n max_amp = np.max(np.abs(result))\n if max_amp > 0:\n result /= max_amp\n\n # Convert back to int16 for output (if needed)\n result = (result * 32767).astype(np.int16)\n\n return result\n\ndef load_audio(\n file_path: str,\n target_sr: int,\n stereo_mode: str\n) -> np.ndarray:\n """\n Загружает аудиофайл с помощью librosa, обрабатывает и возвращает аудиосигнал\n \n Параметры:\n file_path: Путь к аудиофайлу\n target_sr: Целевая частота дискретизации\n mono: Преобразовать в моно (по умолчанию True)\n normalize: Нормализовать аудио (по умолчанию False)\n duration: Загрузить только указанную длительность (в секундах)\n offset: Начальное смещение для загрузки (в секундах)\n \n Возвращает:\n Аудиоданные в виде numpy array (моно: (samples,), стерео: (channels, samples))\n \n Исключения:\n RuntimeError: При ошибках загрузки или обработки аудио\n """\n try:\n mid, left, right = None, None, None\n \n if stereo_mode == "mono":\n # Загрузка аудио с помощью librosa\n mid_audio, sr = librosa.load(\n file_path,\n sr=None,\n mono=True\n )\n mid_audio = librosa.resample(\n mid_audio, # Исправлено: было audio\n orig_sr=sr, \n target_sr=target_sr\n )\n mid = mid_audio.flatten()\n \n elif stereo_mode == "left/right" or stereo_mode == "sim/dif":\n # Загрузка аудио с помощью librosa\n stereo_audio, sr = librosa.load(\n file_path,\n sr=None,\n mono=False\n )\n\n if stereo_mode == "left/right":\n left_audio = stereo_audio[0] # Исправлено: было [:, 0]\n right_audio = stereo_audio[1] # Исправлено: было [:, 1]\n left_audio = librosa.resample(\n left_audio, \n orig_sr=sr, \n target_sr=target_sr\n )\n right_audio = librosa.resample(\n right_audio, \n orig_sr=sr, \n target_sr=target_sr\n )\n\n left = left_audio.flatten()\n right = right_audio.flatten()\n\n elif stereo_mode == "sim/dif":\n mid_left, mid_right, dif_left, dif_right = remove_center(input_array=stereo_audio, samplerate=sr)\n mid_audio = (mid_left + mid_right) * 0.5\n\n mid_audio = librosa.resample(\n mid_audio, \n orig_sr=sr, \n target_sr=target_sr\n )\n dif_left = librosa.resample(\n dif_left, \n orig_sr=sr, \n target_sr=target_sr\n )\n dif_right = librosa.resample(\n dif_right, \n orig_sr=sr, \n target_sr=target_sr\n )\n\n mid = mid_audio.flatten()\n left = dif_left.flatten() # Исправлено: было left_audio\n right = dif_right.flatten() # Исправлено: было right_audio\n\n return mid, left, right\n \n except Exception as e:\n raise RuntimeError(f"Ошибка загрузки аудио \'{file_path}\': {str(e)}")\n\nclass Config:\n def __init__(self):\n self.device = self.get_device()\n self.is_half = self.device == "cpu"\n self.n_cpu = cpu_count()\n self.gpu_name = None\n self.gpu_mem = None\n self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()\n\n def get_device(self):\n if torch.cuda.is_available():\n return "cuda"\n elif torch.backends.mps.is_available():\n return "mps"\n else:\n return "cpu"\n\n def device_config(self):\n if torch.cuda.is_available():\n print("Используется устройство CUDA")\n self._configure_gpu()\n elif torch.backends.mps.is_available():\n print("Используется устройство MPS")\n self.device = "mps"\n else:\n print("Используется CPU")\n self.device = "cpu"\n self.is_half = True\n\n x_pad, x_query, x_center, x_max = (\n (3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)\n )\n if self.gpu_mem is not None and self.gpu_mem <= 4:\n x_pad, x_query, x_center, x_max = (1, 5, 30, 32)\n\n return x_pad, x_query, x_center, x_max\n\n def _configure_gpu(self):\n self.gpu_name = torch.cuda.get_device_name(self.device)\n low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]\n if (\n any(gpu in self.gpu_name for gpu in low_end_gpus)\n and "V100" not in self.gpu_name.upper()\n ):\n self.is_half = False\n self.gpu_mem = int(\n torch.cuda.get_device_properties(self.device).total_memory\n / 1024\n / 1024\n / 1024\n + 0.4\n )\n\n# Загрузка модели Hubert\ndef load_hubert(device, is_half, model_path):\n models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(\n [model_path], suffix=""\n )\n hubert = models[0].to(device)\n hubert = hubert.half() if is_half else hubert.float()\n hubert.eval()\n return hubert\n\n# Получение голосового преобразователя\ndef get_vc(device, is_half, config, model_path):\n cpt = torch.load(model_path, map_location="cpu", weights_only=False)\n if "config" not in cpt or "weight" not in cpt:\n raise ValueError(\n f"Некорректный формат для {model_path}. "\n "Используйте голосовую модель, обученную с использованием RVC v2."\n )\n\n tgt_sr = cpt["config"][-1]\n cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]\n pitch_guidance = cpt.get("f0", 1)\n version = cpt.get("version", "v1")\n input_dim = 768 if version == "v2" else 256\n\n net_g = Synthesizer(\n *cpt["config"],\n use_f0=pitch_guidance,\n input_dim=input_dim,\n is_half=is_half,\n )\n\n del net_g.enc_q\n print(net_g.load_state_dict(cpt["weight"], strict=False))\n net_g.eval().to(device)\n net_g = net_g.half() if is_half else net_g.float()\n\n vc = VC(tgt_sr, config)\n return cpt, version, net_g, tgt_sr, vc\n\ndef rvc_infer(\n index_path,\n index_rate,\n input_path,\n output_path,\n pitch,\n f0_method,\n cpt,\n version,\n net_g,\n filter_radius,\n tgt_sr,\n volume_envelope,\n protect,\n hop_length,\n vc,\n hubert_model,\n f0_min=50,\n f0_max=1100,\n format_output="wav",\n output_bitrate="320k",\n stereo_mode="mono"\n):\n\n mid, left, right = load_audio(input_path, 16000, stereo_mode)\n pitch_guidance = cpt.get("f0", 1)\n \n if stereo_mode == "mono":\n if mid is None:\n raise ValueError("Mono audio data is None")\n audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n mid,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n \n elif stereo_mode == "left/right":\n if left is None or right is None:\n raise ValueError("Left or right audio channel is None")\n \n left_audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n left,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n right_audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n right,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n\n # Ensure both channels have the same length\n min_len = min(len(left_audio_opt), len(right_audio_opt))\n if min_len == 0:\n raise ValueError("Processed audio is empty")\n\n left_audio_opt = left_audio_opt[:min_len]\n right_audio_opt = right_audio_opt[:min_len]\n\n audio_opt = np.stack((left_audio_opt, right_audio_opt), axis=0)\n\n elif stereo_mode == "sim/dif":\n if mid is None or left is None or right is None:\n raise ValueError("Mid, left or right audio channel is None")\n \n mid_audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n mid,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n left_audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n left,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n right_audio_opt = vc.pipeline(\n hubert_model,\n net_g,\n 0,\n right,\n input_path,\n pitch,\n f0_method,\n index_path,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n 0,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file=None,\n f0_min=f0_min,\n f0_max=f0_max,\n )\n\n # Ensure all channels have the same length\n min_len = min(len(mid_audio_opt), len(left_audio_opt), len(right_audio_opt))\n if min_len == 0:\n raise ValueError("Processed audio is empty")\n\n mid_audio_opt = mid_audio_opt[:min_len]\n left_audio_opt = left_audio_opt[:min_len]\n right_audio_opt = right_audio_opt[:min_len]\n\n dif_audio_opt = np.stack((left_audio_opt, right_audio_opt), axis=0)\n \n audio_opt = overlay_mono_on_stereo(mid_audio_opt, dif_audio_opt)\n\n write_audio_file(output_path, audio_opt, tgt_sr, format_output, output_bitrate)\n return output_path\n', 'vbach/infer/pipeline.py': '\nimport os\nimport gc\nimport torch\nimport torch.nn.functional as F\nimport torchcrepe\nimport faiss\nimport librosa\nimport numpy as np\nfrom scipy import signal\n\nfrom vbach.lib.predictors.FCPE import FCPEF0Predictor\nfrom vbach.lib.predictors.RMVPE import RMVPE0Predictor\n\nPREDICTORS_DIR = os.path.join(os.getcwd(), "vbach", "models", "predictors")\nRMVPE_DIR = os.path.join(PREDICTORS_DIR, "rmvpe.pt")\nFCPE_DIR = os.path.join(PREDICTORS_DIR, "fcpe.pt")\n\n# Фильтр Баттерворта для высоких частот\nFILTER_ORDER = 5 # Порядок фильтра\nCUTOFF_FREQUENCY = 48 # Частота среза (в Гц)\nSAMPLE_RATE = 16000 # Частота дискретизации (в Гц)\nbh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)\n\n\ninput_audio_path2wav = {}\n\n\n# Класс для обработки аудио\nclass AudioProcessor:\n @staticmethod\n def change_rms(source_audio, source_rate, target_audio, target_rate, rate):\n """\n Изменяет RMS (среднеквадратичное значение) аудио.\n """\n rms1 = librosa.feature.rms(\n y=source_audio,\n frame_length=source_rate // 2 * 2,\n hop_length=source_rate // 2,\n )\n rms2 = librosa.feature.rms(\n y=target_audio,\n frame_length=target_rate // 2 * 2,\n hop_length=target_rate // 2,\n )\n\n rms1 = F.interpolate(\n torch.from_numpy(rms1).float().unsqueeze(0),\n size=target_audio.shape[0],\n mode="linear",\n ).squeeze()\n rms2 = F.interpolate(\n torch.from_numpy(rms2).float().unsqueeze(0),\n size=target_audio.shape[0],\n mode="linear",\n ).squeeze()\n rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6)\n\n adjusted_audio = (\n target_audio * (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy()\n )\n return adjusted_audio\n\n\n# Класс для преобразования голоса\nclass VC:\n def __init__(self, tgt_sr, config):\n """\n Инициализация параметров для преобразования голоса.\n """\n self.x_pad = config.x_pad\n self.x_query = config.x_query\n self.x_center = config.x_center\n self.x_max = config.x_max\n self.is_half = config.is_half\n self.sample_rate = 16000\n self.window = 160\n self.t_pad = self.sample_rate * self.x_pad\n self.t_pad_tgt = tgt_sr * self.x_pad\n self.t_pad2 = self.t_pad * 2\n self.t_query = self.sample_rate * self.x_query\n self.t_center = self.sample_rate * self.x_center\n self.t_max = self.sample_rate * self.x_max\n self.time_step = self.window / self.sample_rate * 1000\n self.device = config.device\n\n def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):\n """\n Получает F0 с использованием модели crepe.\n """\n x = x.astype(np.float32)\n x /= np.quantile(np.abs(x), 0.999)\n audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0)\n if audio.ndim == 2 and audio.shape[0] > 1:\n audio = torch.mean(audio, dim=0, keepdim=True)\n\n pitch = torchcrepe.predict(\n audio,\n self.sample_rate,\n hop_length,\n f0_min,\n f0_max,\n model,\n batch_size=hop_length * 2,\n device=self.device,\n pad=True,\n )\n\n p_len = p_len or x.shape[0] // hop_length\n source = np.array(pitch.squeeze(0).cpu().float().numpy())\n source[source < 0.001] = np.nan\n target = np.interp(\n np.arange(0, len(source) * p_len, len(source)) / p_len,\n np.arange(0, len(source)),\n source,\n )\n f0 = np.nan_to_num(target)\n return f0\n\n def get_f0_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs):\n """\n Получает F0 с использованием модели rmvpe.\n """\n if not hasattr(self, "model_rmvpe"):\n self.model_rmvpe = RMVPE0Predictor(\n RMVPE_DIR, is_half=self.is_half, device=self.device\n )\n f0 = self.model_rmvpe.infer_from_audio_with_pitch(\n x, thred=0.03, f0_min=f0_min, f0_max=f0_max\n )\n return f0\n\n def get_f0(\n self,\n input_audio_path,\n x,\n p_len,\n pitch,\n f0_method,\n filter_radius,\n hop_length,\n inp_f0=None,\n f0_min=50,\n f0_max=1100,\n ):\n """\n Получает F0 с использованием выбранного метода.\n """\n global input_audio_path2wav\n f0_mel_min = 1127 * np.log(1 + f0_min / 700)\n f0_mel_max = 1127 * np.log(1 + f0_max / 700)\n\n if f0_method == "mangio-crepe":\n f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))\n\n elif f0_method == "rmvpe+":\n params = {\n "x": x,\n "p_len": p_len,\n "pitch": pitch,\n "f0_min": f0_min,\n "f0_max": f0_max,\n "time_step": self.time_step,\n "filter_radius": filter_radius,\n "crepe_hop_length": int(hop_length),\n "model": "full",\n }\n f0 = self.get_f0_rmvpe(**params)\n\n elif f0_method == "fcpe":\n self.model_fcpe = FCPEF0Predictor(\n FCPE_DIR,\n f0_min=int(f0_min),\n f0_max=int(f0_max),\n dtype=torch.float32,\n device=self.device,\n sample_rate=self.sample_rate,\n threshold=0.03,\n )\n f0 = self.model_fcpe.compute_f0(x, p_len=p_len)\n del self.model_fcpe\n gc.collect()\n\n f0 *= pow(2, pitch / 12)\n tf0 = self.sample_rate // self.window\n if inp_f0 is not None:\n delta_t = np.round(\n (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1\n ).astype("int16")\n replace_f0 = np.interp(list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1])\n shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]\n f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[:shape]\n\n f0bak = f0.copy()\n f0_mel = 1127 * np.log(1 + f0 / 700)\n f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (\n f0_mel_max - f0_mel_min\n ) + 1\n f0_mel[f0_mel <= 1] = 1\n f0_mel[f0_mel > 255] = 255\n f0_coarse = np.rint(f0_mel).astype(int)\n return f0_coarse, f0bak\n\n def vc(\n self,\n model,\n net_g,\n sid,\n audio0,\n pitch,\n pitchf,\n index,\n big_npy,\n index_rate,\n version,\n protect,\n ):\n """\n Преобразует аудио с использованием модели.\n """\n feats = torch.from_numpy(audio0)\n feats = feats.half() if self.is_half else feats.float()\n if feats.dim() == 2:\n feats = feats.mean(-1)\n assert feats.dim() == 1, feats.dim()\n feats = feats.view(1, -1)\n padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)\n\n inputs = {\n "source": feats.to(self.device),\n "padding_mask": padding_mask,\n "output_layer": 9 if version == "v1" else 12,\n }\n\n with torch.no_grad():\n logits = model.extract_features(**inputs)\n feats = model.final_proj(logits[0]) if version == "v1" else logits[0]\n if protect < 0.5 and pitch is not None and pitchf is not None:\n feats0 = feats.clone()\n if index is not None and big_npy is not None and index_rate != 0:\n npy = feats[0].cpu().numpy()\n npy = npy.astype("float32") if self.is_half else npy\n score, ix = index.search(npy, k=8)\n weight = np.square(1 / score)\n weight /= weight.sum(axis=1, keepdims=True)\n npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)\n npy = npy.astype("float16") if self.is_half else npy\n feats = (\n torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate\n + (1 - index_rate) * feats\n )\n\n feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)\n if protect < 0.5 and pitch is not None and pitchf is not None:\n feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(\n 0, 2, 1\n )\n p_len = audio0.shape[0] // self.window\n if feats.shape[1] < p_len:\n p_len = feats.shape[1]\n if pitch is not None and pitchf is not None:\n pitch = pitch[:, :p_len]\n pitchf = pitchf[:, :p_len]\n\n if protect < 0.5 and pitch is not None and pitchf is not None:\n pitchff = pitchf.clone()\n pitchff[pitchf > 0] = 1\n pitchff[pitchf < 1] = protect\n pitchff = pitchff.unsqueeze(-1)\n feats = feats * pitchff + feats0 * (1 - pitchff)\n feats = feats.to(feats0.dtype)\n p_len = torch.tensor([p_len], device=self.device).long()\n with torch.no_grad():\n if pitch is not None and pitchf is not None:\n audio1 = (\n (net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0])\n .data.cpu()\n .float()\n .numpy()\n )\n else:\n audio1 = (\n (net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()\n )\n del feats, p_len, padding_mask\n if torch.cuda.is_available():\n torch.cuda.empty_cache()\n return audio1\n\n def pipeline(\n self,\n model,\n net_g,\n sid,\n audio,\n input_audio_path,\n pitch,\n f0_method,\n file_index,\n index_rate,\n pitch_guidance,\n filter_radius,\n tgt_sr,\n resample_sr,\n volume_envelope,\n version,\n protect,\n hop_length,\n f0_file,\n f0_min=50,\n f0_max=1100,\n ):\n """\n Основной конвейер для преобразования аудио.\n """\n if (\n file_index is not None\n and file_index != ""\n and os.path.exists(file_index)\n and index_rate != 0\n ):\n try:\n index = faiss.read_index(file_index)\n big_npy = index.reconstruct_n(0, index.ntotal)\n except Exception as e:\n print(f"Произошла ошибка при чтении индекса FAISS: {e}")\n index = big_npy = None\n else:\n index = big_npy = None\n audio = signal.filtfilt(bh, ah, audio)\n audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")\n opt_ts = []\n if audio_pad.shape[0] > self.t_max:\n audio_sum = np.zeros_like(audio)\n for i in range(self.window):\n audio_sum += audio_pad[i : i - self.window]\n for t in range(self.t_center, audio.shape[0], self.t_center):\n opt_ts.append(\n t\n - self.t_query\n + np.where(\n np.abs(audio_sum[t - self.t_query : t + self.t_query])\n == np.abs(audio_sum[t - self.t_query : t + self.t_query]).min()\n )[0][0]\n )\n s = 0\n audio_opt = []\n t = None\n audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")\n p_len = audio_pad.shape[0] // self.window\n inp_f0 = None\n if f0_file and hasattr(f0_file, "name"):\n try:\n with open(f0_file.name, "r") as f:\n lines = f.read().strip("\\n").split("\\n")\n inp_f0 = np.array(\n [[float(i) for i in line.split(",")] for line in lines],\n dtype="float32",\n )\n except Exception as e:\n print(f"Произошла ошибка при чтении файла F0: {e}")\n sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()\n if pitch_guidance:\n pitch, pitchf = self.get_f0(\n input_audio_path,\n audio_pad,\n p_len,\n pitch,\n f0_method,\n filter_radius,\n hop_length,\n inp_f0,\n f0_min,\n f0_max,\n )\n pitch = pitch[:p_len]\n pitchf = pitchf[:p_len]\n if self.device == "mps":\n pitchf = pitchf.astype(np.float32)\n pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()\n pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()\n for t in opt_ts:\n t = t // self.window * self.window\n if pitch_guidance:\n audio_opt.append(\n self.vc(\n model,\n net_g,\n sid,\n audio_pad[s : t + self.t_pad2 + self.window],\n pitch[:, s // self.window : (t + self.t_pad2) // self.window],\n pitchf[:, s // self.window : (t + self.t_pad2) // self.window],\n index,\n big_npy,\n index_rate,\n version,\n protect,\n )[self.t_pad_tgt : -self.t_pad_tgt]\n )\n else:\n audio_opt.append(\n self.vc(\n model,\n net_g,\n sid,\n audio_pad[s : t + self.t_pad2 + self.window],\n None,\n None,\n index,\n big_npy,\n index_rate,\n version,\n protect,\n )[self.t_pad_tgt : -self.t_pad_tgt]\n )\n s = t\n if pitch_guidance:\n audio_opt.append(\n self.vc(\n model,\n net_g,\n sid,\n audio_pad[t:],\n pitch[:, t // self.window :] if t is not None else pitch,\n pitchf[:, t // self.window :] if t is not None else pitchf,\n index,\n big_npy,\n index_rate,\n version,\n protect,\n )[self.t_pad_tgt : -self.t_pad_tgt]\n )\n else:\n audio_opt.append(\n self.vc(\n model,\n net_g,\n sid,\n audio_pad[t:],\n None,\n None,\n index,\n big_npy,\n index_rate,\n version,\n protect,\n )[self.t_pad_tgt : -self.t_pad_tgt]\n )\n\n audio_opt = np.concatenate(audio_opt)\n if volume_envelope != 1:\n audio_opt = AudioProcessor.change_rms(\n audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope\n )\n if resample_sr >= self.sample_rate and tgt_sr != resample_sr:\n audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)\n\n audio_max = np.abs(audio_opt).max() / 0.99\n max_int16 = 32768\n if audio_max > 1:\n max_int16 /= audio_max\n audio_opt = (audio_opt * max_int16).astype(np.int16)\n\n del pitch, pitchf, sid\n if torch.cuda.is_available():\n torch.cuda.empty_cache()\n\n return audio_opt\n', 'vbach/lib/algorithm/attentions.py': '\nimport math\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\n\nfrom .commons import convert_pad_shape\n\n\nclass MultiHeadAttention(nn.Module):\n def __init__(\n self,\n channels,\n out_channels,\n n_heads,\n p_dropout=0.0,\n window_size=None,\n heads_share=True,\n block_length=None,\n proximal_bias=False,\n proximal_init=False,\n ):\n super().__init__()\n assert channels % n_heads == 0\n\n self.channels = channels\n self.out_channels = out_channels\n self.n_heads = n_heads\n self.p_dropout = p_dropout\n self.window_size = window_size\n self.heads_share = heads_share\n self.block_length = block_length\n self.proximal_bias = proximal_bias\n self.proximal_init = proximal_init\n self.attn = None\n\n self.k_channels = channels // n_heads\n self.conv_q = nn.Conv1d(channels, channels, 1)\n self.conv_k = nn.Conv1d(channels, channels, 1)\n self.conv_v = nn.Conv1d(channels, channels, 1)\n self.conv_o = nn.Conv1d(channels, out_channels, 1)\n self.drop = nn.Dropout(p_dropout)\n\n if window_size is not None:\n n_heads_rel = 1 if heads_share else n_heads\n rel_stddev = self.k_channels**-0.5\n self.emb_rel_k = nn.Parameter(\n torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)\n * rel_stddev\n )\n self.emb_rel_v = nn.Parameter(\n torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)\n * rel_stddev\n )\n\n nn.init.xavier_uniform_(self.conv_q.weight)\n nn.init.xavier_uniform_(self.conv_k.weight)\n nn.init.xavier_uniform_(self.conv_v.weight)\n if proximal_init:\n with torch.no_grad():\n self.conv_k.weight.copy_(self.conv_q.weight)\n self.conv_k.bias.copy_(self.conv_q.bias)\n\n def forward(self, x, c, attn_mask=None):\n q = self.conv_q(x)\n k = self.conv_k(c)\n v = self.conv_v(c)\n\n x, self.attn = self.attention(q, k, v, mask=attn_mask)\n\n x = self.conv_o(x)\n return x\n\n def attention(self, query, key, value, mask=None):\n b, d, t_s, t_t = (*key.size(), query.size(2))\n query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)\n key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)\n value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)\n\n scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))\n if self.window_size is not None:\n assert t_s == t_t, "Relative attention is only available for self-attention."\n key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)\n rel_logits = self._matmul_with_relative_keys(\n query / math.sqrt(self.k_channels), key_relative_embeddings\n )\n scores_local = self._relative_position_to_absolute_position(rel_logits)\n scores = scores + scores_local\n if self.proximal_bias:\n assert t_s == t_t, "Proximal bias is only available for self-attention."\n scores = scores + self._attention_bias_proximal(t_s).to(\n device=scores.device, dtype=scores.dtype\n )\n if mask is not None:\n scores = scores.masked_fill(mask == 0, -1e4)\n if self.block_length is not None:\n assert t_s == t_t, "Local attention is only available for self-attention."\n block_mask = (\n torch.ones_like(scores)\n .triu(-self.block_length)\n .tril(self.block_length)\n )\n scores = scores.masked_fill(block_mask == 0, -1e4)\n p_attn = F.softmax(scores, dim=-1)\n p_attn = self.drop(p_attn)\n output = torch.matmul(p_attn, value)\n if self.window_size is not None:\n relative_weights = self._absolute_position_to_relative_position(p_attn)\n value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s)\n output = output + self._matmul_with_relative_values(\n relative_weights, value_relative_embeddings\n )\n output = output.transpose(2, 3).contiguous().view(b, d, t_t)\n return output, p_attn\n\n def _matmul_with_relative_values(self, x, y):\n ret = torch.matmul(x, y.unsqueeze(0))\n return ret\n\n def _matmul_with_relative_keys(self, x, y):\n ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))\n return ret\n\n def _get_relative_embeddings(self, relative_embeddings, length):\n pad_length = max(length - (self.window_size + 1), 0)\n slice_start_position = max((self.window_size + 1) - length, 0)\n slice_end_position = slice_start_position + 2 * length - 1\n if pad_length > 0:\n padded_relative_embeddings = F.pad(\n relative_embeddings,\n convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),\n )\n else:\n padded_relative_embeddings = relative_embeddings\n used_relative_embeddings = padded_relative_embeddings[\n :, slice_start_position:slice_end_position\n ]\n return used_relative_embeddings\n\n def _relative_position_to_absolute_position(self, x):\n batch, heads, length, _ = x.size()\n\n x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))\n\n x_flat = x.view([batch, heads, length * 2 * length])\n x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [0, length - 1]]))\n\n x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[\n :, :, :length, length - 1 :\n ]\n return x_final\n\n def _absolute_position_to_relative_position(self, x):\n batch, heads, length, _ = x.size()\n x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]]))\n x_flat = x.view([batch, heads, length**2 + length * (length - 1)])\n x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [length, 0]]))\n x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]\n return x_final\n\n def _attention_bias_proximal(self, length):\n r = torch.arange(length, dtype=torch.float32)\n diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)\n return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)\n\n\nclass FFN(nn.Module):\n def __init__(\n self,\n in_channels,\n out_channels,\n filter_channels,\n kernel_size,\n p_dropout=0.0,\n activation=None,\n causal=False,\n ):\n super().__init__()\n self.in_channels = in_channels\n self.out_channels = out_channels\n self.filter_channels = filter_channels\n self.kernel_size = kernel_size\n self.p_dropout = p_dropout\n self.activation = activation\n self.causal = causal\n\n if causal:\n self.padding = self._causal_padding\n else:\n self.padding = self._same_padding\n\n self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)\n self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)\n self.drop = nn.Dropout(p_dropout)\n\n def forward(self, x, x_mask):\n x = self.conv_1(self.padding(x * x_mask))\n if self.activation == "gelu":\n x = x * torch.sigmoid(1.702 * x)\n else:\n x = torch.relu(x)\n x = self.drop(x)\n x = self.conv_2(self.padding(x * x_mask))\n return x * x_mask\n\n def _causal_padding(self, x):\n if self.kernel_size == 1:\n return x\n pad_l = self.kernel_size - 1\n pad_r = 0\n padding = [[0, 0], [0, 0], [pad_l, pad_r]]\n x = F.pad(x, convert_pad_shape(padding))\n return x\n\n def _same_padding(self, x):\n if self.kernel_size == 1:\n return x\n pad_l = (self.kernel_size - 1) // 2\n pad_r = self.kernel_size // 2\n padding = [[0, 0], [0, 0], [pad_l, pad_r]]\n x = F.pad(x, convert_pad_shape(padding))\n return x\n\n ', 'vbach/lib/algorithm/commons.py': '\nimport math\nimport torch\nfrom torch.nn import functional as F\nfrom typing import List, Optional\n\n\ndef init_weights(m, mean=0.0, std=0.01):\n classname = m.__class__.__name__\n if classname.find("Conv") != -1:\n m.weight.data.normal_(mean, std)\n\n\ndef get_padding(kernel_size, dilation=1):\n return int((kernel_size * dilation - dilation) / 2)\n\n\ndef convert_pad_shape(pad_shape):\n l = pad_shape[::-1]\n pad_shape = [item for sublist in l for item in sublist]\n return pad_shape\n\n\ndef kl_divergence(m_p, logs_p, m_q, logs_q):\n kl = (logs_q - logs_p) - 0.5\n kl += 0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q)\n return kl\n\n\ndef slice_segments(\n x: torch.Tensor, ids_str: torch.Tensor, segment_size: int = 4, dim: int = 2\n):\n if dim == 2:\n ret = torch.zeros_like(x[:, :segment_size])\n elif dim == 3:\n ret = torch.zeros_like(x[:, :, :segment_size])\n\n for i in range(x.size(0)):\n idx_str = ids_str[i].item()\n idx_end = idx_str + segment_size\n if dim == 2:\n ret[i] = x[i, idx_str:idx_end]\n else:\n ret[i] = x[i, :, idx_str:idx_end]\n\n return ret\n\n\ndef rand_slice_segments(x, x_lengths=None, segment_size=4):\n b, d, t = x.size()\n if x_lengths is None:\n x_lengths = t\n ids_str_max = x_lengths - segment_size + 1\n ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long)\n ret = slice_segments(x, ids_str, segment_size, dim=3)\n return ret, ids_str\n\n\ndef get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4):\n position = torch.arange(length, dtype=torch.float)\n num_timescales = channels // 2\n log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / (\n num_timescales - 1\n )\n inv_timescales = min_timescale * torch.exp(\n torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment\n )\n scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1)\n signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0)\n signal = F.pad(signal, [0, 0, 0, channels % 2])\n signal = signal.view(1, channels, length)\n return signal\n\n\ndef subsequent_mask(length):\n mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0)\n return mask\n\n\n@torch.jit.script\ndef fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):\n n_channels_int = n_channels[0]\n in_act = input_a + input_b\n t_act = torch.tanh(in_act[:, :n_channels_int, :])\n s_act = torch.sigmoid(in_act[:, n_channels_int:, :])\n acts = t_act * s_act\n return acts\n\n\ndef sequence_mask(length: torch.Tensor, max_length: Optional[int] = None):\n if max_length is None:\n max_length = length.max()\n x = torch.arange(max_length, dtype=length.dtype, device=length.device)\n return x.unsqueeze(0) < length.unsqueeze(1)\n\n\ndef clip_grad_value(parameters, clip_value, norm_type=2):\n if isinstance(parameters, torch.Tensor):\n parameters = [parameters]\n parameters = List(filter(lambda p: p.grad is not None, parameters))\n norm_type = float(norm_type)\n if clip_value is not None:\n clip_value = float(clip_value)\n\n total_norm = 0\n for p in parameters:\n param_norm = p.grad.data.norm(norm_type)\n total_norm += param_norm.item() ** norm_type\n if clip_value is not None:\n p.grad.data.clamp_(min=-clip_value, max=clip_value)\n total_norm = total_norm ** (1.0 / norm_type)\n return total_norm\n\n ', 'vbach/lib/algorithm/discriminators.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\nfrom torch.nn.utils.parametrizations import spectral_norm, weight_norm\n\nfrom .commons import get_padding\nfrom .residuals import LRELU_SLOPE\n\n\nPERIODS_V1 = [2, 3, 5, 7, 11, 17]\nPERIODS_V2 = [2, 3, 5, 7, 11, 17, 23, 37]\nIN_CHANNELS = [1, 32, 128, 512, 1024]\nOUT_CHANNELS = [32, 128, 512, 1024, 1024]\n\n\nclass MultiPeriodDiscriminator(nn.Module):\n def __init__(self, use_spectral_norm=False):\n super(MultiPeriodDiscriminator, self).__init__()\n self.discriminators = nn.ModuleList(\n [DiscriminatorS(use_spectral_norm=use_spectral_norm)]\n + [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V1]\n )\n\n def forward(self, y, y_hat):\n y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []\n for d in self.discriminators:\n y_d_r, fmap_r = d(y)\n y_d_g, fmap_g = d(y_hat)\n y_d_rs.append(y_d_r)\n y_d_gs.append(y_d_g)\n fmap_rs.append(fmap_r)\n fmap_gs.append(fmap_g)\n\n return y_d_rs, y_d_gs, fmap_rs, fmap_gs\n\n\nclass MultiPeriodDiscriminatorV2(nn.Module):\n def __init__(self, use_spectral_norm=False):\n super(MultiPeriodDiscriminatorV2, self).__init__()\n self.discriminators = nn.ModuleList(\n [DiscriminatorS(use_spectral_norm=use_spectral_norm)]\n + [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V2]\n )\n\n def forward(self, y, y_hat):\n y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []\n for d in self.discriminators:\n y_d_r, fmap_r = d(y)\n y_d_g, fmap_g = d(y_hat)\n y_d_rs.append(y_d_r)\n y_d_gs.append(y_d_g)\n fmap_rs.append(fmap_r)\n fmap_gs.append(fmap_g)\n\n return y_d_rs, y_d_gs, fmap_rs, fmap_gs\n\n\nclass DiscriminatorS(nn.Module):\n def __init__(self, use_spectral_norm=False):\n super(DiscriminatorS, self).__init__()\n norm_f = spectral_norm if use_spectral_norm else weight_norm\n self.convs = nn.ModuleList(\n [\n norm_f(nn.Conv1d(1, 16, 15, 1, padding=7)),\n norm_f(nn.Conv1d(16, 64, 41, 4, groups=4, padding=20)),\n norm_f(nn.Conv1d(64, 256, 41, 4, groups=16, padding=20)),\n norm_f(nn.Conv1d(256, 1024, 41, 4, groups=64, padding=20)),\n norm_f(nn.Conv1d(1024, 1024, 41, 4, groups=256, padding=20)),\n norm_f(nn.Conv1d(1024, 1024, 5, 1, padding=2)),\n ]\n )\n self.conv_post = norm_f(nn.Conv1d(1024, 1, 3, 1, padding=1))\n self.lrelu = nn.LeakyReLU(LRELU_SLOPE)\n\n def forward(self, x):\n fmap = []\n for conv in self.convs:\n x = self.lrelu(conv(x))\n fmap.append(x)\n x = self.conv_post(x)\n fmap.append(x)\n x = torch.flatten(x, 1, -1)\n return x, fmap\n\n\nclass DiscriminatorP(nn.Module):\n def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):\n super(DiscriminatorP, self).__init__()\n self.period = period\n norm_f = spectral_norm if use_spectral_norm else weight_norm\n\n self.convs = nn.ModuleList(\n [\n norm_f(\n nn.Conv2d(\n in_ch,\n out_ch,\n (kernel_size, 1),\n (stride, 1),\n padding=(get_padding(kernel_size, 1), 0),\n )\n )\n for in_ch, out_ch in zip(IN_CHANNELS, OUT_CHANNELS)\n ]\n )\n\n self.conv_post = norm_f(nn.Conv2d(1024, 1, (3, 1), 1, padding=(1, 0)))\n self.lrelu = nn.LeakyReLU(LRELU_SLOPE)\n\n def forward(self, x):\n fmap = []\n b, c, t = x.shape\n if t % self.period != 0:\n n_pad = self.period - (t % self.period)\n x = F.pad(x, (0, n_pad), "reflect")\n x = x.view(b, c, -1, self.period)\n\n for conv in self.convs:\n x = self.lrelu(conv(x))\n fmap.append(x)\n\n x = self.conv_post(x)\n fmap.append(x)\n x = torch.flatten(x, 1, -1)\n return x, fmap\n\n ', 'vbach/lib/algorithm/encoders.py': '\nimport math\nimport torch\nfrom torch import nn\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom typing import Optional\n\nfrom .attentions import FFN, MultiHeadAttention\nfrom .commons import sequence_mask\nfrom .modules import WaveNet\nfrom .normalization import LayerNorm\n\n\nclass Encoder(nn.Module):\n def __init__(\n self,\n hidden_channels,\n filter_channels,\n n_heads,\n n_layers,\n kernel_size=1,\n p_dropout=0.0,\n window_size=10,\n **kwargs\n ):\n super().__init__()\n self.hidden_channels = hidden_channels\n self.filter_channels = filter_channels\n self.n_heads = n_heads\n self.n_layers = n_layers\n self.kernel_size = kernel_size\n self.p_dropout = p_dropout\n self.window_size = window_size\n\n self.drop = nn.Dropout(p_dropout)\n self.attn_layers = nn.ModuleList()\n self.norm_layers_1 = nn.ModuleList()\n self.ffn_layers = nn.ModuleList()\n self.norm_layers_2 = nn.ModuleList()\n for i in range(self.n_layers):\n self.attn_layers.append(\n MultiHeadAttention(\n hidden_channels,\n hidden_channels,\n n_heads,\n p_dropout=p_dropout,\n window_size=window_size,\n )\n )\n self.norm_layers_1.append(LayerNorm(hidden_channels))\n self.ffn_layers.append(\n FFN(\n hidden_channels,\n hidden_channels,\n filter_channels,\n kernel_size,\n p_dropout=p_dropout,\n )\n )\n self.norm_layers_2.append(LayerNorm(hidden_channels))\n\n def forward(self, x, x_mask):\n attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)\n x = x * x_mask\n for i in range(self.n_layers):\n y = self.attn_layers[i](x, x, attn_mask)\n y = self.drop(y)\n x = self.norm_layers_1[i](x + y)\n\n y = self.ffn_layers[i](x, x_mask)\n y = self.drop(y)\n x = self.norm_layers_2[i](x + y)\n x = x * x_mask\n return x\n\n\nclass TextEncoder(nn.Module):\n def __init__(\n self,\n out_channels,\n hidden_channels,\n filter_channels,\n n_heads,\n n_layers,\n kernel_size,\n p_dropout,\n embedding_dim,\n f0=True,\n ):\n super(TextEncoder, self).__init__()\n self.out_channels = out_channels\n self.hidden_channels = hidden_channels\n self.filter_channels = filter_channels\n self.n_heads = n_heads\n self.n_layers = n_layers\n self.kernel_size = kernel_size\n self.p_dropout = float(p_dropout)\n self.emb_phone = nn.Linear(embedding_dim, hidden_channels)\n self.lrelu = nn.LeakyReLU(0.1, inplace=True)\n if f0:\n self.emb_pitch = nn.Embedding(256, hidden_channels)\n self.encoder = Encoder(\n hidden_channels,\n filter_channels,\n n_heads,\n n_layers,\n kernel_size,\n float(p_dropout),\n )\n self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)\n\n def forward(\n self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor\n ):\n if pitch is None:\n x = self.emb_phone(phone)\n else:\n x = self.emb_phone(phone) + self.emb_pitch(pitch)\n x = x * math.sqrt(self.hidden_channels)\n x = self.lrelu(x)\n x = torch.transpose(x, 1, -1)\n x_mask = torch.unsqueeze(sequence_mask(lengths, x.size(2)), 1).to(x.dtype)\n x = self.encoder(x * x_mask, x_mask)\n stats = self.proj(x) * x_mask\n\n m, logs = torch.split(stats, self.out_channels, dim=1)\n return m, logs, x_mask\n\n\nclass PosteriorEncoder(nn.Module):\n def __init__(\n self,\n in_channels,\n out_channels,\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n gin_channels=0,\n ):\n super(PosteriorEncoder, self).__init__()\n self.in_channels = in_channels\n self.out_channels = out_channels\n self.hidden_channels = hidden_channels\n self.kernel_size = kernel_size\n self.dilation_rate = dilation_rate\n self.n_layers = n_layers\n self.gin_channels = gin_channels\n\n self.pre = nn.Conv1d(in_channels, hidden_channels, 1)\n self.enc = WaveNet(\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n gin_channels=gin_channels,\n )\n self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)\n\n def forward(\n self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None\n ):\n x_mask = torch.unsqueeze(sequence_mask(x_lengths, x.size(2)), 1).to(x.dtype)\n x = self.pre(x) * x_mask\n x = self.enc(x, x_mask, g=g)\n stats = self.proj(x) * x_mask\n m, logs = torch.split(stats, self.out_channels, dim=1)\n z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask\n return z, m, logs, x_mask\n\n def remove_weight_norm(self):\n self.enc.remove_weight_norm()\n\n def __prepare_scriptable__(self):\n for hook in self.enc._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(self.enc)\n return self\n\n ', 'vbach/lib/algorithm/generators.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom torch.nn.utils.parametrizations import weight_norm\nfrom typing import Optional\n\nfrom .commons import init_weights\nfrom .residuals import LRELU_SLOPE, ResBlock1, ResBlock2\n\n\nclass Generator(nn.Module):\n def __init__(\n self,\n initial_channel,\n resblock,\n resblock_kernel_sizes,\n resblock_dilation_sizes,\n upsample_rates,\n upsample_initial_channel,\n upsample_kernel_sizes,\n gin_channels=0,\n ):\n super(Generator, self).__init__()\n self.num_kernels = len(resblock_kernel_sizes)\n self.num_upsamples = len(upsample_rates)\n self.conv_pre = nn.Conv1d(\n initial_channel, upsample_initial_channel, 7, 1, padding=3\n )\n resblock = ResBlock1 if resblock == "1" else ResBlock2\n\n self.ups_and_resblocks = nn.ModuleList()\n for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):\n self.ups_and_resblocks.append(\n weight_norm(\n nn.ConvTranspose1d(\n upsample_initial_channel // (2**i),\n upsample_initial_channel // (2 ** (i + 1)),\n k,\n u,\n padding=(k - u) // 2,\n )\n )\n )\n ch = upsample_initial_channel // (2 ** (i + 1))\n for j, (k, d) in enumerate(\n zip(resblock_kernel_sizes, resblock_dilation_sizes)\n ):\n self.ups_and_resblocks.append(resblock(ch, k, d))\n\n self.conv_post = nn.Conv1d(ch, 1, 7, 1, padding=3, bias=False)\n self.ups_and_resblocks.apply(init_weights)\n\n if gin_channels != 0:\n self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)\n\n def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None):\n x = self.conv_pre(x)\n if g is not None:\n x = x + self.cond(g)\n\n resblock_idx = 0\n for _ in range(self.num_upsamples):\n x = F.leaky_relu(x, LRELU_SLOPE)\n x = self.ups_and_resblocks[resblock_idx](x)\n resblock_idx += 1\n xs = 0\n for _ in range(self.num_kernels):\n xs += self.ups_and_resblocks[resblock_idx](x)\n resblock_idx += 1\n x = xs / self.num_kernels\n\n x = F.leaky_relu(x)\n x = self.conv_post(x)\n x = torch.tanh(x)\n\n return x\n\n def __prepare_scriptable__(self):\n for l in self.ups_and_resblocks:\n for hook in l._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(l)\n return self\n\n def remove_weight_norm(self):\n for l in self.ups_and_resblocks:\n remove_weight_norm(l)\n\n\nclass SineGen(nn.Module):\n def __init__(\n self,\n samp_rate,\n harmonic_num=0,\n sine_amp=0.1,\n noise_std=0.003,\n voiced_threshold=0,\n flag_for_pulse=False,\n ):\n super(SineGen, self).__init__()\n self.sine_amp = sine_amp\n self.noise_std = noise_std\n self.harmonic_num = harmonic_num\n self.dim = self.harmonic_num + 1\n self.sample_rate = samp_rate\n self.voiced_threshold = voiced_threshold\n\n def _f02uv(self, f0):\n uv = torch.ones_like(f0)\n uv = uv * (f0 > self.voiced_threshold)\n return uv\n\n def forward(self, f0: torch.Tensor, upp: int):\n with torch.no_grad():\n f0 = f0[:, None].transpose(1, 2)\n f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)\n f0_buf[:, :, 0] = f0[:, :, 0]\n f0_buf[:, :, 1:] = (\n f0_buf[:, :, 0:1]\n * torch.arange(2, self.harmonic_num + 2, device=f0.device)[None, None, :]\n )\n rad_values = (f0_buf / float(self.sample_rate)) % 1\n rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device)\n rand_ini[:, 0] = 0\n rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini\n tmp_over_one = torch.cumsum(rad_values, 1)\n tmp_over_one *= upp\n tmp_over_one = F.interpolate(\n tmp_over_one.transpose(2, 1),\n scale_factor=float(upp),\n mode="linear",\n align_corners=True,\n ).transpose(2, 1)\n rad_values = F.interpolate(\n rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest"\n ).transpose(2, 1)\n tmp_over_one %= 1\n tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0\n cumsum_shift = torch.zeros_like(rad_values)\n cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0\n sine_waves = torch.sin(\n torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi\n )\n sine_waves = sine_waves * self.sine_amp\n uv = self._f02uv(f0)\n uv = F.interpolate(\n uv.transpose(2, 1), scale_factor=float(upp), mode="nearest"\n ).transpose(2, 1)\n noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3\n noise = noise_amp * torch.randn_like(sine_waves)\n sine_waves = sine_waves * uv + noise\n return sine_waves, uv, noise\n\n ', 'vbach/lib/algorithm/modules.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom torch.nn.utils.parametrizations import weight_norm\n\nfrom .commons import fused_add_tanh_sigmoid_multiply\n\n\nclass WaveNet(nn.Module):\n def __init__(\n self,\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n gin_channels=0,\n p_dropout=0,\n ):\n super(WaveNet, self).__init__()\n assert kernel_size % 2 == 1\n self.hidden_channels = hidden_channels\n self.kernel_size = (kernel_size,)\n self.dilation_rate = dilation_rate\n self.n_layers = n_layers\n self.gin_channels = gin_channels\n self.p_dropout = p_dropout\n\n self.in_layers = nn.ModuleList()\n self.res_skip_layers = nn.ModuleList()\n self.drop = nn.Dropout(p_dropout)\n\n if gin_channels != 0:\n cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1)\n self.cond_layer = weight_norm(cond_layer, name="weight")\n\n dilations = [dilation_rate**i for i in range(n_layers)]\n paddings = [(kernel_size * d - d) // 2 for d in dilations]\n\n for i in range(n_layers):\n in_layer = nn.Conv1d(\n hidden_channels,\n 2 * hidden_channels,\n kernel_size,\n dilation=dilations[i],\n padding=paddings[i],\n )\n in_layer = weight_norm(in_layer, name="weight")\n self.in_layers.append(in_layer)\n\n res_skip_channels = (\n hidden_channels if i == n_layers - 1 else 2 * hidden_channels\n )\n\n res_skip_layer = nn.Conv1d(hidden_channels, res_skip_channels, 1)\n res_skip_layer = weight_norm(res_skip_layer, name="weight")\n self.res_skip_layers.append(res_skip_layer)\n\n def forward(self, x, x_mask, g=None, **kwargs):\n output = torch.zeros_like(x)\n n_channels_tensor = torch.IntTensor([self.hidden_channels])\n\n if g is not None:\n g = self.cond_layer(g)\n\n for i in range(self.n_layers):\n x_in = self.in_layers[i](x)\n if g is not None:\n cond_offset = i * 2 * self.hidden_channels\n g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]\n else:\n g_l = torch.zeros_like(x_in)\n\n acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)\n\n acts = self.drop(acts)\n\n res_skip_acts = self.res_skip_layers[i](acts)\n if i < self.n_layers - 1:\n res_acts = res_skip_acts[:, : self.hidden_channels, :]\n x = (x + res_acts) * x_mask\n output = output + res_skip_acts[:, self.hidden_channels :, :]\n else:\n output = output + res_skip_acts\n return output * x_mask\n\n def remove_weight_norm(self):\n if self.gin_channels != 0:\n remove_weight_norm(self.cond_layer)\n for l in self.in_layers:\n remove_weight_norm(l)\n for l in self.res_skip_layers:\n remove_weight_norm(l)\n\n ', 'vbach/lib/algorithm/normalization.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\n\n\nclass LayerNorm(nn.Module):\n def __init__(self, channels, eps=1e-5):\n super().__init__()\n self.eps = eps\n self.gamma = nn.Parameter(torch.ones(channels))\n self.beta = nn.Parameter(torch.zeros(channels))\n\n def forward(self, x):\n x = x.transpose(1, -1)\n x = F.layer_norm(x, (x.size(-1),), self.gamma, self.beta, self.eps)\n return x.transpose(1, -1)\n ', 'vbach/lib/algorithm/nsf.py': '\nimport math\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom torch.nn.utils.parametrizations import weight_norm\nfrom typing import Optional\n\nfrom .commons import init_weights\nfrom .generators import SineGen\nfrom .residuals import LRELU_SLOPE, ResBlock1, ResBlock2\n\n\nclass SourceModuleHnNSF(nn.Module):\n def __init__(\n self,\n sample_rate,\n harmonic_num=0,\n sine_amp=0.1,\n add_noise_std=0.003,\n voiced_threshod=0,\n is_half=True,\n ):\n super(SourceModuleHnNSF, self).__init__()\n\n self.sine_amp = sine_amp\n self.noise_std = add_noise_std\n self.is_half = is_half\n\n self.l_sin_gen = SineGen(\n sample_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod\n )\n self.l_linear = nn.Linear(harmonic_num + 1, 1)\n self.l_tanh = nn.Tanh()\n\n def forward(self, x: torch.Tensor, upsample_factor: int = 1):\n sine_wavs, uv, _ = self.l_sin_gen(x, upsample_factor)\n sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype)\n sine_merge = self.l_tanh(self.l_linear(sine_wavs))\n return sine_merge, None, None\n\n\nclass GeneratorNSF(nn.Module):\n def __init__(\n self,\n initial_channel,\n resblock,\n resblock_kernel_sizes,\n resblock_dilation_sizes,\n upsample_rates,\n upsample_initial_channel,\n upsample_kernel_sizes,\n gin_channels,\n sr,\n is_half=False,\n ):\n super(GeneratorNSF, self).__init__()\n\n self.num_kernels = len(resblock_kernel_sizes)\n self.num_upsamples = len(upsample_rates)\n self.f0_upsamp = nn.Upsample(scale_factor=math.prod(upsample_rates))\n self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0, is_half=is_half)\n\n self.conv_pre = nn.Conv1d(\n initial_channel, upsample_initial_channel, 7, 1, padding=3\n )\n resblock_cls = ResBlock1 if resblock == "1" else ResBlock2\n\n self.ups = nn.ModuleList()\n self.noise_convs = nn.ModuleList()\n\n channels = [\n upsample_initial_channel // (2 ** (i + 1)) for i in range(len(upsample_rates))\n ]\n stride_f0s = [\n math.prod(upsample_rates[i + 1 :]) if i + 1 < len(upsample_rates) else 1\n for i in range(len(upsample_rates))\n ]\n\n for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):\n self.ups.append(\n weight_norm(\n nn.ConvTranspose1d(\n upsample_initial_channel // (2**i),\n channels[i],\n k,\n u,\n padding=(k - u) // 2,\n )\n )\n )\n\n self.noise_convs.append(\n nn.Conv1d(\n 1,\n channels[i],\n kernel_size=(stride_f0s[i] * 2 if stride_f0s[i] > 1 else 1),\n stride=stride_f0s[i],\n padding=(stride_f0s[i] // 2 if stride_f0s[i] > 1 else 0),\n )\n )\n\n self.resblocks = nn.ModuleList(\n [\n resblock_cls(channels[i], k, d)\n for i in range(len(self.ups))\n for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes)\n ]\n )\n\n self.conv_post = nn.Conv1d(channels[-1], 1, 7, 1, padding=3, bias=False)\n self.ups.apply(init_weights)\n\n if gin_channels != 0:\n self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)\n\n self.upp = math.prod(upsample_rates)\n self.lrelu_slope = LRELU_SLOPE\n\n def forward(self, x, f0, g: Optional[torch.Tensor] = None):\n har_source, _, _ = self.m_source(f0, self.upp)\n har_source = har_source.transpose(1, 2)\n x = self.conv_pre(x)\n\n if g is not None:\n x = x + self.cond(g)\n\n for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)):\n x = F.leaky_relu(x, self.lrelu_slope)\n x = ups(x)\n x = x + noise_convs(har_source)\n\n xs = sum(\n [\n resblock(x)\n for j, resblock in enumerate(self.resblocks)\n if j in range(i * self.num_kernels, (i + 1) * self.num_kernels)\n ]\n )\n x = xs / self.num_kernels\n\n x = F.leaky_relu(x)\n x = torch.tanh(self.conv_post(x))\n return x\n\n def remove_weight_norm(self):\n for l in self.ups:\n remove_weight_norm(l)\n for l in self.resblocks:\n l.remove_weight_norm()\n\n def __prepare_scriptable__(self):\n for l in self.ups:\n for hook in l._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(l)\n for l in self.resblocks:\n for hook in l._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(l)\n return self\n\n ', 'vbach/lib/algorithm/residuals.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn import functional as F\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom torch.nn.utils.parametrizations import weight_norm\nfrom typing import Optional\n\nfrom .commons import get_padding, init_weights\nfrom .modules import WaveNet\n\n\nLRELU_SLOPE = 0.1\n\n\ndef create_conv1d_layer(channels, kernel_size, dilation):\n return weight_norm(\n nn.Conv1d(\n channels,\n channels,\n kernel_size,\n 1,\n dilation=dilation,\n padding=get_padding(kernel_size, dilation),\n )\n )\n\n\ndef apply_mask(tensor, mask):\n return tensor * mask if mask is not None else tensor\n\n\nclass ResBlockBase(nn.Module):\n def __init__(self, channels, kernel_size, dilations):\n super(ResBlockBase, self).__init__()\n self.convs1 = nn.ModuleList(\n [create_conv1d_layer(channels, kernel_size, d) for d in dilations]\n )\n self.convs1.apply(init_weights)\n\n self.convs2 = nn.ModuleList(\n [create_conv1d_layer(channels, kernel_size, 1) for _ in dilations]\n )\n self.convs2.apply(init_weights)\n\n def forward(self, x, x_mask=None):\n for c1, c2 in zip(self.convs1, self.convs2):\n xt = F.leaky_relu(x, LRELU_SLOPE)\n xt = apply_mask(xt, x_mask)\n xt = F.leaky_relu(c1(xt), LRELU_SLOPE)\n xt = apply_mask(xt, x_mask)\n xt = c2(xt)\n x = xt + x\n return apply_mask(x, x_mask)\n\n def remove_weight_norm(self):\n for conv in self.convs1 + self.convs2:\n remove_weight_norm(conv)\n\n\nclass ResBlock1(ResBlockBase):\n def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):\n super(ResBlock1, self).__init__(channels, kernel_size, dilation)\n\n\nclass ResBlock2(ResBlockBase):\n def __init__(self, channels, kernel_size=3, dilation=(1, 3)):\n super(ResBlock2, self).__init__(channels, kernel_size, dilation)\n\n\nclass Log(nn.Module):\n def forward(self, x, x_mask, reverse=False, **kwargs):\n if not reverse:\n y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask\n logdet = torch.sum(-y, [1, 2])\n return y, logdet\n else:\n x = torch.exp(x) * x_mask\n return x\n\n\nclass Flip(nn.Module):\n def forward(self, x, *args, reverse=False, **kwargs):\n x = torch.flip(x, [1])\n if not reverse:\n logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device)\n return x, logdet\n else:\n return x\n\n\nclass ElementwiseAffine(nn.Module):\n def __init__(self, channels):\n super().__init__()\n self.channels = channels\n self.m = nn.Parameter(torch.zeros(channels, 1))\n self.logs = nn.Parameter(torch.zeros(channels, 1))\n\n def forward(self, x, x_mask, reverse=False, **kwargs):\n if not reverse:\n y = self.m + torch.exp(self.logs) * x\n y = y * x_mask\n logdet = torch.sum(self.logs * x_mask, [1, 2])\n return y, logdet\n else:\n x = (x - self.m) * torch.exp(-self.logs) * x_mask\n return x\n\n\nclass ResidualCouplingBlock(nn.Module):\n def __init__(\n self,\n channels,\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n n_flows=4,\n gin_channels=0,\n ):\n super(ResidualCouplingBlock, self).__init__()\n self.channels = channels\n self.hidden_channels = hidden_channels\n self.kernel_size = kernel_size\n self.dilation_rate = dilation_rate\n self.n_layers = n_layers\n self.n_flows = n_flows\n self.gin_channels = gin_channels\n\n self.flows = nn.ModuleList()\n for i in range(n_flows):\n self.flows.append(\n ResidualCouplingLayer(\n channels,\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n gin_channels=gin_channels,\n mean_only=True,\n )\n )\n self.flows.append(Flip())\n\n def forward(\n self,\n x: torch.Tensor,\n x_mask: torch.Tensor,\n g: Optional[torch.Tensor] = None,\n reverse: bool = False,\n ):\n if not reverse:\n for flow in self.flows:\n x, _ = flow(x, x_mask, g=g, reverse=reverse)\n else:\n for flow in reversed(self.flows):\n x = flow.forward(x, x_mask, g=g, reverse=reverse)\n return x\n\n def remove_weight_norm(self):\n for i in range(self.n_flows):\n self.flows[i * 2].remove_weight_norm()\n\n def __prepare_scriptable__(self):\n for i in range(self.n_flows):\n for hook in self.flows[i * 2]._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(self.flows[i * 2])\n\n return self\n\n\nclass ResidualCouplingLayer(nn.Module):\n def __init__(\n self,\n channels,\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n p_dropout=0,\n gin_channels=0,\n mean_only=False,\n ):\n assert channels % 2 == 0, "channels should be divisible by 2"\n super().__init__()\n self.channels = channels\n self.hidden_channels = hidden_channels\n self.kernel_size = kernel_size\n self.dilation_rate = dilation_rate\n self.n_layers = n_layers\n self.half_channels = channels // 2\n self.mean_only = mean_only\n\n self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)\n self.enc = WaveNet(\n hidden_channels,\n kernel_size,\n dilation_rate,\n n_layers,\n p_dropout=p_dropout,\n gin_channels=gin_channels,\n )\n self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)\n self.post.weight.data.zero_()\n self.post.bias.data.zero_()\n\n def forward(self, x, x_mask, g=None, reverse=False):\n x0, x1 = torch.split(x, [self.half_channels] * 2, 1)\n h = self.pre(x0) * x_mask\n h = self.enc(h, x_mask, g=g)\n stats = self.post(h) * x_mask\n if not self.mean_only:\n m, logs = torch.split(stats, [self.half_channels] * 2, 1)\n else:\n m = stats\n logs = torch.zeros_like(m)\n\n if not reverse:\n x1 = m + x1 * torch.exp(logs) * x_mask\n x = torch.cat([x0, x1], 1)\n logdet = torch.sum(logs, [1, 2])\n return x, logdet\n else:\n x1 = (x1 - m) * torch.exp(-logs) * x_mask\n x = torch.cat([x0, x1], 1)\n return x\n\n def remove_weight_norm(self):\n self.enc.remove_weight_norm()\n\n ', 'vbach/lib/algorithm/synthesizers.py': '\nimport torch\nfrom torch import nn\nfrom torch.nn.utils.weight_norm import remove_weight_norm\nfrom typing import Optional\n\nfrom .commons import slice_segments, rand_slice_segments\nfrom .encoders import TextEncoder, PosteriorEncoder\nfrom .generators import Generator\nfrom .nsf import GeneratorNSF\nfrom .residuals import ResidualCouplingBlock\n\n\nclass Synthesizer(nn.Module):\n def __init__(\n self,\n spec_channels,\n segment_size,\n inter_channels,\n hidden_channels,\n filter_channels,\n n_heads,\n n_layers,\n kernel_size,\n p_dropout,\n resblock,\n resblock_kernel_sizes,\n resblock_dilation_sizes,\n upsample_rates,\n upsample_initial_channel,\n upsample_kernel_sizes,\n spk_embed_dim,\n gin_channels,\n sr,\n use_f0,\n input_dim=768,\n **kwargs\n ):\n super(Synthesizer, self).__init__()\n self.spec_channels = spec_channels\n self.inter_channels = inter_channels\n self.hidden_channels = hidden_channels\n self.filter_channels = filter_channels\n self.n_heads = n_heads\n self.n_layers = n_layers\n self.kernel_size = kernel_size\n self.p_dropout = float(p_dropout)\n self.resblock = resblock\n self.resblock_kernel_sizes = resblock_kernel_sizes\n self.resblock_dilation_sizes = resblock_dilation_sizes\n self.upsample_rates = upsample_rates\n self.upsample_initial_channel = upsample_initial_channel\n self.upsample_kernel_sizes = upsample_kernel_sizes\n self.segment_size = segment_size\n self.gin_channels = gin_channels\n self.spk_embed_dim = spk_embed_dim\n self.use_f0 = use_f0\n\n self.enc_p = TextEncoder(\n inter_channels,\n hidden_channels,\n filter_channels,\n n_heads,\n n_layers,\n kernel_size,\n float(p_dropout),\n input_dim,\n f0=use_f0,\n )\n\n if use_f0:\n self.dec = GeneratorNSF(\n inter_channels,\n resblock,\n resblock_kernel_sizes,\n resblock_dilation_sizes,\n upsample_rates,\n upsample_initial_channel,\n upsample_kernel_sizes,\n gin_channels=gin_channels,\n sr=sr,\n is_half=kwargs["is_half"],\n )\n else:\n self.dec = Generator(\n inter_channels,\n resblock,\n resblock_kernel_sizes,\n resblock_dilation_sizes,\n upsample_rates,\n upsample_initial_channel,\n upsample_kernel_sizes,\n gin_channels=gin_channels,\n )\n\n self.enc_q = PosteriorEncoder(\n spec_channels,\n inter_channels,\n hidden_channels,\n 5,\n 1,\n 16,\n gin_channels=gin_channels,\n )\n self.flow = ResidualCouplingBlock(\n inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels\n )\n self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)\n\n def remove_weight_norm(self):\n self.dec.remove_weight_norm()\n self.flow.remove_weight_norm()\n self.enc_q.remove_weight_norm()\n\n def __prepare_scriptable__(self):\n for hook in self.dec._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(self.dec)\n for hook in self.flow._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(self.flow)\n if hasattr(self, "enc_q"):\n for hook in self.enc_q._forward_pre_hooks.values():\n if (\n hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"\n and hook.__class__.__name__ == "_WeightNorm"\n ):\n remove_weight_norm(self.enc_q)\n return self\n\n @torch.jit.ignore\n def forward(\n self,\n phone: torch.Tensor,\n phone_lengths: torch.Tensor,\n pitch: Optional[torch.Tensor] = None,\n pitchf: Optional[torch.Tensor] = None,\n y: torch.Tensor = None,\n y_lengths: torch.Tensor = None,\n ds: Optional[torch.Tensor] = None,\n ):\n g = self.emb_g(ds).unsqueeze(-1)\n m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)\n if y is not None:\n z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g)\n z_p = self.flow(z, y_mask, g=g)\n z_slice, ids_slice = rand_slice_segments(z, y_lengths, self.segment_size)\n if self.use_f0:\n pitchf = slice_segments(pitchf, ids_slice, self.segment_size, 2)\n o = self.dec(z_slice, pitchf, g=g)\n else:\n o = self.dec(z_slice, g=g)\n return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)\n else:\n return None, None, x_mask, None, (None, None, m_p, logs_p, None, None)\n\n @torch.jit.export\n def infer(\n self,\n phone: torch.Tensor,\n phone_lengths: torch.Tensor,\n pitch: Optional[torch.Tensor] = None,\n nsff0: Optional[torch.Tensor] = None,\n sid: torch.Tensor = None,\n rate: Optional[torch.Tensor] = None,\n ):\n g = self.emb_g(sid).unsqueeze(-1)\n m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)\n z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask\n if rate is not None:\n assert isinstance(rate, torch.Tensor)\n head = int(z_p.shape[2] * (1.0 - rate.item()))\n z_p = z_p[:, :, head:]\n x_mask = x_mask[:, :, head:]\n if self.use_f0:\n nsff0 = nsff0[:, head:]\n if self.use_f0:\n z = self.flow(z_p, x_mask, g=g, reverse=True)\n o = self.dec(z * x_mask, nsff0, g=g)\n else:\n z = self.flow(z_p, x_mask, g=g, reverse=True)\n o = self.dec(z * x_mask, g=g)\n return o, x_mask, (z, z_p, m_p, logs_p)\n\n ', 'vbach/lib/algorithm/__init__.py': '\n ', 'vbach/lib/predictors/FCPE.py': '\nfrom typing import Union\n\nimport torch.nn.functional as F\nimport numpy as np\nimport torch\nimport torch.nn as nn\nfrom torch.nn.utils.parametrizations import weight_norm\nfrom torchaudio.transforms import Resample\nimport os\nimport librosa\nimport soundfile as sf\nimport torch.utils.data\nfrom librosa.filters import mel as librosa_mel_fn\nimport math\nfrom functools import partial\n\nfrom einops import rearrange, repeat\nfrom local_attention import LocalAttention\n\nos.environ["LRU_CACHE_CAPACITY"] = "3"\n\n\ndef load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):\n try:\n data, sample_rate = sf.read(full_path, always_2d=True)\n except Exception as error:\n print(f"An error occurred loading {full_path}: {error}")\n if return_empty_on_exception:\n return [], sample_rate or target_sr or 48000\n else:\n raise\n\n data = data[:, 0] if len(data.shape) > 1 else data\n assert len(data) > 2\n\n max_mag = (\n -np.iinfo(data.dtype).min\n if np.issubdtype(data.dtype, np.integer)\n else max(np.amax(data), -np.amin(data))\n )\n max_mag = (\n (2**31) + 1 if max_mag > (2**15) else ((2**15) + 1 if max_mag > 1.01 else 1.0)\n )\n data = torch.FloatTensor(data.astype(np.float32)) / max_mag\n\n if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception:\n return [], sample_rate or target_sr or 48000\n if target_sr is not None and sample_rate != target_sr:\n data = torch.from_numpy(\n librosa.core.resample(data.numpy(), orig_sr=sample_rate, target_sr=target_sr)\n )\n sample_rate = target_sr\n\n return data, sample_rate\n\n\ndef dynamic_range_compression(x, C=1, clip_val=1e-5):\n return np.log(np.clip(x, a_min=clip_val, a_max=None) * C)\n\n\ndef dynamic_range_decompression(x, C=1):\n return np.exp(x) / C\n\n\ndef dynamic_range_compression_torch(x, C=1, clip_val=1e-5):\n return torch.log(torch.clamp(x, min=clip_val) * C)\n\n\ndef dynamic_range_decompression_torch(x, C=1):\n return torch.exp(x) / C\n\n\nclass STFT:\n def __init__(\n self,\n sr=22050,\n n_mels=80,\n n_fft=1024,\n win_size=1024,\n hop_length=256,\n fmin=20,\n fmax=11025,\n clip_val=1e-5,\n ):\n self.target_sr = sr\n self.n_mels = n_mels\n self.n_fft = n_fft\n self.win_size = win_size\n self.hop_length = hop_length\n self.fmin = fmin\n self.fmax = fmax\n self.clip_val = clip_val\n self.mel_basis = {}\n self.hann_window = {}\n\n def get_mel(self, y, keyshift=0, speed=1, center=False, train=False):\n sample_rate = self.target_sr\n n_mels = self.n_mels\n n_fft = self.n_fft\n win_size = self.win_size\n hop_length = self.hop_length\n fmin = self.fmin\n fmax = self.fmax\n clip_val = self.clip_val\n\n factor = 2 ** (keyshift / 12)\n n_fft_new = int(np.round(n_fft * factor))\n win_size_new = int(np.round(win_size * factor))\n hop_length_new = int(np.round(hop_length * speed))\n\n mel_basis = self.mel_basis if not train else {}\n hann_window = self.hann_window if not train else {}\n\n mel_basis_key = str(fmax) + "_" + str(y.device)\n if mel_basis_key not in mel_basis:\n mel = librosa_mel_fn(\n sr=sample_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax\n )\n mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device)\n\n keyshift_key = str(keyshift) + "_" + str(y.device)\n if keyshift_key not in hann_window:\n hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device)\n\n pad_left = (win_size_new - hop_length_new) // 2\n pad_right = max(\n (win_size_new - hop_length_new + 1) // 2,\n win_size_new - y.size(-1) - pad_left,\n )\n mode = "reflect" if pad_right < y.size(-1) else "constant"\n y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode=mode)\n y = y.squeeze(1)\n\n spec = torch.stft(\n y,\n n_fft_new,\n hop_length=hop_length_new,\n win_length=win_size_new,\n window=hann_window[keyshift_key],\n center=center,\n pad_mode="reflect",\n normalized=False,\n onesided=True,\n return_complex=True,\n )\n spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9))\n\n if keyshift != 0:\n size = n_fft // 2 + 1\n resize = spec.size(1)\n spec = (\n F.pad(spec, (0, 0, 0, size - resize))\n if resize < size\n else spec[:, :size, :]\n )\n spec = spec * win_size / win_size_new\n spec = torch.matmul(mel_basis[mel_basis_key], spec)\n spec = dynamic_range_compression_torch(spec, clip_val=clip_val)\n return spec\n\n def __call__(self, audiopath):\n audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr)\n spect = self.get_mel(audio.unsqueeze(0)).squeeze(0)\n return spect\n\n\nstft = STFT()\n\n\ndef softmax_kernel(\n data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None\n):\n b, h, *_ = data.shape\n\n data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0\n\n ratio = projection_matrix.shape[0] ** -0.5\n projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h)\n projection = projection.type_as(data)\n data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection)\n\n diag_data = data**2\n diag_data = torch.sum(diag_data, dim=-1)\n diag_data = (diag_data / 2.0) * (data_normalizer**2)\n diag_data = diag_data.unsqueeze(dim=-1)\n\n if is_query:\n data_dash = ratio * (\n torch.exp(\n data_dash - diag_data - torch.max(data_dash, dim=-1, keepdim=True).values\n )\n + eps\n )\n else:\n data_dash = ratio * (torch.exp(data_dash - diag_data + eps))\n\n return data_dash.type_as(data)\n\n\ndef orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None):\n unstructured_block = torch.randn((cols, cols), device=device)\n q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced")\n q, r = map(lambda t: t.to(device), (q, r))\n\n if qr_uniform_q:\n d = torch.diag(r, 0)\n q *= d.sign()\n return q.t()\n\n\ndef exists(val):\n return val is not None\n\n\ndef empty(tensor):\n return tensor.numel() == 0\n\n\ndef default(val, d):\n return val if exists(val) else d\n\n\ndef cast_tuple(val):\n return (val,) if not isinstance(val, tuple) else val\n\n\nclass PCmer(nn.Module):\n def __init__(\n self,\n num_layers,\n num_heads,\n dim_model,\n dim_keys,\n dim_values,\n residual_dropout,\n attention_dropout,\n ):\n super().__init__()\n self.num_layers = num_layers\n self.num_heads = num_heads\n self.dim_model = dim_model\n self.dim_values = dim_values\n self.dim_keys = dim_keys\n self.residual_dropout = residual_dropout\n self.attention_dropout = attention_dropout\n\n self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)])\n\n def forward(self, phone, mask=None):\n for layer in self._layers:\n phone = layer(phone, mask)\n return phone\n\n\nclass _EncoderLayer(nn.Module):\n def __init__(self, parent: PCmer):\n super().__init__()\n self.conformer = ConformerConvModule(parent.dim_model)\n self.norm = nn.LayerNorm(parent.dim_model)\n self.dropout = nn.Dropout(parent.residual_dropout)\n self.attn = SelfAttention(\n dim=parent.dim_model, heads=parent.num_heads, causal=False\n )\n\n def forward(self, phone, mask=None):\n phone = phone + (self.attn(self.norm(phone), mask=mask))\n phone = phone + (self.conformer(phone))\n return phone\n\n\ndef calc_same_padding(kernel_size):\n pad = kernel_size // 2\n return (pad, pad - (kernel_size + 1) % 2)\n\n\nclass Swish(nn.Module):\n def forward(self, x):\n return x * x.sigmoid()\n\n\nclass Transpose(nn.Module):\n def __init__(self, dims):\n super().__init__()\n assert len(dims) == 2, "dims must be a tuple of two dimensions"\n self.dims = dims\n\n def forward(self, x):\n return x.transpose(*self.dims)\n\n\nclass GLU(nn.Module):\n def __init__(self, dim):\n super().__init__()\n self.dim = dim\n\n def forward(self, x):\n out, gate = x.chunk(2, dim=self.dim)\n return out * gate.sigmoid()\n\n\nclass DepthWiseConv1d(nn.Module):\n def __init__(self, chan_in, chan_out, kernel_size, padding):\n super().__init__()\n self.padding = padding\n self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in)\n\n def forward(self, x):\n x = F.pad(x, self.padding)\n return self.conv(x)\n\n\nclass ConformerConvModule(nn.Module):\n def __init__(\n self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0\n ):\n super().__init__()\n\n inner_dim = dim * expansion_factor\n padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0)\n\n self.net = nn.Sequential(\n nn.LayerNorm(dim),\n Transpose((1, 2)),\n nn.Conv1d(dim, inner_dim * 2, 1),\n GLU(dim=1),\n DepthWiseConv1d(\n inner_dim, inner_dim, kernel_size=kernel_size, padding=padding\n ),\n Swish(),\n nn.Conv1d(inner_dim, dim, 1),\n Transpose((1, 2)),\n nn.Dropout(dropout),\n )\n\n def forward(self, x):\n return self.net(x)\n\n\ndef linear_attention(q, k, v):\n if v is None:\n out = torch.einsum("...ed,...nd->...ne", k, q)\n return out\n else:\n k_cumsum = k.sum(dim=-2)\n D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8)\n context = torch.einsum("...nd,...ne->...de", k, v)\n out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv)\n return out\n\n\ndef gaussian_orthogonal_random_matrix(\n nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None\n):\n nb_full_blocks = int(nb_rows / nb_columns)\n block_list = []\n\n for _ in range(nb_full_blocks):\n q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)\n block_list.append(q)\n\n remaining_rows = nb_rows - nb_full_blocks * nb_columns\n if remaining_rows > 0:\n q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)\n block_list.append(q[:remaining_rows])\n\n final_matrix = torch.cat(block_list)\n\n if scaling == 0:\n multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1)\n elif scaling == 1:\n multiplier = math.sqrt((float(nb_columns))) * torch.ones(\n (nb_rows,), device=device\n )\n else:\n raise ValueError(f"Invalid scaling {scaling}")\n\n return torch.diag(multiplier) @ final_matrix\n\n\nclass FastAttention(nn.Module):\n def __init__(\n self,\n dim_heads,\n nb_features=None,\n ortho_scaling=0,\n causal=False,\n generalized_attention=False,\n kernel_fn=nn.ReLU(),\n qr_uniform_q=False,\n no_projection=False,\n ):\n super().__init__()\n nb_features = default(nb_features, int(dim_heads * math.log(dim_heads)))\n\n self.dim_heads = dim_heads\n self.nb_features = nb_features\n self.ortho_scaling = ortho_scaling\n\n self.create_projection = partial(\n gaussian_orthogonal_random_matrix,\n nb_rows=self.nb_features,\n nb_columns=dim_heads,\n scaling=ortho_scaling,\n qr_uniform_q=qr_uniform_q,\n )\n projection_matrix = self.create_projection()\n self.register_buffer("projection_matrix", projection_matrix)\n\n self.generalized_attention = generalized_attention\n self.kernel_fn = kernel_fn\n self.no_projection = no_projection\n self.causal = causal\n\n @torch.no_grad()\n def redraw_projection_matrix(self):\n projections = self.create_projection()\n self.projection_matrix.copy_(projections)\n del projections\n\n def forward(self, q, k, v):\n device = q.device\n\n if self.no_projection:\n q = q.softmax(dim=-1)\n k = torch.exp(k) if self.causal else k.softmax(dim=-2)\n else:\n create_kernel = partial(\n softmax_kernel, projection_matrix=self.projection_matrix, device=device\n )\n q = create_kernel(q, is_query=True)\n k = create_kernel(k, is_query=False)\n\n attn_fn = linear_attention if not self.causal else self.causal_linear_fn\n\n if v is None:\n out = attn_fn(q, k, None)\n return out\n else:\n out = attn_fn(q, k, v)\n return out\n\n\nclass SelfAttention(nn.Module):\n def __init__(\n self,\n dim,\n causal=False,\n heads=8,\n dim_head=64,\n local_heads=0,\n local_window_size=256,\n nb_features=None,\n feature_redraw_interval=1000,\n generalized_attention=False,\n kernel_fn=nn.ReLU(),\n qr_uniform_q=False,\n dropout=0.0,\n no_projection=False,\n ):\n super().__init__()\n assert dim % heads == 0, "dimension must be divisible by number of heads"\n dim_head = default(dim_head, dim // heads)\n inner_dim = dim_head * heads\n self.fast_attention = FastAttention(\n dim_head,\n nb_features,\n causal=causal,\n generalized_attention=generalized_attention,\n kernel_fn=kernel_fn,\n qr_uniform_q=qr_uniform_q,\n no_projection=no_projection,\n )\n\n self.heads = heads\n self.global_heads = heads - local_heads\n self.local_attn = (\n LocalAttention(\n window_size=local_window_size,\n causal=causal,\n autopad=True,\n dropout=dropout,\n look_forward=int(not causal),\n rel_pos_emb_config=(dim_head, local_heads),\n )\n if local_heads > 0\n else None\n )\n\n self.to_q = nn.Linear(dim, inner_dim)\n self.to_k = nn.Linear(dim, inner_dim)\n self.to_v = nn.Linear(dim, inner_dim)\n self.to_out = nn.Linear(inner_dim, dim)\n self.dropout = nn.Dropout(dropout)\n\n @torch.no_grad()\n def redraw_projection_matrix(self):\n self.fast_attention.redraw_projection_matrix()\n\n def forward(\n self,\n x,\n context=None,\n mask=None,\n context_mask=None,\n name=None,\n inference=False,\n **kwargs,\n ):\n _, _, _, h, gh = *x.shape, self.heads, self.global_heads\n\n cross_attend = exists(context)\n context = default(context, x)\n context_mask = default(context_mask, mask) if not cross_attend else context_mask\n q, k, v = self.to_q(x), self.to_k(context), self.to_v(context)\n\n q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v))\n (q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v))\n\n attn_outs = []\n if not empty(q):\n if exists(context_mask):\n global_mask = context_mask[:, None, :, None]\n v.masked_fill_(~global_mask, 0.0)\n if cross_attend:\n pass\n else:\n out = self.fast_attention(q, k, v)\n attn_outs.append(out)\n\n if not empty(lq):\n assert (\n not cross_attend\n ), "local attention is not compatible with cross attention"\n out = self.local_attn(lq, lk, lv, input_mask=mask)\n attn_outs.append(out)\n\n out = torch.cat(attn_outs, dim=1)\n out = rearrange(out, "b h n d -> b n (h d)")\n out = self.to_out(out)\n return self.dropout(out)\n\n\ndef l2_regularization(model, l2_alpha):\n l2_loss = []\n for module in model.modules():\n if type(module) is nn.Conv2d:\n l2_loss.append((module.weight**2).sum() / 2.0)\n return l2_alpha * sum(l2_loss)\n\n\nclass FCPE(nn.Module):\n def __init__(\n self,\n input_channel=128,\n out_dims=360,\n n_layers=12,\n n_chans=512,\n use_siren=False,\n use_full=False,\n loss_mse_scale=10,\n loss_l2_regularization=False,\n loss_l2_regularization_scale=1,\n loss_grad1_mse=False,\n loss_grad1_mse_scale=1,\n f0_max=1975.5,\n f0_min=32.70,\n confidence=False,\n threshold=0.05,\n use_input_conv=True,\n ):\n super().__init__()\n if use_siren is True:\n raise ValueError("Siren is not supported yet.")\n if use_full is True:\n raise ValueError("Full model is not supported yet.")\n\n self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10\n self.loss_l2_regularization = (\n loss_l2_regularization if (loss_l2_regularization is not None) else False\n )\n self.loss_l2_regularization_scale = (\n loss_l2_regularization_scale\n if (loss_l2_regularization_scale is not None)\n else 1\n )\n self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False\n self.loss_grad1_mse_scale = (\n loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1\n )\n self.f0_max = f0_max if (f0_max is not None) else 1975.5\n self.f0_min = f0_min if (f0_min is not None) else 32.70\n self.confidence = confidence if (confidence is not None) else False\n self.threshold = threshold if (threshold is not None) else 0.05\n self.use_input_conv = use_input_conv if (use_input_conv is not None) else True\n\n self.cent_table_b = torch.Tensor(\n np.linspace(\n self.f0_to_cent(torch.Tensor([f0_min]))[0],\n self.f0_to_cent(torch.Tensor([f0_max]))[0],\n out_dims,\n )\n )\n self.register_buffer("cent_table", self.cent_table_b)\n\n _leaky = nn.LeakyReLU()\n self.stack = nn.Sequential(\n nn.Conv1d(input_channel, n_chans, 3, 1, 1),\n nn.GroupNorm(4, n_chans),\n _leaky,\n nn.Conv1d(n_chans, n_chans, 3, 1, 1),\n )\n\n self.decoder = PCmer(\n num_layers=n_layers,\n num_heads=8,\n dim_model=n_chans,\n dim_keys=n_chans,\n dim_values=n_chans,\n residual_dropout=0.1,\n attention_dropout=0.1,\n )\n self.norm = nn.LayerNorm(n_chans)\n\n self.n_out = out_dims\n self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out))\n\n def forward(\n self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax"\n ):\n if cdecoder == "argmax":\n self.cdecoder = self.cents_decoder\n elif cdecoder == "local_argmax":\n self.cdecoder = self.cents_local_decoder\n\n x = (\n self.stack(mel.transpose(1, 2)).transpose(1, 2)\n if self.use_input_conv\n else mel\n )\n x = self.decoder(x)\n x = self.norm(x)\n x = self.dense_out(x)\n x = torch.sigmoid(x)\n\n if not infer:\n gt_cent_f0 = self.f0_to_cent(gt_f0)\n gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0)\n loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0)\n if self.loss_l2_regularization:\n loss_all = loss_all + l2_regularization(\n model=self, l2_alpha=self.loss_l2_regularization_scale\n )\n x = loss_all\n if infer:\n x = self.cdecoder(x)\n x = self.cent_to_f0(x)\n x = (1 + x / 700).log() if not return_hz_f0 else x\n\n return x\n\n def cents_decoder(self, y, mask=True):\n B, N, _ = y.size()\n ci = self.cent_table[None, None, :].expand(B, N, -1)\n rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True)\n if mask:\n confident = torch.max(y, dim=-1, keepdim=True)[0]\n confident_mask = torch.ones_like(confident)\n confident_mask[confident <= self.threshold] = float("-INF")\n rtn = rtn * confident_mask\n return (rtn, confident) if self.confidence else rtn\n\n def cents_local_decoder(self, y, mask=True):\n B, N, _ = y.size()\n ci = self.cent_table[None, None, :].expand(B, N, -1)\n confident, max_index = torch.max(y, dim=-1, keepdim=True)\n local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4)\n local_argmax_index = torch.clamp(local_argmax_index, 0, self.n_out - 1)\n ci_l = torch.gather(ci, -1, local_argmax_index)\n y_l = torch.gather(y, -1, local_argmax_index)\n rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum(\n y_l, dim=-1, keepdim=True\n )\n if mask:\n confident_mask = torch.ones_like(confident)\n confident_mask[confident <= self.threshold] = float("-INF")\n rtn = rtn * confident_mask\n return (rtn, confident) if self.confidence else rtn\n\n def cent_to_f0(self, cent):\n return 10.0 * 2 ** (cent / 1200.0)\n\n def f0_to_cent(self, f0):\n return 1200.0 * torch.log2(f0 / 10.0)\n\n def gaussian_blurred_cent(self, cents):\n mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0)))\n B, N, _ = cents.size()\n ci = self.cent_table[None, None, :].expand(B, N, -1)\n return torch.exp(-torch.square(ci - cents) / 1250) * mask.float()\n\n\nclass FCPEInfer:\n def __init__(self, model_path, device=None, dtype=torch.float32):\n if device is None:\n device = "cuda" if torch.cuda.is_available() else "cpu"\n self.device = device\n ckpt = torch.load(model_path, map_location=torch.device(self.device))\n self.args = DotDict(ckpt["config"])\n self.dtype = dtype\n model = FCPE(\n input_channel=self.args.model.input_channel,\n out_dims=self.args.model.out_dims,\n n_layers=self.args.model.n_layers,\n n_chans=self.args.model.n_chans,\n use_siren=self.args.model.use_siren,\n use_full=self.args.model.use_full,\n loss_mse_scale=self.args.loss.loss_mse_scale,\n loss_l2_regularization=self.args.loss.loss_l2_regularization,\n loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale,\n loss_grad1_mse=self.args.loss.loss_grad1_mse,\n loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale,\n f0_max=self.args.model.f0_max,\n f0_min=self.args.model.f0_min,\n confidence=self.args.model.confidence,\n )\n model.to(self.device).to(self.dtype)\n model.load_state_dict(ckpt["model"])\n model.eval()\n self.model = model\n self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device)\n\n @torch.no_grad()\n def __call__(self, audio, sr, threshold=0.05):\n self.model.threshold = threshold\n audio = audio[None, :]\n mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype)\n f0 = self.model(mel=mel, infer=True, return_hz_f0=True)\n return f0\n\n\nclass Wav2Mel:\n def __init__(self, args, device=None, dtype=torch.float32):\n self.sample_rate = args.mel.sampling_rate\n self.hop_size = args.mel.hop_size\n if device is None:\n device = "cuda" if torch.cuda.is_available() else "cpu"\n self.device = device\n self.dtype = dtype\n self.stft = STFT(\n args.mel.sampling_rate,\n args.mel.num_mels,\n args.mel.n_fft,\n args.mel.win_size,\n args.mel.hop_size,\n args.mel.fmin,\n args.mel.fmax,\n )\n self.resample_kernel = {}\n\n def extract_nvstft(self, audio, keyshift=0, train=False):\n mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose(1, 2)\n return mel\n\n def extract_mel(self, audio, sample_rate, keyshift=0, train=False):\n audio = audio.to(self.dtype).to(self.device)\n if sample_rate == self.sample_rate:\n audio_res = audio\n else:\n key_str = str(sample_rate)\n if key_str not in self.resample_kernel:\n self.resample_kernel[key_str] = Resample(\n sample_rate, self.sample_rate, lowpass_filter_width=128\n )\n self.resample_kernel[key_str] = (\n self.resample_kernel[key_str].to(self.dtype).to(self.device)\n )\n audio_res = self.resample_kernel[key_str](audio)\n\n mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train)\n n_frames = int(audio.shape[1] // self.hop_size) + 1\n mel = torch.cat((mel, mel[:, -1:, :]), 1) if n_frames > int(mel.shape[1]) else mel\n mel = mel[:, :n_frames, :] if n_frames < int(mel.shape[1]) else mel\n return mel\n\n def __call__(self, audio, sample_rate, keyshift=0, train=False):\n return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train)\n\n\nclass DotDict(dict):\n def __getattr__(*args):\n val = dict.get(*args)\n return DotDict(val) if type(val) is dict else val\n\n __setattr__ = dict.__setitem__\n __delattr__ = dict.__delitem__\n\n\nclass F0Predictor(object):\n def compute_f0(self, wav, p_len):\n pass\n\n def compute_f0_uv(self, wav, p_len):\n pass\n\n\nclass FCPEF0Predictor(F0Predictor):\n def __init__(\n self,\n model_path,\n hop_length=512,\n f0_min=50,\n f0_max=1100,\n dtype=torch.float32,\n device=None,\n sample_rate=44100,\n threshold=0.05,\n ):\n self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype)\n self.hop_length = hop_length\n self.f0_min = f0_min\n self.f0_max = f0_max\n self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")\n self.threshold = threshold\n self.sample_rate = sample_rate\n self.dtype = dtype\n self.name = "fcpe"\n\n def repeat_expand(\n self,\n content: Union[torch.Tensor, np.ndarray],\n target_len: int,\n mode: str = "nearest",\n ):\n ndim = content.ndim\n content = (\n content[None, None] if ndim == 1 else content[None] if ndim == 2 else content\n )\n assert content.ndim == 3\n is_np = isinstance(content, np.ndarray)\n content = torch.from_numpy(content) if is_np else content\n results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)\n results = results.numpy() if is_np else results\n return results[0, 0] if ndim == 1 else results[0] if ndim == 2 else results\n\n def post_process(self, x, sample_rate, f0, pad_to):\n f0 = (\n torch.from_numpy(f0).float().to(x.device)\n if isinstance(f0, np.ndarray)\n else f0\n )\n f0 = self.repeat_expand(f0, pad_to) if pad_to is not None else f0\n\n vuv_vector = torch.zeros_like(f0)\n vuv_vector[f0 > 0.0] = 1.0\n vuv_vector[f0 <= 0.0] = 0.0\n\n nzindex = torch.nonzero(f0).squeeze()\n f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()\n time_org = self.hop_length / sample_rate * nzindex.cpu().numpy()\n time_frame = np.arange(pad_to) * self.hop_length / sample_rate\n\n vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0]\n\n if f0.shape[0] <= 0:\n return np.zeros(pad_to), vuv_vector.cpu().numpy()\n if f0.shape[0] == 1:\n return np.ones(pad_to) * f0[0], vuv_vector.cpu().numpy()\n\n f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])\n return f0, vuv_vector.cpu().numpy()\n\n def compute_f0(self, wav, p_len=None):\n x = torch.FloatTensor(wav).to(self.dtype).to(self.device)\n p_len = x.shape[0] // self.hop_length if p_len is None else p_len\n f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]\n if torch.all(f0 == 0):\n return f0.cpu().numpy() if p_len is None else np.zeros(p_len), (\n f0.cpu().numpy() if p_len is None else np.zeros(p_len)\n )\n return self.post_process(x, self.sample_rate, f0, p_len)[0]\n\n def compute_f0_uv(self, wav, p_len=None):\n x = torch.FloatTensor(wav).to(self.dtype).to(self.device)\n p_len = x.shape[0] // self.hop_length if p_len is None else p_len\n f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]\n if torch.all(f0 == 0):\n return f0.cpu().numpy() if p_len is None else np.zeros(p_len), (\n f0.cpu().numpy() if p_len is None else np.zeros(p_len)\n )\n return self.post_process(x, self.sample_rate, f0, p_len)\n\n', 'vbach/lib/predictors/RMVPE.py': '\nimport torch\nimport numpy as np\nimport torch.nn as nn\nimport torch.nn.functional as F\nfrom librosa.filters import mel\nfrom scipy.signal import get_window\nfrom librosa.util import pad_center, tiny, normalize\n\n\ndef window_sumsquare(\n window,\n n_frames,\n hop_length=200,\n win_length=800,\n n_fft=800,\n dtype=np.float32,\n norm=None,\n):\n if win_length is None:\n win_length = n_fft\n\n n = n_fft + hop_length * (n_frames - 1)\n x = np.zeros(n, dtype=dtype)\n\n win_sq = get_window(window, win_length, fftbins=True)\n win_sq = normalize(win_sq, norm=norm) ** 2\n win_sq = pad_center(win_sq, n_fft)\n\n for i in range(n_frames):\n sample = i * hop_length\n x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]\n return x\n\n\nclass STFT(nn.Module):\n def __init__(\n self, filter_length=1024, hop_length=512, win_length=None, window="hann"\n ):\n super(STFT, self).__init__()\n self.filter_length = filter_length\n self.hop_length = hop_length\n self.win_length = win_length if win_length else filter_length\n self.window = window\n self.pad_amount = int(self.filter_length / 2)\n scale = self.filter_length / self.hop_length\n fourier_basis = np.fft.fft(np.eye(self.filter_length))\n\n cutoff = int((self.filter_length / 2 + 1))\n fourier_basis = np.vstack(\n [np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]\n )\n forward_basis = torch.FloatTensor(fourier_basis[:, None, :])\n inverse_basis = torch.FloatTensor(\n np.linalg.pinv(scale * fourier_basis).T[:, None, :]\n )\n\n assert filter_length >= self.win_length\n fft_window = get_window(window, self.win_length, fftbins=True)\n fft_window = pad_center(fft_window, size=filter_length)\n fft_window = torch.from_numpy(fft_window).float()\n\n forward_basis *= fft_window\n inverse_basis *= fft_window\n\n self.register_buffer("forward_basis", forward_basis.float())\n self.register_buffer("inverse_basis", inverse_basis.float())\n\n def transform(self, input_data):\n num_batches = input_data.shape[0]\n num_samples = input_data.shape[-1]\n\n input_data = input_data.view(num_batches, 1, num_samples)\n input_data = F.pad(\n input_data.unsqueeze(1),\n (self.pad_amount, self.pad_amount, 0, 0, 0, 0),\n mode="reflect",\n ).squeeze(1)\n forward_transform = F.conv1d(\n input_data, self.forward_basis, stride=self.hop_length, padding=0\n )\n\n cutoff = int((self.filter_length / 2) + 1)\n real_part = forward_transform[:, :cutoff, :]\n imag_part = forward_transform[:, cutoff:, :]\n return torch.sqrt(real_part**2 + imag_part**2)\n\n def inverse(self, magnitude, phase):\n recombine_magnitude_phase = torch.cat(\n [magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1\n )\n inverse_transform = F.conv_transpose1d(\n recombine_magnitude_phase,\n self.inverse_basis,\n stride=self.hop_length,\n padding=0,\n )\n\n if self.window is not None:\n window_sum = window_sumsquare(\n self.window,\n magnitude.size(-1),\n hop_length=self.hop_length,\n win_length=self.win_length,\n n_fft=self.filter_length,\n dtype=np.float32,\n )\n approx_nonzero_indices = torch.from_numpy(\n np.where(window_sum > tiny(window_sum))[0]\n )\n window_sum = torch.from_numpy(window_sum).to(inverse_transform.device)\n inverse_transform[:, :, approx_nonzero_indices] /= window_sum[\n approx_nonzero_indices\n ]\n inverse_transform *= float(self.filter_length) / self.hop_length\n\n inverse_transform = inverse_transform[..., self.pad_amount :]\n inverse_transform = inverse_transform[..., : self.num_samples]\n return inverse_transform.squeeze(1)\n\n def forward(self, input_data):\n self.magnitude, self.phase = self.transform(input_data)\n return self.inverse(self.magnitude, self.phase)\n\n\nclass BiGRU(nn.Module):\n def __init__(self, input_features, hidden_features, num_layers):\n super(BiGRU, self).__init__()\n self.gru = nn.GRU(\n input_features,\n hidden_features,\n num_layers=num_layers,\n batch_first=True,\n bidirectional=True,\n )\n\n def forward(self, x):\n return self.gru(x)[0]\n\n\nclass ConvBlockRes(nn.Module):\n def __init__(self, in_channels, out_channels, momentum=0.01):\n super(ConvBlockRes, self).__init__()\n self.conv = nn.Sequential(\n nn.Conv2d(\n in_channels=in_channels,\n out_channels=out_channels,\n kernel_size=(3, 3),\n stride=(1, 1),\n padding=(1, 1),\n bias=False,\n ),\n nn.BatchNorm2d(out_channels, momentum=momentum),\n nn.ReLU(),\n nn.Conv2d(\n in_channels=out_channels,\n out_channels=out_channels,\n kernel_size=(3, 3),\n stride=(1, 1),\n padding=(1, 1),\n bias=False,\n ),\n nn.BatchNorm2d(out_channels, momentum=momentum),\n nn.ReLU(),\n )\n self.shortcut = (\n nn.Conv2d(in_channels, out_channels, (1, 1))\n if in_channels != out_channels\n else None\n )\n\n def forward(self, x):\n out = self.conv(x)\n if self.shortcut is not None:\n x = self.shortcut(x)\n return out + x\n\n\nclass ResEncoderBlock(nn.Module):\n def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01):\n super(ResEncoderBlock, self).__init__()\n self.conv = nn.ModuleList(\n [\n ConvBlockRes(\n in_channels if i == 0 else out_channels, out_channels, momentum\n )\n for i in range(n_blocks)\n ]\n )\n self.pool = (\n nn.AvgPool2d(kernel_size=kernel_size) if kernel_size is not None else None\n )\n\n def forward(self, x):\n for conv in self.conv:\n x = conv(x)\n pooled = self.pool(x) if self.pool is not None else x\n return pooled, x\n\n\nclass Encoder(nn.Module):\n def __init__(\n self,\n in_channels,\n in_size,\n n_encoders,\n kernel_size,\n n_blocks,\n out_channels=16,\n momentum=0.01,\n ):\n super(Encoder, self).__init__()\n self.bn = nn.BatchNorm2d(in_channels, momentum=momentum)\n self.layers = nn.ModuleList()\n self.latent_channels = []\n for _ in range(n_encoders):\n self.layers.append(\n ResEncoderBlock(\n in_channels, out_channels, kernel_size, n_blocks, momentum=momentum\n )\n )\n self.latent_channels.append([out_channels, in_size])\n in_channels = out_channels\n out_channels *= 2\n in_size //= 2\n self.out_size = in_size\n self.out_channel = out_channels\n\n def forward(self, x):\n concat_tensors = []\n x = self.bn(x)\n for layer in self.layers:\n x, pooled = layer(x)\n concat_tensors.append(pooled)\n return x, concat_tensors\n\n\nclass Intermediate(nn.Module):\n def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01):\n super(Intermediate, self).__init__()\n self.layers = nn.ModuleList(\n [\n ResEncoderBlock(\n in_channels if i == 0 else out_channels,\n out_channels,\n None,\n n_blocks,\n momentum,\n )\n for i in range(n_inters)\n ]\n )\n\n def forward(self, x):\n for layer in self.layers:\n _, x = layer(x)\n return x\n\n\nclass ResDecoderBlock(nn.Module):\n def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01):\n super(ResDecoderBlock, self).__init__()\n out_padding = (0, 1) if stride == (1, 2) else (1, 1)\n self.conv1 = nn.Sequential(\n nn.ConvTranspose2d(\n in_channels=in_channels,\n out_channels=out_channels,\n kernel_size=(3, 3),\n stride=stride,\n padding=(1, 1),\n output_padding=out_padding,\n bias=False,\n ),\n nn.BatchNorm2d(out_channels, momentum=momentum),\n nn.ReLU(),\n )\n self.conv2 = nn.ModuleList(\n [\n ConvBlockRes(\n out_channels * 2 if i == 0 else out_channels, out_channels, momentum\n )\n for i in range(n_blocks)\n ]\n )\n\n def forward(self, x, concat_tensor):\n x = self.conv1(x)\n x = torch.cat((x, concat_tensor), dim=1)\n for conv in self.conv2:\n x = conv(x)\n return x\n\n\nclass Decoder(nn.Module):\n def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01):\n super(Decoder, self).__init__()\n self.layers = nn.ModuleList()\n for _ in range(n_decoders):\n out_channels = in_channels // 2\n self.layers.append(\n ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum)\n )\n in_channels = out_channels\n\n def forward(self, x, concat_tensors):\n for layer, concat_tensor in zip(self.layers, reversed(concat_tensors)):\n x = layer(x, concat_tensor)\n return x\n\n\nclass DeepUnet(nn.Module):\n def __init__(\n self,\n kernel_size,\n n_blocks,\n en_de_layers=5,\n inter_layers=4,\n in_channels=1,\n en_out_channels=16,\n ):\n super(DeepUnet, self).__init__()\n self.encoder = Encoder(\n in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels\n )\n self.intermediate = Intermediate(\n self.encoder.out_channel // 2,\n self.encoder.out_channel,\n inter_layers,\n n_blocks,\n )\n self.decoder = Decoder(\n self.encoder.out_channel, en_de_layers, kernel_size, n_blocks\n )\n\n def forward(self, x):\n x, concat_tensors = self.encoder(x)\n x = self.intermediate(x)\n return self.decoder(x, concat_tensors)\n\n\nclass E2E(nn.Module):\n def __init__(\n self,\n n_blocks,\n n_gru,\n kernel_size,\n en_de_layers=5,\n inter_layers=4,\n in_channels=1,\n en_out_channels=16,\n ):\n super(E2E, self).__init__()\n self.unet = DeepUnet(\n kernel_size,\n n_blocks,\n en_de_layers,\n inter_layers,\n in_channels,\n en_out_channels,\n )\n self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1))\n if n_gru:\n self.fc = nn.Sequential(\n BiGRU(3 * 128, 256, n_gru),\n nn.Linear(512, 360),\n nn.Dropout(0.25),\n nn.Sigmoid(),\n )\n else:\n self.fc = nn.Sequential(\n nn.Linear(3 * nn.N_MELS, nn.N_CLASS), nn.Dropout(0.25), nn.Sigmoid()\n )\n\n def forward(self, mel):\n mel = mel.transpose(-1, -2).unsqueeze(1)\n x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2)\n return self.fc(x)\n\n\nclass MelSpectrogram(nn.Module):\n def __init__(\n self,\n is_half,\n n_mel_channels,\n sample_rate,\n win_length,\n hop_length,\n n_fft=None,\n mel_fmin=0,\n mel_fmax=None,\n clamp=1e-5,\n ):\n super(MelSpectrogram, self).__init__()\n n_fft = win_length if n_fft is None else n_fft\n self.hann_window = {}\n mel_basis = mel(\n sr=sample_rate,\n n_fft=n_fft,\n n_mels=n_mel_channels,\n fmin=mel_fmin,\n fmax=mel_fmax,\n htk=True,\n )\n self.register_buffer("mel_basis", torch.from_numpy(mel_basis).float())\n self.n_fft = n_fft\n self.hop_length = hop_length\n self.win_length = win_length\n self.sample_rate = sample_rate\n self.n_mel_channels = n_mel_channels\n self.clamp = clamp\n self.is_half = is_half\n\n def forward(self, audio, keyshift=0, speed=1, center=True):\n factor = 2 ** (keyshift / 12)\n n_fft_new = int(np.round(self.n_fft * factor))\n win_length_new = int(np.round(self.win_length * factor))\n hop_length_new = int(np.round(self.hop_length * speed))\n keyshift_key = f"{keyshift}_{audio.device}"\n if keyshift_key not in self.hann_window:\n self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to(\n audio.device\n )\n if not hasattr(self, "stft"):\n self.stft = STFT(\n filter_length=n_fft_new,\n hop_length=hop_length_new,\n win_length=win_length_new,\n window="hann",\n ).to(audio.device)\n magnitude = self.stft.transform(audio)\n if keyshift != 0:\n size = self.n_fft // 2 + 1\n resize = magnitude.size(1)\n if resize < size:\n magnitude = F.pad(magnitude, (0, 0, 0, size - resize))\n magnitude = magnitude[:, :size, :] * self.win_length / win_length_new\n mel_output = torch.matmul(self.mel_basis, magnitude)\n if self.is_half:\n mel_output = mel_output.half()\n return torch.log(torch.clamp(mel_output, min=self.clamp))\n\n\nclass RMVPE0Predictor:\n def __init__(self, model_path, is_half, device=None):\n self.resample_kernel = {}\n self.is_half = is_half\n if device is None:\n device = "cuda" if torch.cuda.is_available() else "cpu"\n self.device = device\n self.mel_extractor = MelSpectrogram(\n is_half, 128, 16000, 1024, 160, None, 30, 8000\n ).to(device)\n model = E2E(4, 1, (2, 2))\n ckpt = torch.load(model_path, map_location="cpu", weights_only=True)\n model.load_state_dict(ckpt)\n model.eval()\n if is_half:\n model = model.half()\n self.model = model.to(device)\n self.cents_mapping = np.pad(20 * np.arange(360) + 1997.3794084376191, (4, 4))\n\n def mel2hidden(self, mel):\n with torch.no_grad():\n n_frames = mel.shape[-1]\n mel = mel.float()\n padding = min(32 * ((n_frames - 1) // 32 + 1) - n_frames, n_frames)\n mel = F.pad(mel, (0, padding), mode="reflect")\n if self.is_half:\n mel = mel.half()\n hidden = self.model(mel)\n return hidden[:, :n_frames]\n\n def decode(self, hidden, thred=0.03):\n cents_pred = self.to_local_average_cents(hidden, thred=thred)\n f0 = 10 * (2 ** (cents_pred / 1200))\n f0[f0 == 10] = 0\n return f0\n\n def infer_from_audio(self, audio, thred=0.03):\n audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)\n mel = self.mel_extractor(audio, center=True)\n hidden = self.mel2hidden(mel)\n hidden = hidden.squeeze(0).cpu().numpy()\n if self.is_half:\n hidden = hidden.astype("float32")\n return self.decode(hidden, thred=thred)\n\n def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100):\n audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)\n mel = self.mel_extractor(audio, center=True)\n hidden = self.mel2hidden(mel)\n hidden = hidden.squeeze(0).cpu().numpy()\n if self.is_half:\n hidden = hidden.astype("float32")\n f0 = self.decode(hidden, thred=thred)\n f0[(f0 < f0_min) | (f0 > f0_max)] = 0\n return f0\n\n def to_local_average_cents(self, salience, thred=0.05):\n center = np.argmax(salience, axis=1)\n salience = np.pad(salience, ((0, 0), (4, 4)))\n center += 4\n todo_salience = []\n todo_cents_mapping = []\n starts = center - 4\n ends = center + 5\n for idx in range(salience.shape[0]):\n todo_salience.append(salience[:, starts[idx] : ends[idx]][idx])\n todo_cents_mapping.append(self.cents_mapping[starts[idx] : ends[idx]])\n todo_salience = np.array(todo_salience)\n todo_cents_mapping = np.array(todo_cents_mapping)\n product_sum = np.sum(todo_salience * todo_cents_mapping, 1)\n weight_sum = np.sum(todo_salience, 1)\n divided = product_sum / weight_sum\n maxx = np.max(salience, axis=1)\n divided[maxx <= thred] = 0\n return divided\n\n', 'vbach/utils/remove_center.py': '\nimport numpy as np\nfrom scipy import signal\n\ndef remove_center(input_array, samplerate, rdf=0.99999, window_size=2048, overlap=2, window_type="blackman", stereo_mode="stereo"):\n # Validate input\n # if input_array.ndim != 2 or input_array.shape[1] != 2:\n # raise ValueError("Input must be a stereo array with shape (samples, 2)")\n \n left = input_array[0]\n right = input_array[1]\n # mono = np.mean(input_array, axis=1)\n\n # Adjust window size if input is too short\n nperseg = min(window_size, len(left))\n if nperseg < 16: # Minimum reasonable window size\n nperseg = 16\n if len(left) < 16:\n # For very short inputs, just return the original with warning\n import warnings\n warnings.warn(f"Input too short ({len(left)} samples), returning original audio")\n return left, right, left, right\n \n noverlap = nperseg // overlap # Ensure noverlap < nperseg\n if noverlap >= nperseg:\n noverlap = nperseg - 1 # Ensure at least 1 sample difference\n\n # Compute STFT\n f, t, Z_left = signal.stft(left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n f, t, Z_right = signal.stft(right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n # f, t, Z_mono = signal.stft(mono, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n\n if stereo_mode == "mono":\n Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_mono))\n Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_mono))\n else:\n Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_right))\n Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_left))\n\n reduction_factor = rdf\n\n Z_new_left = Z_left - Z_common_left * reduction_factor\n Z_new_right = Z_right - Z_common_right * reduction_factor\n\n # Compute ISTFT\n _, new_left = signal.istft(Z_new_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n _, new_right = signal.istft(Z_new_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n _, common_signal_left = signal.istft(Z_common_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n _, common_signal_right = signal.istft(Z_common_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)\n\n # Trim to original length\n new_left = new_left[:len(left)]\n new_right = new_right[:len(right)]\n common_signal_left = common_signal_left[:len(left)]\n common_signal_right = common_signal_right[:len(left)]\n\n # Normalize\n peak = np.max([np.abs(new_left).max(), np.abs(new_right).max()])\n if peak > 1.0:\n new_left = new_left / peak\n new_right = new_right / peak\n\n inverted_center_left = -common_signal_left\n inverted_center_right = -common_signal_right\n\n mixed_left = left + inverted_center_left\n mixed_right = right + inverted_center_right\n\n peak_mixed = np.max([np.abs(mixed_left).max(), np.abs(mixed_right).max()])\n if peak_mixed > 1.0:\n mixed_left = mixed_left / peak_mixed\n mixed_right = mixed_right / peak_mixed\n\n return common_signal_left, common_signal_right, new_left, new_right\n'} for filepath, content in files.items(): # Нормализуем путь и объединяем с базовым filepath = normalize_path(filepath) full_path = os.path.join(base_dir, filepath) # Заменяем оставшиесе разделители на системные full_path = os.path.normpath(full_path) # Создаем родительские папки если нужно os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, 'w', encoding='utf-8') as f: f.write(content) print(f"Created file: {full_path}") recreate_structure(current_dir) for req in add_requirements: os.system(f"pip install -qq {req}") import gdown for url, file in downloadable_model_paths: if not os.path.exists(file): try: r = requests.get(url, stream=True) r.raise_for_status() with open(os.path.join(file), "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) except requests.exceptions.RequestException as e: print(f"Произошла ошибка при загрузке модели: {e}") except Exception as e: print(f"Произошла непредвиденная ошибка: {e}") def download_file(url, zip_name, progress): try: if "drive.google.com" in url: progress(0.5, desc=t('downloading_google')) download_from_google_drive(url, zip_name, progress) elif "huggingface.co" in url: progress(0.5, desc=t('downloading_huggingface')) download_from_huggingface(url, zip_name, progress) elif "pixeldrain.com" in url: progress(0.5, desc=t('downloading_pixeldrain')) download_from_pixeldrain(url, zip_name, progress) elif "mega.nz" in url: print(t('mega_unsupported')) elif "disk.yandex.ru" in url or "yadi.sk" in url: progress(0.5, desc=t('downloading_yandex')) download_from_yandex(url, zip_name, progress) else: raise ValueError(t('unsupported_source', url=url)) except Exception as e: raise gr.Error(t('download_error', error=str(e))) def download_from_google_drive(url, zip_name, progress): file_id = ( url.split("file/d/")[1].split("/")[0] if "file/d/" in url else url.split("id=")[1].split("&")[0] ) gdown.download(id=file_id, output=str(zip_name), quiet=False) def download_from_huggingface(url, zip_name, progress): urllib.request.urlretrieve(url, zip_name) def download_from_pixeldrain(url, zip_name, progress): file_id = url.split("pixeldrain.com/u/")[1] response = requests.get(f"https://pixeldrain.com/api/file/{file_id}") with open(zip_name, "wb") as f: f.write(response.content) def download_from_yandex(url, zip_name, progress): yandex_public_key = f"download?public_key={url}" yandex_api_url = f"https://cloud-api.yandex.net/v1/disk/public/resources/{yandex_public_key}" response = requests.get(yandex_api_url) if response.status_code == 200: download_link = response.json().get("href") urllib.request.urlretrieve(download_link, zip_name) else: raise gr.Error(t('yandex_api_error', status=response.status_code)) def extract_zip(extraction_folder, zip_name): os.makedirs(extraction_folder, exist_ok=True) with zipfile.ZipFile(zip_name, "r") as zip_ref: zip_ref.extractall(extraction_folder) os.remove(zip_name) index_filepath, model_filepath = None, None for root, _, files in os.walk(extraction_folder): for name in files: file_path = os.path.join(root, name) if name.endswith(".index") and os.stat(file_path).st_size > 1024 * 100: index_filepath = file_path if name.endswith(".pth") and os.stat(file_path).st_size > 1024 * 1024 * 40: model_filepath = file_path if not model_filepath: raise gr.Error(t('pth_not_found', folder=extraction_folder)) rename_and_cleanup(extraction_folder, model_filepath, index_filepath) def rename_and_cleanup(extraction_folder, model_filepath, index_filepath): os.rename( model_filepath, os.path.join(extraction_folder, os.path.basename(model_filepath)), ) if index_filepath: os.rename( index_filepath, os.path.join(extraction_folder, os.path.basename(index_filepath)), ) for filepath in os.listdir(extraction_folder): full_path = os.path.join(extraction_folder, filepath) if os.path.isdir(full_path): shutil.rmtree(full_path) def download_from_url(url, dir_name, progress=gr.Progress()): try: progress(0, desc=t('downloading_model', dir_name=dir_name)) zip_name = os.path.join(RVC_MODELS_DIR, dir_name + ".zip") extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name) if os.path.exists(extraction_folder): raise gr.Error(t('model_exists', dir_name=dir_name)) download_file(url, zip_name, progress) progress(0.8, desc=t('unpacking_zip')) extract_zip(extraction_folder, zip_name) return t('model_uploaded', dir_name=dir_name) except Exception as e: raise gr.Error(t('model_load_error', error=str(e))) def download_from_url_2(pth_url, index_url, dir_name, progress=gr.Progress()): try: progress(0, desc=t('downloading_model', dir_name=dir_name)) pth_name = os.path.join(RVC_MODELS_DIR, dir_name, f"{dir_name}.pth") index_name = os.path.join(RVC_MODELS_DIR, dir_name, f"{dir_name}.index") if os.path.exists(os.path.join(RVC_MODELS_DIR, dir_name)): raise gr.Error(t('model_exists', dir_name=dir_name)) else: os.makedirs(os.path.join(RVC_MODELS_DIR, dir_name), exist_ok=True) download_file(pth_url, pth_name, progress) if index_url != "" or index_url is not None: download_file(index_url, index_name, progress) return t('model_uploaded', dir_name=dir_name) except Exception as e: raise gr.Error(t('model_load_error', error=str(e))) def upload_zip_file(zip_path, dir_name, progress=gr.Progress()): try: extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name) if os.path.exists(extraction_folder): raise gr.Error(t('model_exists', dir_name=dir_name)) zip_name = zip_path.name progress(0.8, desc=t('unpacking_zip')) extract_zip(extraction_folder, zip_name) return t('model_uploaded', dir_name=dir_name) except Exception as e: raise gr.Error(t('model_load_error', error=str(e))) def upload_separate_files(pth_file, index_file, dir_name, progress=gr.Progress()): try: extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name) if os.path.exists(extraction_folder): raise gr.Error(t('model_exists', dir_name=dir_name)) os.makedirs(extraction_folder, exist_ok=True) if pth_file: pth_path = os.path.join(extraction_folder, os.path.basename(pth_file.name)) shutil.copyfile(pth_file.name, pth_path) if index_file: index_path = os.path.join(extraction_folder, os.path.basename(index_file.name)) shutil.copyfile(index_file.name, index_path) return t('model_uploaded', dir_name=dir_name) except Exception as e: raise gr.Error(t('model_load_error', error=str(e))) def delete_model_name(dir_name): model_dir = os.path.join(current_dir, RVC_MODELS_DIR, dir_name) if os.path.exists(model_dir): try: if os.path.isdir(model_dir): shutil.rmtree(model_dir) return t('model_deleted', dir_name=dir_name) except Exception as e: raise gr.Error(t('model_delete_error', error=str(e))) else: return t('model_not_found', dir_name=dir_name) from vbach.cli.vbach import voice_conversion def round_nearest_multiple_of_12(x): if x == 0: return 0 return round(x / 12) * 12 def renamer(template: str, input_file: str, model_name: str, method_pitch: str, pitch: float): """ Renames the output file based on the provided template and parameters. Args: template (str): The template string for renaming. input_file (str): Path to the input file. model_name (str): The name of the model used. method_pitch (str): The pitch extraction method used. pitch (float): The pitch value to include in the filename. Returns: str: The formatted output filename. """ time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S") return ( template .replace("DATETIME", time_create_file) .replace("NAME", os.path.splitext(os.path.basename(input_file))[0]) .replace("MODEL", model_name) .replace("F0METHOD", method_pitch) .replace("PITCH", f"{pitch}") ) def convert_voice( input_path: str = None, template: str = "NAME_MODEL_F0METHOD_PITCH", model_name: str = "", input_file: str = None, index_rate: float = 0, output_format: str = "wav", output_bitrate: int = 320, stereo_mode: str = "mono", method_pitch: str = "rmvpe+", pitch: float = 0, hop_length: int = 128, filter_radius: int = 3, rms: float = 0.25, protect: float = 0.33, f0_min: int = 50, f0_max: int = 1100 ): output_dir = tempfile.mkdtemp(prefix="converted_voice_") output_path = f"{renamer(template, input_file, model_name, method_pitch, pitch)}.{output_format}" try: output_path = voice_conversion( voice_model=model_name, vocals_path=input_file, output_path=output_path, pitch=pitch, f0_method=method_pitch, index_rate=index_rate, filter_radius=filter_radius, volume_envelope=rms, protect=protect, hop_length=hop_length, f0_min=f0_min, f0_max=f0_max, format_output=output_format, output_bitrate=f"{int(output_bitrate)}k", stereo_mode=stereo_mode ) except Exception as e: print(e) finally: print(t("success_single")) return output_path def process_audio( input_audio: str = None, template: str = "NAME_MODEL_F0METHOD_PITCH", model: str = "", index_rate: float = 0, output_format: str = "wav", output_bitrate: int = 320, stereo_mode: str = "mono", method_pitch: str = "rmvpe+", pitch: float = 0, hop_length: int = 128, filter_radius: int = 3, rms: float = 0.25, protect: float = 0.33, f0_min: int = 50, f0_max: int = 1100 ): """ Processes the input audio file and converts it using the specified parameters. Args: input_audio (str): Path to the input audio file. template (str): Template for renaming the output file. model (str): Name of the voice model to use. index_rate (float): Index rate for processing. output_format (str): Desired output format of the audio file. output_bitrate (int): Bitrate for the output audio file. stereo_mode (str): Stereo mode for processing. method_pitch (str): Pitch extraction method to use. pitch (float): Pitch adjustment value. hop_length (int): Hop length for processing. filter_radius (int): Radius of the filter to apply. rms (float): RMS value for processing. protect (float): Protection value for consonants. f0_min (int): Minimum F0 value for processing. f0_max (int): Maximum F0 value for processing. Returns: str: Path to the processed audio file. """ if not input_audio: raise gr.Error(t("error_no_audio")) if not model: raise gr.Error(t("error_no_model")) if template is None or template == "": template = "Vbach_NAME_MODEL_F0METHOD_PITCH" elif input_audio: if isinstance(input_audio, list): input_files = [f for f in input_audio if os.path.exists(f)] if not input_files: raise gr.Error(t("error_strlist_is_not_list")) return [convert_voice( input_file=f, template=template, model_name=model, index_rate=index_rate, output_format=output_format, output_bitrate=output_bitrate, stereo_mode=stereo_mode, method_pitch=method_pitch, pitch=pitch, hop_length=hop_length, filter_radius=filter_radius, rms=rms, protect=protect, f0_min=f0_min, f0_max=f0_max ) for f in input_files] else: return convert_voice( input_file=input_audio, template=template, model_name=model, index_rate=index_rate, output_format=output_format, output_bitrate=output_bitrate, stereo_mode=stereo_mode, method_pitch=method_pitch, pitch=pitch, hop_length=hop_length, filter_radius=filter_radius, rms=rms, protect=protect, f0_min=f0_min, f0_max=f0_max ) def vbach_plugin_name(): return "VBach" def vbach_plugin(lang="ru"): set_language(lang) with gr.TabItem(t("inference")): with gr.Column(): with gr.Row(): with gr.Column(variant="panel"): with gr.Group(): with gr.Row(equal_height=True): model = gr.Dropdown( choices=[d for d in os.listdir(RVC_MODELS_DIR)], label=t("model_name"), multiselect=False, interactive=True, scale=4 ) model_update_btn = gr.Button( t("update_button"), variant="secondary", size="sm", scale=1, min_width=150 ) pitch = gr.Slider( minimum=-48, maximum=48, step=1, value=0, label=t("pitch"), interactive=True, ) pitch_step = gr.Checkbox( label=t("pitch_step"), value=False, interactive=True, ) with gr.Group(): input_audio = gr.File( label=t("select_file"), file_types=["audio"], file_count="single", interactive=True, ) batch_upload_check = gr.Checkbox( label=t("batch_upload"), value=False, interactive=True, ) with gr.Group(): with gr.Row(equal_height=True): output_format = gr.Dropdown( choices=OUTPUT_FORMAT, label=t("output_format"), value="mp3", multiselect=False, interactive=True, scale=1 ) generated_voice = gr.Audio( label=t("converted_voice"), type="filepath", interactive=False, show_download_button=True, scale=8 ) generated_voices = gr.Files( label=t("converted_voices"), type="filepath", interactive=False, visible=False, scale=8 ) generate_btn = gr.Button( t("convert_single"), variant="primary", size="md", scale=1, min_width=150 ) generate_batch_btn = gr.Button( t("convert_batch"), variant="primary", size="md", scale=1, min_width=150, visible=False ) with gr.Group(): method_pitch = gr.Radio( choices=["mangio-crepe", "rmvpe+", "fcpe"], label=t("pitch_method"), value="rmvpe+", interactive=True, scale=1 ) hop_length = gr.Slider( minimum=8, maximum=512, step=8, value=128, label=t("hop_length"), info=t("hop_length_info"), interactive=True, visible=False ) with gr.Accordion( t("conversion_settings"), open=False, ): with gr.Accordion( t("standart_settings"), open=False, ): with gr.Group(): stereo_mode = gr.Radio( choices=["mono", "left/right", "sim/dif"], label=t("audio_processing"), info=t("stereo_mode_info"), value="mono", interactive=True, scale=1 ) index_rate = gr.Slider( minimum=0, maximum=1, step=0.01, value=0, label=t("index_rate"), info=t("index_info"), interactive=True ) filter_radius = gr.Slider( minimum=1, maximum=7, step=1, value=3, label=t("filter_radius"), info=t("filter_radius_info"), interactive=True ) rms = gr.Slider( minimum=0, maximum=1, step=0.01, value=0.25, label=t("rms"), info=t("rms_info"), interactive=True ) protect = gr.Slider( minimum=0, maximum=0.5, step=0.01, value=0.33, label=t("protect"), info=t("protect_info"), interactive=True ) with gr.Accordion( t("advanced_settings"), open=False, ): with gr.Row(equal_height=True): f0_min = gr.Slider( minimum=1, maximum=120, step=1, value=50, label=t("f0_min"), info=t("f0_min_info"), interactive=True ) f0_max = gr.Slider( minimum=380, maximum=16000, step=1, value=1100, label=t("f0_max"), info=t("f0_max_info"), interactive=True ) with gr.Accordion( t("export_settings"), open=False, ): with gr.Group(): with gr.Accordion( t("name_format"), open=False, ): gr.Markdown(t("name_format_info"), line_breaks=True) template = gr.Textbox( label=t("name_format"), value="NAME_MODEL_F0METHOD_PITCH", lines=1, interactive=True, placeholder=t("name_format_info") ) output_bitrate = gr.Slider( minimum=32, maximum=320, step=8, value=320, label=t("bitrate"), interactive=True ) batch_upload_check.change( lambda x: ( gr.update(file_count="multiple" if x == True else "single", value=None), gr.update(visible=False if x == True else True), gr.update(visible=True if x == True else False), gr.update(visible=False if x == True else True), gr.update(visible=True if x == True else False) ), inputs=batch_upload_check, outputs=[input_audio, generated_voice, generated_voices, generate_btn, generate_batch_btn] ) model_update_btn.click( lambda : gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, RVC_MODELS_DIR))]), outputs=model ) pitch_step.change( lambda x, y: gr.update(step=12 if x == True else 1, value=round_nearest_multiple_of_12(y) if x == True else y), inputs=[pitch_step, pitch], outputs=pitch ) method_pitch.change( lambda x: gr.update(visible=True if x == "mangio-crepe" else False), inputs=method_pitch, outputs=hop_length ) generate_btn.click( fn=process_audio, inputs=[ input_audio, template, model, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max ], outputs=[generated_voice] ) generate_batch_btn.click( fn=process_audio, inputs=[ input_audio, template, model, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max ], outputs=[generated_voices] ) with gr.TabItem(t("model_manager")): with gr.TabItem(t("download_tab")): with gr.TabItem(t("download_url")): with gr.Row(): with gr.Column(variant="panel"): gr.HTML(f"

{t('download_link')}

") model_zip_link = gr.Text(label=t("download_link")) with gr.Group(): zip_model_name = gr.Text( label=t("model_name"), info=t("unique_name"), ) download_btn = gr.Button(t("download_button"), variant="primary") gr.HTML( f"

{t('supported_sites')}: " "HuggingFace, " "Pixeldrain, " "Google Drive, " "Яндекс Диск" "

" ) dl_output_message = gr.Text(label=t("output_message"), interactive=False) download_btn.click( download_from_url, inputs=[model_zip_link, zip_model_name], outputs=dl_output_message, ) with gr.TabItem(t("download_url_pth")): with gr.Row(): with gr.Column(variant="panel"): gr.HTML(f"

{t('download_files_link')}

") model_pth_link = gr.Text(label=t("download_pth_link")) model_index_link = gr.Text(label=t("download_index_link")) with gr.Group(): install_model_name = gr.Text( label=t("model_name"), info=t("unique_name"), ) download_2_btn = gr.Button(t("download_button"), variant="primary") dl_2_output_message = gr.Text(label=t("output_message"), interactive=False) download_2_btn.click( download_from_url_2, inputs=[model_pth_link, model_index_link, install_model_name], outputs=dl_2_output_message, ) with gr.Tab(t("download_zip")): with gr.Row(): with gr.Column(): zip_file = gr.File( label=t("zip_file"), file_types=[".zip"], file_count="single" ) with gr.Column(variant="panel"): gr.HTML(t("upload_steps")) with gr.Group(): local_model_name = gr.Text( label=t("model_name"), info=t("unique_name"), ) model_upload_button = gr.Button(t("download_button"), variant="primary") local_upload_output_message = gr.Text(label=t("output_message"), interactive=False) model_upload_button.click( upload_zip_file, inputs=[zip_file, local_model_name], outputs=local_upload_output_message, ) with gr.TabItem(t("download_files")): with gr.Group(): with gr.Row(): pth_file = gr.File( label=t("pth_file"), file_types=[".pth"], file_count="single" ) index_file = gr.File( label=t("index_file"), file_types=[".index"], file_count="single" ) with gr.Column(variant="panel"): with gr.Group(): separate_model_name = gr.Text( label=t("model_name"), info=t("unique_name"), ) separate_upload_button = gr.Button(t("download_button"), variant="primary") separate_upload_output_message = gr.Text( label=t("output_message"), interactive=False ) separate_upload_button.click( upload_separate_files, inputs=[pth_file, index_file, separate_model_name], outputs=separate_upload_output_message, ) with gr.TabItem(t("delete_model")): with gr.Column(variant="panel"): with gr.Group(): delete_voicemodel_name = gr.Dropdown( label=t("model_name"), info=t("delete_info"), choices=[d for d in os.listdir(RVC_MODELS_DIR) if os.path.isdir(os.path.join(os.path.join(current_dir, RVC_MODELS_DIR), d))], interactive=True, filterable=False ) refresh_delete_btn = gr.Button(t("refresh_button")) refresh_delete_btn.click(fn=(lambda : gr.update(choices=[d for d in os.listdir(RVC_MODELS_DIR) if os.path.isdir(os.path.join(RVC_MODELS_DIR, d))])), inputs=None, outputs=delete_voicemodel_name) delete_model_output_message = gr.Text( label=t("output_message"), interactive=False ) delete_model_btn = gr.Button(t("delete_button")) delete_model_btn.click( fn=delete_model_name, inputs=delete_voicemodel_name, outputs=delete_model_output_message )