import os
import sys
import time
import torch
import librosa
import logging
import traceback
import numpy as np
import soundfile as sf
import noisereduce as nr
from pedalboard import (
    Pedalboard,
    Chorus,
    Distortion,
    Reverb,
    PitchShift,
    Limiter,
    Gain,
    Bitcrush,
    Clipping,
    Compressor,
    Delay,
)

from audio_upscaler import upscale

now_dir = os.getcwd()
sys.path.append(now_dir)

from rvc.infer.pipeline import Pipeline as VC
from rvc.lib.utils import load_audio_infer, load_embedding
from rvc.lib.tools.split_audio import process_audio, merge_audio
from rvc.lib.algorithm.synthesizers import Synthesizer
from rvc.configs.config import Config

# Silence noisy third-party loggers.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("faiss").setLevel(logging.WARNING)
logging.getLogger("faiss.loader").setLevel(logging.WARNING)


class VoiceConverter:
    """
    A class for performing voice conversion using the Retrieval-Based Voice Conversion (RVC) method.
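
    Minimal usage sketch (the .pth/.index paths below are illustrative
    placeholders, not files shipped with this module):

        converter = VoiceConverter()
        converter.convert_audio(
            audio_input_path="input.wav",
            audio_output_path="output.wav",
            model_path="logs/my_model/my_model.pth",
            index_path="logs/my_model/added_my_model.index",
        )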
| """ |
|
|
| def __init__(self): |
| """ |
| Initializes the VoiceConverter with default configuration, and sets up models and parameters. |
| """ |
| self.config = Config() |
| self.hubert_model = ( |
| None |
| ) |
| self.last_embedder_model = None |
| self.tgt_sr = None |
| self.net_g = None |
| self.vc = None |
| self.cpt = None |
| self.version = None |
| self.n_spk = None |
| self.use_f0 = None |
|
|

    def load_hubert(self, embedder_model: str, embedder_model_custom: str = None):
        """
        Loads the HuBERT/ContentVec embedder used for content feature extraction.

        Args:
            embedder_model (str): Name of the pre-trained embedder model.
            embedder_model_custom (str): Path to a custom embedder model, if any.
        """
        self.hubert_model = load_embedding(embedder_model, embedder_model_custom)
        self.hubert_model.to(self.config.device)
        # Match the embedder's precision to the global half/float setting.
        self.hubert_model = (
            self.hubert_model.half()
            if self.config.is_half
            else self.hubert_model.float()
        )
        self.hubert_model.eval()

    @staticmethod
    def remove_audio_noise(data, sr, reduction_strength=0.7):
        """
        Removes noise from audio data using the noisereduce library.

        Args:
            data (numpy.ndarray): The audio data as a NumPy array.
            sr (int): The sample rate of the audio data.
            reduction_strength (float): Strength of the noise reduction. Default is 0.7.

        Returns:
            numpy.ndarray: The denoised audio, or None if noise reduction failed.
        """
        try:
            reduced_noise = nr.reduce_noise(
                y=data, sr=sr, prop_decrease=reduction_strength
            )
            return reduced_noise
        except Exception as error:
            print(f"An error occurred removing audio noise: {error}")
            return None

    @staticmethod
    def convert_audio_format(input_path, output_path, output_format):
        """
        Converts an audio file to a specified output format, resampling to the
        nearest common sample rate when needed.

        Args:
            input_path (str): Path to the input audio file.
            output_path (str): Path to the output audio file.
            output_format (str): Desired audio format (e.g., "WAV", "MP3").

        Returns:
            str: The output path, or None if the conversion failed.
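
        Example (illustrative paths; writing MP3 also assumes a libsndfile
        build with MP3 support):

            VoiceConverter.convert_audio_format("out.wav", "out.mp3", "MP3")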
| """ |
| try: |
| if output_format != "WAV": |
| print(f"Converting audio to {output_format} format...") |
| audio, sample_rate = librosa.load(input_path, sr=None) |
| common_sample_rates = [ |
| 8000, |
| 11025, |
| 12000, |
| 16000, |
| 22050, |
| 24000, |
| 32000, |
| 44100, |
| 48000, |
| ] |
| target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate)) |
| audio = librosa.resample( |
| audio, orig_sr=sample_rate, target_sr=target_sr |
| ) |
| sf.write(output_path, audio, target_sr, format=output_format.lower()) |
| return output_path |
| except Exception as error: |
| print(f"An error occurred converting the audio format: {error}") |
|
|

    @staticmethod
    def post_process_audio(
        audio_input,
        sample_rate,
        **kwargs,
    ):
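        """
        Applies an optional chain of Pedalboard effects to the audio.

        Each effect is enabled by a boolean kwarg (e.g. reverb=True) and tuned
        by the matching parameter kwargs listed in the body below; unspecified
        parameters fall back to the defaults in this method.

        Example (illustrative values):

            processed = VoiceConverter.post_process_audio(
                audio, 44100, reverb=True, reverb_room_size=0.6, limiter=True
            )
        """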
        board = Pedalboard()
        if kwargs.get("reverb", False):
            reverb = Reverb(
                room_size=kwargs.get("reverb_room_size", 0.5),
                damping=kwargs.get("reverb_damping", 0.5),
                wet_level=kwargs.get("reverb_wet_level", 0.33),
                dry_level=kwargs.get("reverb_dry_level", 0.4),
                width=kwargs.get("reverb_width", 1.0),
                freeze_mode=kwargs.get("reverb_freeze_mode", 0),
            )
            board.append(reverb)
        if kwargs.get("pitch_shift", False):
            pitch_shift = PitchShift(semitones=kwargs.get("pitch_shift_semitones", 0))
            board.append(pitch_shift)
        if kwargs.get("limiter", False):
            limiter = Limiter(
                threshold_db=kwargs.get("limiter_threshold", -6),
                release_ms=kwargs.get("limiter_release", 0.05),
            )
            board.append(limiter)
        if kwargs.get("gain", False):
            gain = Gain(gain_db=kwargs.get("gain_db", 0))
            board.append(gain)
        if kwargs.get("distortion", False):
            distortion = Distortion(drive_db=kwargs.get("distortion_gain", 25))
            board.append(distortion)
        if kwargs.get("chorus", False):
            chorus = Chorus(
                rate_hz=kwargs.get("chorus_rate", 1.0),
                depth=kwargs.get("chorus_depth", 0.25),
                centre_delay_ms=kwargs.get("chorus_delay", 7),
                feedback=kwargs.get("chorus_feedback", 0.0),
                mix=kwargs.get("chorus_mix", 0.5),
            )
            board.append(chorus)
        if kwargs.get("bitcrush", False):
            bitcrush = Bitcrush(bit_depth=kwargs.get("bitcrush_bit_depth", 8))
            board.append(bitcrush)
        if kwargs.get("clipping", False):
            clipping = Clipping(threshold_db=kwargs.get("clipping_threshold", 0))
            board.append(clipping)
        if kwargs.get("compressor", False):
            compressor = Compressor(
                threshold_db=kwargs.get("compressor_threshold", 0),
                ratio=kwargs.get("compressor_ratio", 1),
                attack_ms=kwargs.get("compressor_attack", 1.0),
                release_ms=kwargs.get("compressor_release", 100),
            )
            board.append(compressor)
        if kwargs.get("delay", False):
            delay = Delay(
                delay_seconds=kwargs.get("delay_seconds", 0.5),
                feedback=kwargs.get("delay_feedback", 0.0),
                mix=kwargs.get("delay_mix", 0.5),
            )
            board.append(delay)
        return board(audio_input, sample_rate)

    def convert_audio(
        self,
        audio_input_path: str,
        audio_output_path: str,
        model_path: str,
        index_path: str,
        pitch: int = 0,
        f0_file: str = None,
        f0_method: str = "rmvpe",
        index_rate: float = 0.75,
        volume_envelope: float = 1,
        protect: float = 0.5,
        hop_length: int = 128,
        split_audio: bool = False,
        f0_autotune: bool = False,
        filter_radius: int = 3,
        embedder_model: str = "contentvec",
        embedder_model_custom: str = None,
        clean_audio: bool = False,
        clean_strength: float = 0.5,
        export_format: str = "WAV",
        upscale_audio: bool = False,
        post_process: bool = False,
        resample_sr: int = 0,
        sid: int = 0,
        **kwargs,
    ):
| """ |
| Performs voice conversion on the input audio. |
| |
| Args: |
| pitch (int): Key for F0 up-sampling. |
| filter_radius (int): Radius for filtering. |
| index_rate (float): Rate for index matching. |
| volume_envelope (int): RMS mix rate. |
| protect (float): Protection rate for certain audio segments. |
| hop_length (int): Hop length for audio processing. |
| f0_method (str): Method for F0 extraction. |
| audio_input_path (str): Path to the input audio file. |
| audio_output_path (str): Path to the output audio file. |
| model_path (str): Path to the voice conversion model. |
| index_path (str): Path to the index file. |
| split_audio (bool): Whether to split the audio for processing. |
| f0_autotune (bool): Whether to use F0 autotune. |
| clean_audio (bool): Whether to clean the audio. |
| clean_strength (float): Strength of the audio cleaning. |
| export_format (str): Format for exporting the audio. |
| upscale_audio (bool): Whether to upscale the audio. |
| f0_file (str): Path to the F0 file. |
| embedder_model (str): Path to the embedder model. |
| embedder_model_custom (str): Path to the custom embedder model. |
| resample_sr (int, optional): Resample sampling rate. Default is 0. |
| sid (int, optional): Speaker ID. Default is 0. |
| **kwargs: Additional keyword arguments. |
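
        Example (illustrative paths):

            converter.convert_audio(
                audio_input_path="speech.wav",
                audio_output_path="converted.wav",
                model_path="logs/my_model/my_model.pth",
                index_path="logs/my_model/added_my_model.index",
                pitch=2,
                clean_audio=True,
            )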
| """ |
| self.get_vc(model_path, sid) |
| try: |
| start_time = time.time() |
| print(f"Converting audio '{audio_input_path}'...") |
|
|
| if upscale_audio == True: |
| upscale(audio_input_path, audio_input_path) |
| audio = load_audio_infer( |
| audio_input_path, |
| 16000, |
| **kwargs, |
| ) |
| audio_max = np.abs(audio).max() / 0.95 |
|
|
| if audio_max > 1: |
| audio /= audio_max |
|
|
| if not self.hubert_model or embedder_model != self.last_embedder_model: |
| self.load_hubert(embedder_model, embedder_model_custom) |
| self.last_embedder_model = embedder_model |
|
|

            file_index = (
                index_path.strip()
                .strip('"')
                .strip("\n")
                .strip('"')
                .strip()
                .replace("trained", "added")
            )
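            # Note: the "trained_*" index produced during training is swapped
            # for its "added_*" counterpart, which is the FAISS index used for
            # retrieval at inference time.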

            # Resample only when a valid custom rate (>= 16 kHz) differs from
            # the model's native rate.
            if resample_sr >= 16000 and self.tgt_sr != resample_sr:
                self.tgt_sr = resample_sr

            if split_audio:
                chunks, intervals = process_audio(audio, 16000)
                print(f"Audio split into {len(chunks)} chunks for processing.")
            else:
                chunks = [audio]

            converted_chunks = []
            for c in chunks:
                audio_opt = self.vc.pipeline(
                    model=self.hubert_model,
                    net_g=self.net_g,
                    sid=sid,
                    audio=c,
                    pitch=pitch,
                    f0_method=f0_method,
                    file_index=file_index,
                    index_rate=index_rate,
                    pitch_guidance=self.use_f0,
                    filter_radius=filter_radius,
                    volume_envelope=volume_envelope,
                    version=self.version,
                    protect=protect,
                    hop_length=hop_length,
                    f0_autotune=f0_autotune,
                    f0_file=f0_file,
                )
                converted_chunks.append(audio_opt)
                if split_audio:
                    print(f"Converted audio chunk {len(converted_chunks)}")

            if split_audio:
                audio_opt = merge_audio(converted_chunks, intervals, 16000, self.tgt_sr)
            else:
                audio_opt = converted_chunks[0]

            if clean_audio:
                cleaned_audio = self.remove_audio_noise(
                    audio_opt, self.tgt_sr, clean_strength
                )
                if cleaned_audio is not None:
                    audio_opt = cleaned_audio

            if post_process:
                audio_opt = self.post_process_audio(
                    audio_input=audio_opt,
                    sample_rate=self.tgt_sr,
                    **kwargs,
                )

            sf.write(audio_output_path, audio_opt, self.tgt_sr, format="WAV")
            output_path_format = audio_output_path.replace(
                ".wav", f".{export_format.lower()}"
            )
            audio_output_path = self.convert_audio_format(
                audio_output_path, output_path_format, export_format
            )

            elapsed_time = time.time() - start_time
            print(
                f"Conversion completed at '{audio_output_path}' in {elapsed_time:.2f} seconds."
            )
        except Exception as error:
            print(f"An error occurred during audio conversion: {error}")
            print(traceback.format_exc())

    def convert_audio_batch(
        self,
        audio_input_paths: str,
        audio_output_path: str,
        **kwargs,
    ):
| """ |
| Performs voice conversion on a batch of input audio files. |
| |
| Args: |
| audio_input_paths (str): List of paths to the input audio files. |
| audio_output_path (str): Path to the output audio file. |
| resample_sr (int, optional): Resample sampling rate. Default is 0. |
| sid (int, optional): Speaker ID. Default is 0. |
| **kwargs: Additional keyword arguments. |
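
        Example (illustrative paths):

            converter.convert_audio_batch(
                audio_input_paths="input_dir",
                audio_output_path="output_dir",
                model_path="logs/my_model/my_model.pth",
                index_path="logs/my_model/added_my_model.index",
            )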
| """ |
| pid = os.getpid() |
| try: |
| with open( |
| os.path.join(now_dir, "assets", "infer_pid.txt"), "w" |
| ) as pid_file: |
| pid_file.write(str(pid)) |
| start_time = time.time() |
| print(f"Converting audio batch '{audio_input_paths}'...") |
| audio_files = [ |
| f |
| for f in os.listdir(audio_input_paths) |
| if f.endswith( |
| ( |
| "wav", |
| "mp3", |
| "flac", |
| "ogg", |
| "opus", |
| "m4a", |
| "mp4", |
| "aac", |
| "alac", |
| "wma", |
| "aiff", |
| "webm", |
| "ac3", |
| ) |
| ) |
| ] |
| print(f"Detected {len(audio_files)} audio files for inference.") |
| for a in audio_files: |
| new_input = os.path.join(audio_input_paths, a) |
| new_output = os.path.splitext(a)[0] + "_output.wav" |
| new_output = os.path.join(audio_output_path, new_output) |
| if os.path.exists(new_output): |
| continue |
| self.convert_audio( |
| audio_input_path=new_input, |
| audio_output_path=new_output, |
| **kwargs, |
| ) |
| print(f"Conversion completed at '{audio_input_paths}'.") |
| elapsed_time = time.time() - start_time |
| print(f"Batch conversion completed in {elapsed_time:.2f} seconds.") |
| except Exception as error: |
| print(f"An error occurred during audio batch conversion: {error}") |
| print(traceback.format_exc()) |
| finally: |
| os.remove(os.path.join(now_dir, "assets", "infer_pid.txt")) |
|

    def get_vc(self, weight_root, sid):
        """
        Loads the voice conversion model and sets up the pipeline.

        Args:
            weight_root (str): Path to the model weights.
            sid (int): Speaker ID.
        """
        # An empty speaker ID signals a model unload.
        if sid == "" or sid == []:
            self.cleanup_model()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        self.load_model(weight_root)

        if self.cpt is not None:
            self.setup_network()
            self.setup_vc_instance()

    def cleanup_model(self):
        """
        Cleans up the loaded models and releases resources.
        """
        if self.hubert_model is not None:
            del self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr
            self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        del self.net_g, self.cpt
        self.net_g = self.cpt = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def load_model(self, weight_root):
        """
        Loads the model weights from the specified path.

        Args:
            weight_root (str): Path to the model weights.
        """
        self.cpt = (
            torch.load(weight_root, map_location="cpu")
            if os.path.isfile(weight_root)
            else None
        )

    def setup_network(self):
        """
        Sets up the network configuration based on the loaded checkpoint.
        """
        if self.cpt is not None:
            self.tgt_sr = self.cpt["config"][-1]
            self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
            self.use_f0 = self.cpt.get("f0", 1)
            self.version = self.cpt.get("version", "v1")
            # A marker file switches to a large (1024-dim) embedder; otherwise
            # the hidden size follows the model version (v2: 768, v1: 256).
            large_embedder = os.path.exists("/content/Applio/Large_hubert.txt")
            if self.version == "v2" and not large_embedder:
                self.text_enc_hidden_dim = 768
            elif self.version == "v1" and not large_embedder:
                self.text_enc_hidden_dim = 256
            else:
                self.text_enc_hidden_dim = 1024
            self.net_g = Synthesizer(
                *self.cpt["config"],
                use_f0=self.use_f0,
                text_enc_hidden_dim=self.text_enc_hidden_dim,
                is_half=self.config.is_half,
            )
            # The posterior encoder is only needed during training.
            del self.net_g.enc_q
            self.net_g.load_state_dict(self.cpt["weight"], strict=False)
            self.net_g.eval().to(self.config.device)
            self.net_g = (
                self.net_g.half() if self.config.is_half else self.net_g.float()
            )

    def setup_vc_instance(self):
        """
        Sets up the voice conversion pipeline instance based on the target sampling rate and configuration.
        """
        if self.cpt is not None:
            self.vc = VC(self.tgt_sr, self.config)
            self.n_spk = self.cpt["config"][-3]
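

# Minimal sketch of standalone usage; the model/index paths are illustrative
# placeholders and must point at a real trained model.
if __name__ == "__main__":
    converter = VoiceConverter()
    converter.convert_audio(
        audio_input_path="input.wav",
        audio_output_path="output.wav",
        model_path="logs/my_model/my_model.pth",
        index_path="logs/my_model/added_my_model.index",
    )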