| import os, sys, traceback |
| from transformers import HubertModel |
| import librosa |
| from torch import nn |
| import torch |
|
|
| import json |
# Environment switches for PyTorch's MPS (Apple GPU) backend.
# NOTE(review): torch is already imported further up in this file; confirm
# that PyTorch still honours these variables when they are set after import.
os.environ.update(
    {
        "PYTORCH_ENABLE_MPS_FALLBACK": "1",
        "PYTORCH_MPS_HIGH_WATERMARK_RATIO": "0.0",
    }
)
|
|
# Positional command-line arguments:
#   6-entry argv:  device n_part i_part exp_dir version
#   otherwise:     device n_part i_part i_gpu exp_dir version ...
# In the second form the chosen GPU index is exported through
# CUDA_VISIBLE_DEVICES.
# NOTE(review): later code in this file reads sys.argv[7] and sys.argv[8],
# which the 6-entry form cannot supply -- confirm that form is still used.
device = sys.argv[1]
n_part, i_part = int(sys.argv[2]), int(sys.argv[3])
if len(sys.argv) != 6:
    i_gpu = sys.argv[4]
    exp_dir = sys.argv[5]
    os.environ["CUDA_VISIBLE_DEVICES"] = str(i_gpu)
    version = sys.argv[6]
else:
    exp_dir = sys.argv[4]
    version = sys.argv[5]
| import torch |
| import torch.nn.functional as F |
| import soundfile as sf |
| import numpy as np |
| from fairseq import checkpoint_utils |
|
|
| |
# Replace the device taken from the command line whenever an accelerator is
# detected: CUDA wins over MPS, and the value is left untouched when neither
# backend is available.
if not torch.cuda.is_available():
    if torch.backends.mps.is_available():
        device = "mps"
else:
    device = "cuda"
| |
# File names of the JSON configs; Config joins each onto the "configs"
# directory before opening it.
# NOTE(review): "40k.json" is listed twice and there is no "40k_v2.json"
# entry -- confirm whether the second occurrence was meant to be the v2 file.
version_config_paths = [
    os.path.join("", config_name)
    for config_name in (
        "32k.json",
        "40k.json",
        "48k.json",
        "48k_v2.json",
        "40k.json",
        "32k_v2.json",
    )
]
|
|
class Config:
    """Device and precision settings derived from the local hardware.

    Constructing an instance probes for CUDA, loads every JSON file named in
    ``version_config_paths`` from the ``configs`` directory and computes the
    ``x_pad`` / ``x_query`` / ``x_center`` / ``x_max`` values.

    Construction has on-disk side effects: whenever half precision is ruled
    out, ``set_precision("fp32")`` rewrites the config files (and
    ``preprocess.py`` when it exists).
    """

    def __init__(self):
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.is_half = self.device != "cpu"
        self.gpu_name = (
            torch.cuda.get_device_name(int(self.device.split(":")[-1]))
            if self.device.startswith("cuda")
            else None
        )
        self.json_config = self.load_config_json()
        # Stays None unless set_cuda_config() measures the GPU.
        self.gpu_mem = None
        self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()

    def load_config_json(self) -> dict:
        """Return ``{config file name: parsed JSON}`` for every listed config.

        Raises:
            FileNotFoundError: If one of the files is missing from ``configs``.
        """
        configs = {}
        for config_file in version_config_paths:
            config_path = os.path.join("configs", config_file)
            # JSON text is UTF-8; do not depend on the platform locale.
            with open(config_path, "r", encoding="utf-8") as f:
                configs[config_file] = json.load(f)
        return configs

    def has_mps(self) -> bool:
        """Report whether the MPS backend is present and usable.

        Guarded like ``has_xpu`` so that a torch build without
        ``torch.backends.mps`` yields ``False`` instead of AttributeError.
        """
        mps_backend = getattr(torch.backends, "mps", None)
        return mps_backend is not None and mps_backend.is_available()

    def has_xpu(self) -> bool:
        """Report whether torch exposes an ``xpu`` backend that is available."""
        return hasattr(torch, "xpu") and torch.xpu.is_available()

    def set_precision(self, precision):
        """Persist the requested precision to the files on disk.

        Sets ``train.fp16_run`` in every config named by
        ``version_config_paths`` (missing files are reported and skipped) and,
        when ``preprocess.py`` exists one directory above this file, swaps
        the literal ``"3.0"`` / ``"3.7"`` inside it.

        Args:
            precision: ``"fp16"`` or ``"fp32"``.

        Returns:
            A status message naming the precision.

        Raises:
            ValueError: If ``precision`` is neither ``"fp16"`` nor ``"fp32"``.
        """
        if precision not in ["fp32", "fp16"]:
            raise ValueError("Invalid precision type. Must be 'fp32' or 'fp16'.")

        fp16_run_value = precision == "fp16"
        preprocess_target_version = "3.7" if precision == "fp16" else "3.0"
        preprocess_path = os.path.join(
            os.path.dirname(__file__),
            os.pardir,
            "preprocess.py",
        )

        for config_path in version_config_paths:
            full_config_path = os.path.join("configs", config_path)
            try:
                with open(full_config_path, "r", encoding="utf-8") as f:
                    config = json.load(f)
                config["train"]["fp16_run"] = fp16_run_value
                with open(full_config_path, "w", encoding="utf-8") as f:
                    json.dump(config, f, indent=4)
            except FileNotFoundError:
                print(f"File not found: {full_config_path}")

        if os.path.exists(preprocess_path):
            # Python source is UTF-8 by default; decoding it with the locale
            # encoding could corrupt or fail on non-ASCII text when the file
            # is written back.
            with open(preprocess_path, "r", encoding="utf-8") as f:
                preprocess_content = f.read()
            preprocess_content = preprocess_content.replace(
                "3.0" if precision == "fp16" else "3.7", preprocess_target_version
            )
            with open(preprocess_path, "w", encoding="utf-8") as f:
                f.write(preprocess_content)

        return f"Overwritten preprocess and config.json to use {precision}."

    def get_precision(self):
        """Read the precision recorded in the first listed config file.

        Returns:
            ``"fp16"`` when ``train.fp16_run`` is truthy, ``"fp32"`` otherwise,
            or ``None`` (after printing a message) when the file is missing.

        Raises:
            FileNotFoundError: If ``version_config_paths`` is empty.
        """
        if not version_config_paths:
            raise FileNotFoundError("No configuration paths provided.")

        full_config_path = os.path.join("configs", version_config_paths[0])
        try:
            with open(full_config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
            fp16_run_value = config["train"].get("fp16_run", False)
            precision = "fp16" if fp16_run_value else "fp32"
            return precision
        except FileNotFoundError:
            print(f"File not found: {full_config_path}")
            return None

    def device_config(self) -> tuple:
        """Finalize device/precision and return ``(x_pad, x_query, x_center, x_max)``.

        CUDA devices are inspected by ``set_cuda_config``; MPS and CPU always
        run in fp32, which is also written to disk via ``set_precision``.
        """
        if self.device.startswith("cuda"):
            self.set_cuda_config()
        elif self.has_mps():
            self.device = "mps"
            self.is_half = False
            self.set_precision("fp32")
        else:
            self.device = "cpu"
            self.is_half = False
            self.set_precision("fp32")

        # Default values depend on whether half precision is in use.
        x_pad, x_query, x_center, x_max = (
            (3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)
        )
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            # Smaller values for GPUs reporting 4 GiB of memory or less.
            x_pad, x_query, x_center, x_max = (1, 5, 30, 32)

        return x_pad, x_query, x_center, x_max

    def set_cuda_config(self):
        """Record the GPU name and memory, disabling fp16 on listed GPUs."""
        i_device = int(self.device.split(":")[-1])
        self.gpu_name = torch.cuda.get_device_name(i_device)
        # Name fragments for which half precision is switched off
        # (a name containing "V100" is exempt).
        low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]
        if (
            any(gpu in self.gpu_name for gpu in low_end_gpus)
            and "V100" not in self.gpu_name.upper()
        ):
            self.is_half = False
            self.set_precision("fp32")

        # Total device memory in whole GiB.
        self.gpu_mem = torch.cuda.get_device_properties(i_device).total_memory // (
            1024**3
        )
# Module-level settings object. Constructing it probes the hardware, reads the
# JSON files under "configs" and may rewrite them when fp16 is not usable.
config = Config()
|
|
def load_audio(file, sample_rate):
    """Read an audio file as a flattened mono array at ``sample_rate``.

    Args:
        file: Path to the audio file. Surrounding spaces, double quotes and
            newlines are stripped before the file is opened.
        sample_rate: Desired sampling rate; the audio is resampled when the
            file's own rate differs.

    Returns:
        The samples as a flattened array.

    Raises:
        RuntimeError: If cleaning the path, reading, down-mixing or
            resampling fails. The triggering exception is attached as
            ``__cause__``.
    """
    try:
        file = file.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        audio, sr = sf.read(file)
        if len(audio.shape) > 1:
            # Multi-channel input: transpose before collapsing to mono.
            audio = librosa.to_mono(audio.T)
        if sr != sample_rate:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate)
    except Exception as error:
        # Chain explicitly so the root cause is kept as __cause__.
        raise RuntimeError(
            f"An error occurred loading the audio: {error}"
        ) from error

    return audio.flatten()
|
|
| |
class HubertModelWithFinalProj(HubertModel):
    """``HubertModel`` with an additional ``final_proj`` linear layer.

    The layer maps ``config.hidden_size`` input features to
    ``config.classifier_proj_size`` output features.
    """

    def __init__(self, config):
        super().__init__(config)
        in_features = config.hidden_size
        out_features = config.classifier_proj_size
        self.final_proj = nn.Linear(in_features, out_features)
        # Echo the projection dimensions to stdout.
        print(in_features, out_features)
|
|
# Log file inside the experiment directory, opened in append mode; printt()
# writes every message to it. No explicit close() is visible in this file.
f = open("%s/extract_f0_feature.log" % exp_dir, mode="a+")
|
|
|
|
def printt(strr):
    """Print ``strr`` and append the same text, plus a newline, to the log ``f``.

    The log is flushed after every message.

    Args:
        strr: Any object; it is rendered with ``str()`` for both outputs.
    """
    print(strr)
    # An f-string is used instead of ``"%s\n" % strr`` because the % operator
    # unpacks tuple operands: a tuple message would raise TypeError (or be
    # logged without its parentheses) instead of matching what was printed.
    f.write(f"{strr}\n")
    f.flush()
|
|
|
|
# Record the raw command line in the log before interpreting the rest of it.
printt(sys.argv)
model_path = sys.argv[7]
Custom_Embed = False
sample_embedding = sys.argv[8]

# A model path whose last component is "Custom" picks one of the known
# checkpoints by embedder name and switches on the custom-embedder code path.
# Any other combination leaves model_path and Custom_Embed as they are.
_custom_checkpoints = {
    "hubert_base": "hubert_base.pt",
    "contentvec_base": "contentvec_base.pt",
    "hubert_base_japanese": "japanese_hubert_base.pt",
}
if (
    os.path.split(model_path)[-1] == "Custom"
    and sample_embedding in _custom_checkpoints
):
    model_path = _custom_checkpoints[sample_embedding]
    Custom_Embed = True

printt(exp_dir)
wavPath = "%s/1_16k_wavs" % exp_dir
# Version "v1" writes to the 256 folder; every other version to the 768 folder.
if version == "v1":
    outPath = "%s/3_feature256" % exp_dir
else:
    outPath = "%s/3_feature768" % exp_dir
os.makedirs(outPath, exist_ok=True)
|
|
|
|
| |
def readwave(wav_path, normalize=False):
    """Load a 16 kHz wav file as a tensor of shape ``(1, num_samples)``.

    With the custom embedder enabled the samples come from ``load_audio`` and
    are moved to the module-level ``dtype`` and ``device``; otherwise the
    array read by soundfile is converted to a float tensor.

    Args:
        wav_path: Path of the wav file; its sample rate must be 16000.
        normalize: When true, apply layer normalization over the whole signal.

    Returns:
        The signal reshaped to a single row.

    Raises:
        AssertionError: If the sample rate is not 16000 or the signal is not
            one-dimensional after channel averaging.
    """
    wav, sr = sf.read(wav_path)
    assert sr == 16000
    if Custom_Embed:
        samples = load_audio(wav_path, sr)
        feats = torch.from_numpy(samples).to(dtype).to(device)
    else:
        feats = torch.from_numpy(wav).float()
    if feats.dim() == 2:
        # Two-dimensional input: average over the last axis.
        feats = feats.mean(-1)
    assert feats.dim() == 1, feats.dim()
    if normalize:
        with torch.no_grad():
            feats = F.layer_norm(feats, feats.shape)
    return feats.view(1, -1)
|
|
|
|
| |
# Load the embedder: the fairseq checkpoint is always loaded (its saved config
# is used later for the normalize flag); with Custom_Embed the model itself is
# replaced by the pretrained weights from the "Custom/" directory.
printt("load model(s) from {}".format(model_path))

if not os.access(model_path, os.F_OK):
    printt(
        "Error: Extracting is shut down because %s does not exist, you may download it from https://huggingface.co/lj1995/VoiceConversionWebUI/tree/main"
        % model_path
    )
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive shell and is absent when the site module is not loaded.
    # The exit status stays 0, as before.
    sys.exit(0)
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
    [model_path],
    suffix="",
)
if not Custom_Embed:
    model = models[0]
    # Half precision on every device other than MPS and CPU.
    if device not in ["mps", "cpu"]:
        model = model.half()
else:
    # dtype is module-level on purpose: readwave() reads it in this mode.
    dtype = torch.float16 if config.is_half and "cuda" in device else torch.float32
    model = HubertModelWithFinalProj.from_pretrained("Custom/").to(dtype).to(device)
model = model.to(device)
printt("move model to %s" % device)
model.eval()
|
|
# This worker handles every n_part-th entry of the sorted directory listing,
# starting at index i_part.
todo = sorted(os.listdir(wavPath))[i_part::n_part]
# Progress is logged roughly every tenth of the work list (at least every file).
n = max(1, len(todo) // 10)
if not todo:
    printt("no-feature-todo")
else:
    printt("all-feature-%s" % len(todo))
    for idx, file in enumerate(todo):
        try:
            if file.endswith(".wav"):
                wav_path = "%s/%s" % (wavPath, file)
                # Swap only the trailing extension. str.replace("wav", "npy")
                # would also rewrite "wav" inside the stem and give the
                # feature file a different base name than its source audio.
                out_path = "%s/%s.npy" % (outPath, file[: -len(".wav")])

                # Features already extracted on a previous run are kept.
                if os.path.exists(out_path):
                    continue

                feats = readwave(wav_path, normalize=saved_cfg.task.normalize)
                with torch.no_grad():
                    if not Custom_Embed:
                        # The fairseq inputs are built only on the path that
                        # consumes them.
                        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
                        inputs = {
                            "source": feats.half().to(device)
                            if device not in ["mps", "cpu"]
                            else feats.to(device),
                            "padding_mask": padding_mask.to(device),
                            "output_layer": 9 if version == "v1" else 12,
                        }
                        logits = model.extract_features(**inputs)
                        feats = (
                            model.final_proj(logits[0])
                            if version == "v1"
                            else logits[0]
                        )
                    else:
                        feats = model(feats)["last_hidden_state"]
                        feats = (
                            model.final_proj(feats[0]).unsqueeze(0)
                            if version == "v1"
                            else feats
                        )

                feats = feats.squeeze(0).float().cpu().numpy()
                # Features containing NaN are reported and not saved.
                if np.isnan(feats).sum() == 0:
                    np.save(out_path, feats, allow_pickle=False)
                else:
                    printt("%s-contains nan" % file)
                if idx % n == 0:
                    printt("now-%s,all-%s,%s,%s" % (idx, len(todo), file, feats.shape))
        except Exception:
            # Log the failure and continue with the next file. Exception, not
            # a bare except, so Ctrl-C and SystemExit still stop the worker.
            printt(traceback.format_exc())
    printt("all-feature-done")
|
|