| |
| |
| |
| |
|
|
| import os |
| import torch |
| import numpy as np |
|
|
| import json |
| from tqdm import tqdm |
| from sklearn.preprocessing import StandardScaler |
| from utils.io import save_feature, save_txt |
| from utils.util import has_existed |
| from utils.tokenizer import extract_encodec_token |
| from utils.stft import TacotronSTFT |
| from utils.dsp import compress, audio_to_label |
| from utils.data_utils import remove_outlier |
| from preprocessors.metadata import replace_augment_name |
| from scipy.interpolate import interp1d |
|
|
# Small epsilon added to the (max - min) range in the mel normalization /
# denormalization helpers of this module, so the range is never exactly zero.
ZERO = 1e-12
|
|
|
|
def extract_utt_acoustic_features_parallel(metadata, dataset_output, cfg, n_workers=1):
    """Run the task-specific acoustic feature extractor on every utterance.

    NOTE: despite the name, the utterances are processed one after another in
    the calling process; ``n_workers`` is accepted but never read.

    Args:
        metadata: iterable of utterance entries (e.g. the content of
            train.json / test.json)
        dataset_output (str): directory to store acoustic features
        cfg: configuration object; ``cfg.task_type`` selects the extractor
        n_workers (int, optional): currently unused. Defaults to 1.

    Returns:
        None. An unrecognised ``cfg.task_type`` results in no extraction.
    """
    extractors = {
        "tts": extract_utt_acoustic_features_tts,
        "svc": extract_utt_acoustic_features_svc,
        "vocoder": extract_utt_acoustic_features_vocoder,
        "tta": extract_utt_acoustic_features_tta,
    }
    for utt in tqdm(metadata):
        extractor = extractors.get(cfg.task_type)
        if extractor is not None:
            extractor(dataset_output, cfg, utt)
|
|
|
|
def avg_phone_feature(feature, duration, interpolation=False):
    """Average a frame-level feature over the frames of each phone.

    The input array is never modified: results are written to a fresh array,
    so callers can keep using (and saving) the frame-level feature afterwards.

    Args:
        feature: 1-D frame-level feature (array-like). Only the first
            ``sum(duration)`` frames are used.
        duration: per-phone frame counts; phone ``i`` covers ``duration[i]``
            consecutive frames.
        interpolation (bool): if True, zero-valued frames are first replaced by
            linear interpolation between the surrounding non-zero frames
            (frames before the first / after the last non-zero frame take that
            frame's value). An all-zero feature is left unchanged.

    Returns:
        np.ndarray of length ``len(duration)`` holding the mean of each phone's
        frames. Phones with a non-positive duration, or whose frames lie beyond
        the end of ``feature``, get 0. Floating inputs keep their dtype; other
        dtypes are returned as float64 so means are not truncated.
    """
    feature = np.asarray(feature)[: sum(duration)]
    if interpolation:
        nonzero_ids = np.where(feature != 0)[0]
        # np.interp needs at least one sample point; with no non-zero frame
        # there is nothing to interpolate from.
        if nonzero_ids.size > 0:
            # Outside the sampled range np.interp holds the first / last
            # sample value, which is the edge behaviour wanted here.
            feature = np.interp(
                np.arange(len(feature)), nonzero_ids, feature[nonzero_ids]
            )

    if np.issubdtype(feature.dtype, np.floating):
        out_dtype = feature.dtype
    else:
        out_dtype = np.float64

    # Phoneme-level average, written to a separate buffer so that neither the
    # caller's array nor frames still to be read are overwritten.
    phone_feature = np.zeros(len(duration), dtype=out_dtype)
    pos = 0
    for i, d in enumerate(duration):
        segment = feature[pos : pos + d]
        if d > 0 and segment.size > 0:
            phone_feature[i] = np.mean(segment)
        pos += d
    return phone_feature
|
|
|
|
def extract_utt_acoustic_features_serial(metadata, dataset_output, cfg):
    """Run the task-specific acoustic feature extractor on every utterance.

    Utterances are handled sequentially in the calling process.

    Args:
        metadata: iterable of utterance entries (e.g. the content of
            train.json / test.json)
        dataset_output (str): directory to store acoustic features
        cfg: configuration object; ``cfg.task_type`` selects the extractor
            ("tts", "svc", "vocoder" or "tta"); any other value results in no
            extraction.
    """
    extractors = {
        "tts": extract_utt_acoustic_features_tts,
        "svc": extract_utt_acoustic_features_svc,
        "vocoder": extract_utt_acoustic_features_vocoder,
        "tta": extract_utt_acoustic_features_tta,
    }
    for utt in tqdm(metadata):
        extractor = extractors.get(cfg.task_type)
        if extractor is not None:
            extractor(dataset_output, cfg, utt)
|
|
|
|
def _build_tacotron_stft(preprocess_cfg):
    """Create a TacotronSTFT configured from the preprocess settings."""
    return TacotronSTFT(
        sampling_rate=preprocess_cfg.sample_rate,
        win_length=preprocess_cfg.win_size,
        hop_length=preprocess_cfg.hop_size,
        filter_length=preprocess_cfg.n_fft,
        n_mel_channels=preprocess_cfg.n_mel,
        mel_fmin=preprocess_cfg.fmin,
        mel_fmax=preprocess_cfg.fmax,
    )


def __extract_utt_acoustic_features(dataset_output, cfg, utt):
    """Extract and save the enabled acoustic features of one utterance.

    Which features are produced is controlled by the ``cfg.preprocess.extract_*``
    switches (duration, linear spectrogram, mel, energy, pitch, uv, audio,
    label, acoustic token). Each result is written with ``save_feature`` /
    ``save_txt`` under ``dataset_output``.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg: configuration object; only ``cfg.preprocess`` is read here
        utt (dict): utterance info; "Uid" and "Path" are read, plus "Singer"
            when ``<dataset_output>/<cfg.preprocess.raw_data>`` exists

    Raises:
        ValueError: energy mode "from_mel" is requested while mel extraction is
            disabled (this used to surface as a NameError).
        AssertionError: ``energy_extract_mode`` is not a supported mode.
    """
    from utils import audio, f0, world, duration

    uid = utt["Uid"]
    wav_path = utt["Path"]
    # Prefer the copy under <dataset_output>/<raw_data>/<Singer>/ when that
    # directory exists.
    if os.path.exists(os.path.join(dataset_output, cfg.preprocess.raw_data)):
        wav_path = os.path.join(
            dataset_output, cfg.preprocess.raw_data, utt["Singer"], uid + ".wav"
        )

    with torch.no_grad():
        # Load audio at the sample rate given in the config
        wav_torch, _ = audio.load_audio_torch(wav_path, cfg.preprocess.sample_rate)
        wav = wav_torch.cpu().numpy()

        if cfg.preprocess.extract_duration:
            durations, phones, start, end = duration.get_duration(
                utt, wav, cfg.preprocess
            )
            save_feature(dataset_output, cfg.preprocess.duration_dir, uid, durations)
            save_txt(dataset_output, cfg.preprocess.lab_dir, uid, phones)
            # All later features are computed on the [start:end] slice only.
            wav = wav[start:end].astype(np.float32)
            wav_torch = torch.from_numpy(wav).to(wav_torch.device)

        if cfg.preprocess.extract_linear_spec:
            from utils.mel import extract_linear_features

            linear = extract_linear_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(
                dataset_output, cfg.preprocess.linear_dir, uid, linear.cpu().numpy()
            )

        if cfg.preprocess.extract_mel:
            from utils.mel import extract_mel_features

            if cfg.preprocess.mel_extract_mode == "taco":
                _stft = _build_tacotron_stft(cfg.preprocess)
                mel = extract_mel_features(
                    wav_torch.unsqueeze(0), cfg.preprocess, taco=True, _stft=_stft
                )
                if cfg.preprocess.extract_duration:
                    # Keep only as many frames as the durations account for.
                    mel = mel[:, : sum(durations)]
            else:
                mel = extract_mel_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.mel_dir, uid, mel.cpu().numpy())

        if cfg.preprocess.extract_energy:
            energy_mode = cfg.preprocess.energy_extract_mode
            if energy_mode == "from_mel":
                if not cfg.preprocess.extract_mel:
                    raise ValueError(
                        'energy_extract_mode "from_mel" requires '
                        "cfg.preprocess.extract_mel to be enabled"
                    )
                energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            elif energy_mode == "from_waveform":
                energy = audio.energy(wav, cfg.preprocess)
            elif energy_mode == "from_tacotron_stft":
                _stft = _build_tacotron_stft(cfg.preprocess)
                _, energy = audio.get_energy_from_tacotron(wav, _stft)
            else:
                assert energy_mode in [
                    "from_mel",
                    "from_waveform",
                    "from_tacotron_stft",
                ], f"{cfg.preprocess.energy_extract_mode} not in supported energy_extract_mode [from_mel, from_waveform, from_tacotron_stft]"
            if cfg.preprocess.extract_duration:
                energy = energy[: sum(durations)]
                # Average on a copy so the frame-level energy saved below can
                # never be altered by the phone-level averaging.
                phone_energy = avg_phone_feature(
                    np.array(energy, copy=True), durations
                )
                save_feature(
                    dataset_output, cfg.preprocess.phone_energy_dir, uid, phone_energy
                )

            save_feature(dataset_output, cfg.preprocess.energy_dir, uid, energy)

        if cfg.preprocess.extract_pitch:
            pitch = f0.get_f0(wav, cfg.preprocess)
            if cfg.preprocess.extract_duration:
                pitch = pitch[: sum(durations)]
                phone_pitch = avg_phone_feature(pitch, durations, interpolation=True)
                save_feature(
                    dataset_output, cfg.preprocess.phone_pitch_dir, uid, phone_pitch
                )
            save_feature(dataset_output, cfg.preprocess.pitch_dir, uid, pitch)

            if cfg.preprocess.extract_uv:
                # Flag is True wherever the pitch value is non-zero.
                assert isinstance(pitch, np.ndarray)
                uv = pitch != 0
                save_feature(dataset_output, cfg.preprocess.uv_dir, uid, uv)

        if cfg.preprocess.extract_audio:
            save_feature(dataset_output, cfg.preprocess.audio_dir, uid, wav)

        if cfg.preprocess.extract_label:
            if cfg.preprocess.is_mu_law:
                # compress audio before quantising it into labels
                wav = compress(wav, cfg.preprocess.bits)
            label = audio_to_label(wav, cfg.preprocess.bits)
            save_feature(dataset_output, cfg.preprocess.label_dir, uid, label)

        if cfg.preprocess.extract_acoustic_token:
            if cfg.preprocess.acoustic_token_extractor == "Encodec":
                # Tokens are extracted from the audio file on disk, not from
                # the (possibly trimmed) in-memory waveform.
                codes = extract_encodec_token(wav_path)
                save_feature(
                    dataset_output, cfg.preprocess.acoustic_token_dir, uid, codes
                )
|
|
|
|
def extract_utt_acoustic_features_tts(dataset_output, cfg, utt):
    """TTS entry point: delegate to the shared per-utterance extraction routine."""
    __extract_utt_acoustic_features(dataset_output=dataset_output, cfg=cfg, utt=utt)
|
|
|
|
def extract_utt_acoustic_features_svc(dataset_output, cfg, utt):
    """Extract and save the enabled acoustic features of one utterance (SVC).

    Handles the mel, energy, pitch and uv switches of ``cfg.preprocess``; each
    enabled feature is written with ``save_feature`` under ``dataset_output``.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg: configuration object; only ``cfg.preprocess`` is read here
        utt (dict): utterance info; "Uid" and "Path" are read

    NOTE(review): energy is always derived from ``mel``, so enabling
    ``extract_energy`` without ``extract_mel`` fails with a NameError.
    """
    from utils import audio, f0, world, duration

    preprocess_cfg = cfg.preprocess
    uid, wav_path = utt["Uid"], utt["Path"]

    with torch.no_grad():
        # Load audio at the sample rate given in the config
        wav_torch, _ = audio.load_audio_torch(wav_path, preprocess_cfg.sample_rate)
        wav = wav_torch.cpu().numpy()

        if preprocess_cfg.extract_mel:
            from utils.mel import extract_mel_features

            mel = extract_mel_features(wav_torch.unsqueeze(0), preprocess_cfg)
            mel_np = mel.cpu().numpy()
            save_feature(dataset_output, preprocess_cfg.mel_dir, uid, mel_np)

        if preprocess_cfg.extract_energy:
            energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            save_feature(dataset_output, preprocess_cfg.energy_dir, uid, energy)

        if preprocess_cfg.extract_pitch:
            pitch = f0.get_f0(wav, preprocess_cfg)
            save_feature(dataset_output, preprocess_cfg.pitch_dir, uid, pitch)

            if preprocess_cfg.extract_uv:
                # Flag is True wherever the pitch value is non-zero.
                assert isinstance(pitch, np.ndarray)
                save_feature(dataset_output, preprocess_cfg.uv_dir, uid, pitch != 0)
|
|
|
|
def extract_utt_acoustic_features_tta(dataset_output, cfg, utt):
    """TTA entry point: delegate to the shared per-utterance extraction routine."""
    __extract_utt_acoustic_features(dataset_output=dataset_output, cfg=cfg, utt=utt)
|
|
|
|
def extract_utt_acoustic_features_vocoder(dataset_output, cfg, utt):
    """Extract and save the enabled acoustic features of one utterance (vocoder).

    Handles the mel, energy, pitch, uv, audio and label switches of
    ``cfg.preprocess``; each enabled feature is written with ``save_feature``
    under ``dataset_output``.

    Args:
        dataset_output (str): directory to store acoustic features
        cfg: configuration object; only ``cfg.preprocess`` is read here
        utt (dict): utterance info; "Uid" and "Path" are read

    Raises:
        ValueError: energy mode "from_mel" is requested while mel extraction is
            disabled (this used to surface as a NameError).
        AssertionError: ``energy_extract_mode`` is neither "from_mel" nor
            "from_waveform".
    """
    from utils import audio, f0, world, duration

    uid = utt["Uid"]
    wav_path = utt["Path"]

    with torch.no_grad():
        # Load audio at the sample rate given in the config
        wav_torch, _ = audio.load_audio_torch(wav_path, cfg.preprocess.sample_rate)
        wav = wav_torch.cpu().numpy()

        if cfg.preprocess.extract_mel:
            from utils.mel import extract_mel_features

            mel = extract_mel_features(wav_torch.unsqueeze(0), cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.mel_dir, uid, mel.cpu().numpy())

        if cfg.preprocess.extract_energy:
            energy_mode = cfg.preprocess.energy_extract_mode
            if energy_mode == "from_mel":
                if not cfg.preprocess.extract_mel:
                    raise ValueError(
                        'energy_extract_mode "from_mel" requires '
                        "cfg.preprocess.extract_mel to be enabled"
                    )
                energy = (mel.exp() ** 2).sum(0).sqrt().cpu().numpy()
            elif energy_mode == "from_waveform":
                energy = audio.energy(wav, cfg.preprocess)
            else:
                # The message lists only the modes this extractor implements.
                assert energy_mode in [
                    "from_mel",
                    "from_waveform",
                ], f"{cfg.preprocess.energy_extract_mode} not in supported energy_extract_mode [from_mel, from_waveform]"

            save_feature(dataset_output, cfg.preprocess.energy_dir, uid, energy)

        if cfg.preprocess.extract_pitch:
            pitch = f0.get_f0(wav, cfg.preprocess)
            save_feature(dataset_output, cfg.preprocess.pitch_dir, uid, pitch)

            if cfg.preprocess.extract_uv:
                # Flag is True wherever the pitch value is non-zero.
                assert isinstance(pitch, np.ndarray)
                uv = pitch != 0
                save_feature(dataset_output, cfg.preprocess.uv_dir, uid, uv)

        if cfg.preprocess.extract_audio:
            save_feature(dataset_output, cfg.preprocess.audio_dir, uid, wav)

        if cfg.preprocess.extract_label:
            if cfg.preprocess.is_mu_law:
                # compress audio before quantising it into labels
                wav = compress(wav, cfg.preprocess.bits)
            label = audio_to_label(wav, cfg.preprocess.bits)
            save_feature(dataset_output, cfg.preprocess.label_dir, uid, label)
|
|
|
|
def cal_normalized_mel(mel, dataset_name, cfg):
    """Normalize ``mel`` with the stored per-channel extrema of a dataset.

    ``cfg`` is handed unchanged to ``load_mel_extrema``, which reads
    ``processed_dir`` and ``mel_min_max_stats_dir`` directly from it.
    """
    extrema = load_mel_extrema(cfg, dataset_name)
    return normalize_mel_channel(mel, *extrema)
|
|
|
|
def cal_mel_min_max(dataset, output_path, cfg, metadata=None):
    """Compute per-channel mel minima / maxima of a dataset and save them.

    Args:
        dataset (str): dataset name (sub-directory of ``output_path``)
        output_path (str): root directory of the processed datasets
        cfg: configuration object; ``cfg.preprocess.mel_dir``, ``n_mel`` and
            ``mel_min_max_stats_dir`` are read
        metadata (list, optional): utterance entries with a "Uid" key. When
            None, train.json and test.json are loaded (test.json only if
            "eval" occurs in the dataset name).

    The result is written as ``mel_min.npy`` / ``mel_max.npy`` inside
    ``<output_path>/<dataset>/<mel_min_max_stats_dir>``.
    """
    dataset_output = os.path.join(output_path, dataset)

    if metadata is None:
        splits = ["test"] if "eval" in dataset else ["train", "test"]
        metadata = []
        for split in splits:
            with open(os.path.join(dataset_output, "{}.json".format(split)), "r") as f:
                metadata.extend(json.load(f))

    channel_minima, channel_maxima = [], []
    for item in metadata:
        mel_path = os.path.join(
            dataset_output, cfg.preprocess.mel_dir, item["Uid"] + ".npy"
        )
        # Utterances without a stored mel are skipped.
        if os.path.exists(mel_path):
            mel = np.load(mel_path)
            # Bring the mel-channel axis to the front if it is not already.
            if mel.shape[0] != cfg.preprocess.n_mel:
                mel = mel.T
            assert mel.shape[0] == cfg.preprocess.n_mel

            channel_minima.append(np.min(mel, axis=-1))
            channel_maxima.append(np.max(mel, axis=-1))

    mel_min = np.min(channel_minima, axis=0)
    mel_max = np.max(channel_maxima, axis=0)

    # Persist the statistics
    stats_dir = os.path.join(dataset_output, cfg.preprocess.mel_min_max_stats_dir)
    os.makedirs(stats_dir, exist_ok=True)
    np.save(os.path.join(stats_dir, "mel_min.npy"), mel_min)
    np.save(os.path.join(stats_dir, "mel_max.npy"), mel_max)
|
|
|
|
def denorm_for_pred_mels(cfg, dataset_name, split, pred):
    """Undo the min/max mel normalization for a list of predictions.

    Args:
        cfg: configuration object; ``cfg.preprocess`` is passed to
            ``load_mel_extrema``
        dataset_name (str): dataset whose stored extrema are used
        split: not used by this function
        pred: a list whose every element is (frame_len, n_mels)
    Return:
        a list laid out like ``pred``, holding the denormalized mels
    """
    mel_min, mel_max = load_mel_extrema(cfg.preprocess, dataset_name)

    recovered_mels = []
    for mel in pred:
        # The channel helper is applied to the transposed array, and the
        # result is transposed back.
        denormed = denormalize_mel_channel(mel.T, mel_min, mel_max)
        recovered_mels.append(denormed.T)
    return recovered_mels
|
|
|
|
def load_mel_extrema(cfg, dataset_name):
    """Load the stored mel minima and maxima of a dataset.

    Args:
        cfg: object exposing ``processed_dir`` and ``mel_min_max_stats_dir``
            directly as attributes
        dataset_name (str): dataset sub-directory under ``cfg.processed_dir``

    Returns:
        tuple: (mel_min, mel_max) as loaded from ``mel_min.npy`` / ``mel_max.npy``
    """
    stats_dir = os.path.join(cfg.processed_dir, dataset_name, cfg.mel_min_max_stats_dir)
    mel_min, mel_max = (
        np.load(os.path.join(stats_dir, file_name))
        for file_name in ("mel_min.npy", "mel_max.npy")
    )
    return mel_min, mel_max
|
|
|
|
def denormalize_mel_channel(mel, mel_min, mel_max):
    """Map a mel from the normalized [-1, 1] scale back to [mel_min, mel_max].

    ``mel_min`` / ``mel_max`` get a trailing axis appended so they broadcast
    against the last axis of ``mel``.
    """
    lower = np.expand_dims(mel_min, -1)
    upper = np.expand_dims(mel_max, -1)
    span = upper - lower + ZERO
    return (mel + 1) / 2 * span + lower
|
|
|
|
def normalize_mel_channel(mel, mel_min, mel_max):
    """Scale a mel so that [mel_min, mel_max] maps onto [-1, 1].

    ``mel_min`` / ``mel_max`` get a trailing axis appended so they broadcast
    against the last axis of ``mel``; ``ZERO`` keeps the divisor non-zero when
    a channel's minimum equals its maximum.
    """
    lower = np.expand_dims(mel_min, -1)
    upper = np.expand_dims(mel_max, -1)
    span = upper - lower + ZERO
    return (mel - lower) / span * 2 - 1
|
|
|
|
def normalize(dataset, feat_dir, cfg):
    """Compute and store min / max / mean / std of one feature directory.

    Every ``.npy`` file in ``<processed_dir>/<dataset>/<feat_dir>`` is loaded;
    the global extrema are tracked and the mean / std are accumulated with a
    ``StandardScaler`` over all values (each file flattened to one column).
    The statistics are saved as ``[min, max, mean, std]`` in
    ``<processed_dir>/<dataset>/<feat_dir>_stat.npy``.

    Args:
        dataset (str): dataset name under ``cfg.preprocess.processed_dir``
        feat_dir (str): feature sub-directory to scan
        cfg: configuration object; ``cfg.preprocess.processed_dir`` is read

    Returns:
        tuple: (mean, std, min_value, max_value)
    """
    dataset_output = os.path.join(cfg.preprocess.processed_dir, dataset)
    print(f"normalize {feat_dir}")

    max_value = np.finfo(np.float64).min
    min_value = np.finfo(np.float64).max

    scaler = StandardScaler()
    feat_root = os.path.join(dataset_output, feat_dir)

    for feat_name in tqdm(os.listdir(feat_root)):
        if not feat_name.endswith(".npy"):
            continue
        feat = np.load(os.path.join(feat_root, feat_name))
        # np.max / np.min reduce over the whole array in native code and also
        # work for multi-dimensional features, where the builtin max / min
        # would raise on an ambiguous truth value.
        max_value = max(max_value, np.max(feat))
        min_value = min(min_value, np.min(feat))
        scaler.partial_fit(feat.reshape((-1, 1)))

    mean = scaler.mean_[0]
    std = scaler.scale_[0]
    stat = np.array([min_value, max_value, mean, std])
    stat_npy = os.path.join(dataset_output, f"{feat_dir}_stat.npy")
    np.save(stat_npy, stat)
    return mean, std, min_value, max_value
|
|
|
|
def load_normalized(feat_dir, dataset_name, cfg):
    """Load the statistics stored in ``<feat_dir>_stat.npy`` for a dataset.

    The file holds ``[min, max, mean, std]``; the values are returned
    reordered as ``(mean, std, min_value, max_value)``.
    """
    stat_path = os.path.join(
        cfg.preprocess.processed_dir, dataset_name, f"{feat_dir}_stat.npy"
    )
    stats = np.load(stat_path)
    min_value, max_value, mean, std = stats
    return mean, std, min_value, max_value
|
|
|
|
def cal_pitch_statistics_svc(dataset, output_path, cfg, metadata=None):
    """Compute per-singer pitch statistics and write them to statistics.json.

    For every singer two groups of statistics (mean, std, median, min, max)
    are stored: "voiced_positions" over the non-zero pitch values only and
    "total_positions" over all values. Nothing is done when
    ``<pitch_dir>/statistics.json`` already exists.

    Args:
        dataset (str): dataset name (sub-directory of ``output_path``)
        output_path (str): root directory of the processed datasets
        cfg: configuration object; ``cfg.preprocess.pitch_dir`` is read
        metadata (list, optional): utterance entries with "Singer" and "Uid".
            When None, singers.json plus train.json / test.json are loaded
            (test.json only if "eval" occurs in the dataset name).

    NOTE: a singer for whom no pitch values were collected makes
    ``np.min`` / ``np.max`` raise ValueError on the empty list.
    """

    def _describe(values):
        # Summary statistics in the key order used in statistics.json.
        return {
            "mean": np.mean(values),
            "std": np.std(values),
            "median": np.median(values),
            "min": np.min(values),
            "max": np.max(values),
        }

    # path of dataset
    dataset_dir = os.path.join(output_path, dataset)
    save_dir = os.path.join(dataset_dir, cfg.preprocess.pitch_dir)
    os.makedirs(save_dir, exist_ok=True)
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    if metadata is None:
        # load singers and ids (closed deterministically via the context manager)
        with open(os.path.join(dataset_dir, "singers.json"), "r") as f:
            singers = json.load(f)

        # combine train and test metadata
        metadata = []
        for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
            dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
            with open(dataset_file, "r") as f:
                metadata.extend(json.load(f))
    else:
        # sorted() gives every run the same singer -> id mapping and the same
        # key order in the output file.
        singer_names = sorted({item["Singer"] for item in metadata})
        # NOTE(review): keys here use `dataset` verbatim while the lookup below
        # uses replace_augment_name(dataset); they only agree if that helper
        # leaves this dataset name unchanged - confirm for augmented datasets.
        singers = {
            "{}_{}".format(dataset, name): idx for idx, name in enumerate(singer_names)
        }

    # one value list per singer id
    pitch_scalers = [[] for _ in range(len(singers))]
    total_pitch_scalers = [[] for _ in range(len(singers))]

    for utt_info in tqdm(metadata, desc="Loading F0..."):
        singer = utt_info["Singer"]
        pitch_path = os.path.join(
            dataset_dir, cfg.preprocess.pitch_dir, utt_info["Uid"] + ".npy"
        )
        # utterances without an extracted pitch file are skipped
        if not os.path.exists(pitch_path):
            continue
        total_pitch = np.load(pitch_path)
        assert len(total_pitch) > 0
        # keep only the non-zero (voiced) values
        pitch = total_pitch[total_pitch != 0]
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        pitch_scalers[spkid].extend(pitch.tolist())
        total_pitch_scalers[spkid].extend(total_pitch.tolist())

    # save pitch statistics for each singer in dict
    sta_dict = {}
    for singer in tqdm(singers, desc="Singers statistics"):
        spkid = singers[singer]
        sta_dict[singer] = {
            "voiced_positions": _describe(pitch_scalers[spkid]),
            "total_positions": _describe(total_pitch_scalers[spkid]),
        }

    # save statistics
    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
def cal_pitch_statistics(dataset, output_path, cfg):
    """Compute per-singer pitch statistics and write them to statistics.json.

    The pitch files are read from ``cfg.preprocess.phone_pitch_dir`` when
    ``cfg.preprocess.use_phone_pitch`` is set, otherwise from
    ``cfg.preprocess.pitch_dir``. For every singer two groups of statistics
    (mean, std, median, min, max) are stored: "voiced_positions" over the
    values after optional outlier removal and "total_positions" over all
    values. Nothing is done when statistics.json already exists.

    Args:
        dataset (str): dataset name (sub-directory of ``output_path``)
        output_path (str): root directory of the processed datasets
        cfg: configuration object; only ``cfg.preprocess`` is read

    NOTE: a singer for whom no pitch values were collected makes
    ``np.min`` / ``np.max`` raise ValueError on the empty list.
    """

    def _describe(values):
        # Summary statistics in the key order used in statistics.json.
        return {
            "mean": np.mean(values),
            "std": np.std(values),
            "median": np.median(values),
            "min": np.min(values),
            "max": np.max(values),
        }

    # path of dataset
    dataset_dir = os.path.join(output_path, dataset)
    if cfg.preprocess.use_phone_pitch:
        pitch_dir = cfg.preprocess.phone_pitch_dir
    else:
        pitch_dir = cfg.preprocess.pitch_dir
    save_dir = os.path.join(dataset_dir, pitch_dir)

    os.makedirs(save_dir, exist_ok=True)
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    # load singers and ids (closed deterministically via the context manager)
    with open(os.path.join(dataset_dir, "singers.json"), "r") as f:
        singers = json.load(f)

    # combine train and test metadata
    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    # one value list per singer id
    pitch_scalers = [[] for _ in range(len(singers))]
    total_pitch_scalers = [[] for _ in range(len(singers))]

    for utt_info in metadata:
        singer = utt_info["Singer"]
        pitch_path = os.path.join(dataset_dir, pitch_dir, utt_info["Uid"] + ".npy")
        # utterances without an extracted pitch file are skipped
        if not os.path.exists(pitch_path):
            continue
        total_pitch = np.load(pitch_path)
        assert len(total_pitch) > 0

        # Always bind `pitch` for this utterance: without outlier removal the
        # unfiltered values are used. Previously the name was left unbound
        # (or stale from the previous utterance) when the flag was off.
        if cfg.preprocess.pitch_remove_outlier:
            pitch = remove_outlier(total_pitch)
        else:
            pitch = total_pitch
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        pitch_scalers[spkid].extend(pitch.tolist())
        total_pitch_scalers[spkid].extend(total_pitch.tolist())

    # save pitch statistics for each singer in dict
    sta_dict = {}
    for singer in singers:
        spkid = singers[singer]
        sta_dict[singer] = {
            "voiced_positions": _describe(pitch_scalers[spkid]),
            "total_positions": _describe(total_pitch_scalers[spkid]),
        }

    # save statistics
    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
def cal_energy_statistics(dataset, output_path, cfg):
    """Compute per-singer energy statistics and write them to statistics.json.

    The energy files are read from ``cfg.preprocess.phone_energy_dir`` when
    ``cfg.preprocess.use_phone_energy`` is set, otherwise from
    ``cfg.preprocess.energy_dir``. For every singer two groups of statistics
    (mean, std, median, min, max) are stored: "voiced_positions" over the
    values after optional outlier removal and "total_positions" over all
    values. Nothing is done when statistics.json already exists.

    Args:
        dataset (str): dataset name (sub-directory of ``output_path``)
        output_path (str): root directory of the processed datasets
        cfg: configuration object; only ``cfg.preprocess`` is read

    NOTE: a singer for whom no energy values were collected makes
    ``np.min`` / ``np.max`` raise ValueError on the empty list.
    """

    def _describe(values):
        # Summary statistics in the key order used in statistics.json.
        return {
            "mean": np.mean(values),
            "std": np.std(values),
            "median": np.median(values),
            "min": np.min(values),
            "max": np.max(values),
        }

    # path of dataset
    dataset_dir = os.path.join(output_path, dataset)
    if cfg.preprocess.use_phone_energy:
        energy_dir = cfg.preprocess.phone_energy_dir
    else:
        energy_dir = cfg.preprocess.energy_dir
    save_dir = os.path.join(dataset_dir, energy_dir)
    os.makedirs(save_dir, exist_ok=True)
    print(os.path.join(save_dir, "statistics.json"))
    if has_existed(os.path.join(save_dir, "statistics.json")):
        return

    # load singers and ids (closed deterministically via the context manager)
    with open(os.path.join(dataset_dir, "singers.json"), "r") as f:
        singers = json.load(f)

    # combine train and test metadata
    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    # one value list per singer id
    energy_scalers = [[] for _ in range(len(singers))]
    total_energy_scalers = [[] for _ in range(len(singers))]

    for utt_info in metadata:
        singer = utt_info["Singer"]
        energy_path = os.path.join(dataset_dir, energy_dir, utt_info["Uid"] + ".npy")
        # utterances without an extracted energy file are skipped
        if not os.path.exists(energy_path):
            continue
        total_energy = np.load(energy_path)
        assert len(total_energy) > 0

        # Always bind `energy` for this utterance: without outlier removal the
        # unfiltered values are used. Previously the name was left unbound
        # (or stale from the previous utterance) when the flag was off.
        if cfg.preprocess.energy_remove_outlier:
            energy = remove_outlier(total_energy)
        else:
            energy = total_energy
        spkid = singers[f"{replace_augment_name(dataset)}_{singer}"]

        energy_scalers[spkid].extend(energy.tolist())
        total_energy_scalers[spkid].extend(total_energy.tolist())

    # save energy statistics for each singer in dict
    sta_dict = {}
    for singer in singers:
        spkid = singers[singer]
        sta_dict[singer] = {
            "voiced_positions": _describe(energy_scalers[spkid]),
            "total_positions": _describe(total_energy_scalers[spkid]),
        }

    # save statistics
    with open(os.path.join(save_dir, "statistics.json"), "w") as f:
        json.dump(sta_dict, f, indent=4, ensure_ascii=False)
|
|
|
|
def copy_acoustic_features(metadata, dataset_dir, src_dataset_dir, cfg):
    """Link the acoustic features of ``src_dataset_dir`` into ``dataset_dir``.

    For each of mel, energy, pitch, uv, audio and label whose
    ``cfg.preprocess.extract_<name>`` switch is set, and whose target directory
    ``<dataset_dir>/<cfg.preprocess.<name>_dir>`` does not exist yet, the
    directory is created and one symbolic link per utterance is placed in it.
    No file content is copied.

    Args:
        metadata: iterable of utterance entries; "Uid" is read from each
        dataset_dir (str): directory that receives the links
        src_dataset_dir (str): directory holding the existing feature files
        cfg: configuration object; only ``cfg.preprocess`` is read
    """
    for feature_name in ("mel", "energy", "pitch", "uv", "audio", "label"):
        # Switches and directory names are looked up lazily, so a disabled
        # feature does not need its ``<name>_dir`` entry in the config.
        if not getattr(cfg.preprocess, "extract_{}".format(feature_name)):
            continue
        feature_dir = getattr(cfg.preprocess, "{}_dir".format(feature_name))

        dst_dir = os.path.join(dataset_dir, feature_dir)
        if has_existed(dst_dir):
            continue
        os.makedirs(dst_dir, exist_ok=True)
        print(
            "Copying {} features from {} to {}...".format(
                feature_name, src_dataset_dir, dataset_dir
            )
        )

        # A relative symlink target is resolved against the directory that
        # contains the link, not the current working directory; making the
        # source absolute keeps links valid when relative paths are passed in.
        src_dir = os.path.abspath(os.path.join(src_dataset_dir, feature_dir))
        for utt_info in tqdm(metadata):
            file_name = utt_info["Uid"] + ".npy"
            dst_path = os.path.join(dst_dir, file_name)
            # create soft-links
            if not os.path.exists(dst_path):
                os.symlink(os.path.join(src_dir, file_name), dst_path)
|
|
|
|
def align_duration_mel(dataset, output_path, cfg):
    """Adjust stored phone durations so that they sum to the mel length.

    Loads train.json / test.json (test.json only if "eval" occurs in the
    dataset name) and, for every utterance, compares the sum of the saved
    duration array with the mel length. On a mismatch of at most 5 the
    durations are adjusted and the duration file is overwritten in place.

    Args:
        dataset (str): dataset name (sub-directory of ``output_path``)
        output_path (str): root directory of the processed datasets
        cfg: configuration object; ``cfg.preprocess.mel_dir`` and
            ``cfg.preprocess.duration_dir`` are read

    Returns:
        dict: "<Dataset>_<Uid>" -> adjusted duration array, for the utterances
        whose durations had to be changed.

    Raises:
        AssertionError: durations and mel length differ by more than 5.
    """
    print("align the duration and mel")

    dataset_dir = os.path.join(output_path, dataset)
    metadata = []
    for dataset_type in ["train", "test"] if "eval" not in dataset else ["test"]:
        dataset_file = os.path.join(dataset_dir, "{}.json".format(dataset_type))
        with open(dataset_file, "r") as f:
            metadata.extend(json.load(f))

    utt2dur = {}
    for index in tqdm(range(len(metadata))):
        utt_info = metadata[index]
        # NOTE: rebinds the `dataset` parameter; `dataset_dir` was computed
        # above and is not affected.
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        mel_path = os.path.join(dataset_dir, cfg.preprocess.mel_dir, uid + ".npy")
        # After the transpose, axis 0 is the length compared against the
        # summed durations below.
        mel = np.load(mel_path).transpose(1, 0)
        duration_path = os.path.join(
            dataset_dir, cfg.preprocess.duration_dir, uid + ".npy"
        )
        duration = np.load(duration_path)
        if sum(duration) != mel.shape[0]:
            duration_sum = sum(duration)
            mel_len = mel.shape[0]
            mismatch = abs(duration_sum - mel_len)
            assert mismatch <= 5, "duration and mel length mismatch!"
            # Work on a copy; the loaded array is replaced only at the end.
            cloned = np.array(duration, copy=True)
            if duration_sum > mel_len:
                # Durations are too long: shorten entries starting from the
                # last one and moving backwards. With this range the first two
                # entries are never touched.
                for j in range(1, len(duration) - 1):
                    if mismatch == 0:
                        break
                    dur_val = cloned[-j]
                    if dur_val >= mismatch:
                        # This entry can absorb the whole remaining mismatch.
                        cloned[-j] -= mismatch
                        # `mismatch` is not read again after the break below.
                        mismatch -= dur_val
                        break
                    else:
                        # Entry too short: zero it and carry the rest over to
                        # the previous entry.
                        cloned[-j] = 0
                        mismatch -= dur_val

            elif duration_sum < mel_len:
                # Durations are too short: the last entry takes the missing frames.
                cloned[-1] += mismatch
            duration = cloned
            utt2dur[utt] = duration
            np.save(duration_path, duration)

    return utt2dur
|
|