import os
import shutil
import gc
import torch
from multiprocessing import cpu_count
from lib.modules import VC
from lib.split_audio import split_silence_nonsilent, adjust_audio_lengths, combine_silence_nonsilent
import logging
from datetime import datetime
import traceback


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(process)d - %(funcName)s:%(lineno)d - %(message)s',
    handlers=[
        logging.FileHandler(f'debug_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)

class Configs:
    def __init__(self, device, is_half):
        logging.debug(f"Initializing Configs with device={device}, is_half={is_half}")
        self.device = device
        self.is_half = is_half
        self.n_cpu = 0
        self.gpu_name = None
        self.gpu_mem = None
        try:
            self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()
            logging.debug(f"Device configuration: pad={self.x_pad}, query={self.x_query}, "
                          f"center={self.x_center}, max={self.x_max}")
        except Exception as e:
            logging.error(f"Failed to configure device: {str(e)}")
            raise

    def device_config(self) -> tuple:
        if torch.cuda.is_available():
            i_device = int(self.device.split(":")[-1])
            self.gpu_name = torch.cuda.get_device_name(i_device)
            # Record total VRAM (GiB) so the low-memory branch below can take effect.
            self.gpu_mem = int(
                torch.cuda.get_device_properties(i_device).total_memory / 1024 / 1024 / 1024 + 0.4
            )
            logging.debug(f"GPU detected: {self.gpu_name} ({self.gpu_mem} GB)")
        elif torch.backends.mps.is_available():
            logging.warning("No supported NVIDIA GPU found, falling back to MPS")
            self.device = "mps"
            self.is_half = False
        else:
            logging.warning("No supported NVIDIA GPU found, falling back to CPU")
            self.device = "cpu"
            self.is_half = False

        if self.n_cpu == 0:
            self.n_cpu = cpu_count()
            logging.debug(f"Detected {self.n_cpu} CPU cores")

        # Inference buffer sizes (seconds); half precision allows larger buffers.
        if self.is_half:
            x_pad = 3
            x_query = 10
            x_center = 60
            x_max = 65
        else:
            x_pad = 1
            x_query = 6
            x_center = 38
            x_max = 41

        # Shrink the buffers further on GPUs with 4 GB of VRAM or less.
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            x_pad = 1
            x_query = 5
            x_center = 30
            x_max = 32

        return x_pad, x_query, x_center, x_max


def get_model(voice_model):
    model_dir = os.path.join(os.getcwd(), "models", voice_model)
    logging.debug(f"Searching for model files in directory: {model_dir}")

    model_filename, index_filename = None, None
    try:
        for file in os.listdir(model_dir):
            ext = os.path.splitext(file)[1]
            if ext == '.pth':
                model_filename = file
                logging.debug(f"Found model file: {file}")
            elif ext == '.index':
                index_filename = file
                logging.debug(f"Found index file: {file}")

        if model_filename is None:
            logging.error(f"No model file exists in {model_dir}")
            raise FileNotFoundError(f"No model file exists in {model_dir}")

        if index_filename is None:
            logging.warning(f"No index file found in {model_dir}; continuing without one")

        return (
            os.path.join(model_dir, model_filename),
            os.path.join(model_dir, index_filename) if index_filename else ''
        )
    except Exception as e:
        logging.error(f"Failed to retrieve model files: {str(e)}")
        raise


def infer_audio(
    model_name,
    audio_path,
    f0_change=0,
    f0_method="rmvpe+",
    min_pitch="50",
    max_pitch="1100",
    crepe_hop_length=128,
    index_rate=0.75,
    filter_radius=3,
    rms_mix_rate=0.25,
    protect=0.33,
    split_infer=False,
    min_silence=500,
    silence_threshold=-50,
    seek_step=1,
    keep_silence=100,
    do_formant=False,
    quefrency=0,
    timbre=1,
    f0_autotune=False,
    audio_format="wav",
    resample_sr=0,
    hubert_model_path="assets/hubert/hubert_base.pt",
    rmvpe_model_path="assets/rmvpe/rmvpe.pt",
    fcpe_model_path="assets/fcpe/fcpe.pt"
):
    logging.info("Starting inference with parameters:")
    logging.info(f"- Model: {model_name}")
    logging.info(f"- Audio path: {audio_path}")
    logging.info(f"- F0 change: {f0_change}, Method: {f0_method}")
    logging.info(f"- Split inference: {split_infer}")

    # The RMVPE/FCPE model paths are handed to the downstream modules via environment variables.
    os.environ["rmvpe_model_path"] = rmvpe_model_path
    os.environ["fcpe_model_path"] = fcpe_model_path

    try:
        # Request CUDA with half precision; device_config() falls back to MPS/CPU if CUDA is unavailable.
        configs = Configs('cuda:0', True)
        vc = VC(configs)
        pth_path, index_path = get_model(model_name)
        vc_data = vc.get_vc(pth_path, protect, 0.5)

        if split_infer:
            logging.info("Split inference mode enabled")
            inferred_files = []
            temp_dir = os.path.join(os.getcwd(), "seperate", "temp")
            os.makedirs(temp_dir, exist_ok=True)

            try:
                # Split the input into silent and non-silent chunks; only the
                # non-silent chunks are run through the voice conversion model.
                silence_files, nonsilent_files = split_silence_nonsilent(
                    audio_path, min_silence, silence_threshold, seek_step, keep_silence
                )
                logging.debug(f"Silence segments: {len(silence_files)}")
                logging.debug(f"Nonsilent segments: {len(nonsilent_files)}")

                for i, nonsilent_file in enumerate(nonsilent_files):
                    logging.info(f"Processing segment {i+1}/{len(nonsilent_files)}")

                    start_time = datetime.now()
                    inference_info, audio_data, output_path = vc.vc_single(
                        0,
                        nonsilent_file,
                        f0_change,
                        f0_method,
                        index_path,
                        index_path,
                        index_rate,
                        filter_radius,
                        resample_sr,
                        rms_mix_rate,
                        protect,
                        audio_format,
                        crepe_hop_length,
                        do_formant,
                        quefrency,
                        timbre,
                        min_pitch,
                        max_pitch,
                        f0_autotune,
                        hubert_model_path
                    )
                    process_time = (datetime.now() - start_time).total_seconds()
                    logging.debug(f"Segment processing time: {process_time:.2f}s")

                    if inference_info[0] == "Success.":
                        logging.info("Segment processed successfully")
                        logging.debug(inference_info[1])
                        logging.debug("Times:\nnpy: %.2fs f0: %.2fs infer: %.2fs\nTotal time: %.2fs" % (*inference_info[2],))
                        inferred_files.append(output_path)
                    else:
                        logging.error(f"Error processing segment {i+1}: {inference_info[0]}")
                        raise RuntimeError(f"Error processing segment {i+1}")

                logging.info("Adjusting inferred audio lengths")
                adjusted_inferred_files = adjust_audio_lengths(nonsilent_files, inferred_files)

                logging.info("Combining silence and inferred audios")
                output_dir = os.path.join(os.getcwd(), "output")
                os.makedirs(output_dir, exist_ok=True)
                # Pick the first output filename that does not already exist.
                output_count = 1
                while True:
                    output_path = os.path.join(
                        output_dir,
                        f"{os.path.splitext(os.path.basename(audio_path))[0]}_{model_name}_"
                        f"{f0_method.capitalize()}_{output_count}.{audio_format}"
                    )
                    if not os.path.exists(output_path):
                        break
                    output_count += 1

                output_path = combine_silence_nonsilent(silence_files, adjusted_inferred_files, keep_silence, output_path)

                # Move the per-segment outputs into the temp directory, then remove it.
                for inferred_file in inferred_files:
                    shutil.move(inferred_file, temp_dir)
                shutil.rmtree(temp_dir)

            except Exception as e:
                logging.error(f"Split inference failed: {str(e)}")
                raise

        else:
            logging.info("Single inference mode")
            start_time = datetime.now()
            inference_info, audio_data, output_path = vc.vc_single(
                0,
                audio_path,
                f0_change,
                f0_method,
                index_path,
                index_path,
                index_rate,
                filter_radius,
                resample_sr,
                rms_mix_rate,
                protect,
                audio_format,
                crepe_hop_length,
                do_formant,
                quefrency,
                timbre,
                min_pitch,
                max_pitch,
                f0_autotune,
                hubert_model_path
            )
            process_time = (datetime.now() - start_time).total_seconds()
            logging.debug(f"Total processing time: {process_time:.2f}s")

            if inference_info[0] == "Success.":
                logging.info("Inference completed successfully")
                logging.debug(inference_info[1])
                logging.debug("Times:\nnpy: %.2fs f0: %.2fs infer: %.2fs\nTotal time: %.2fs" % (*inference_info[2],))
            else:
                logging.error(f"Inference failed: {inference_info[0]}")
                raise RuntimeError(inference_info[0])

        # Release the model and configuration objects before returning.
        del configs, vc
        gc.collect()
        return output_path

    except Exception as e:
        logging.error(f"Inference failed: {str(e)}")
        logging.error(traceback.format_exc())
        raise
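

# Minimal usage sketch. The model name and audio path below are placeholders:
# a real run assumes a folder models/MyVoiceModel/ containing a .pth file
# (and optionally an .index file) and an existing input audio file.
if __name__ == "__main__":
    converted_path = infer_audio(
        model_name="MyVoiceModel",  # hypothetical model folder under models/
        audio_path="song.wav",      # hypothetical input audio file
        f0_change=0,
        f0_method="rmvpe+",
        split_infer=True,
    )
    logging.info(f"Converted audio saved to: {converted_path}")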