| |
| |
| |
| |
|
|
import csv
import json
import os
import random
import subprocess
from pathlib import Path

import librosa
import numpy as np
import soundfile as sf
import torchaudio
from scipy.io import wavfile
from tqdm import tqdm

from text import _clean_text
from utils import audio
from utils.util import has_existed
|
|
|
|
def textgird_extract(
    corpus_directory,
    output_directory,
    mfa_path=os.path.join("mfa", "montreal-forced-aligner", "bin", "mfa_align"),
    lexicon=os.path.join("mfa", "lexicon", "librispeech-lexicon.txt"),
    acoustic_model_path=os.path.join(
        "mfa", "montreal-forced-aligner", "pretrained_models", "english.zip"
    ),
    jobs="8",
):
    """Run the Montreal Forced Aligner (MFA) over a prepared corpus.

    Args:
        corpus_directory: Directory containing paired *.wav and *.lab files.
        output_directory: Where MFA writes the TextGrid results (created if
            missing).
        mfa_path: Path to the ``mfa_align`` executable, relative to the cwd.
        lexicon: Pronunciation lexicon file used by MFA.
        acoustic_model_path: Pretrained acoustic model archive.
        jobs: Number of parallel alignment jobs (passed to ``-j``).

    Raises:
        AssertionError: If the corpus or any MFA tool path does not exist.
    """
    assert os.path.exists(
        corpus_directory
    ), "Please check the directionary contains *.wav, *.lab"
    assert (
        os.path.exists(mfa_path)
        and os.path.exists(lexicon)
        and os.path.exists(acoustic_model_path)
    ), f"Please download the MFA tools to {mfa_path} firstly"
    Path(output_directory).mkdir(parents=True, exist_ok=True)
    print(f"MFA results are save in {output_directory}")
    # Invoke MFA with an argument list instead of a shell-interpolated
    # string: paths containing spaces or shell metacharacters can no longer
    # break (or inject into) the command. check=False mirrors the previous
    # os.system behavior of not raising on a non-zero exit status.
    subprocess.run(
        [
            os.path.join(".", mfa_path),
            corpus_directory,
            lexicon,
            acoustic_model_path,
            output_directory,
            "-j",
            jobs,
            "--clean",
        ],
        check=False,
    )
|
|
|
|
def get_lines(file):
    """Read a UTF-8 text file and return its lines with surrounding whitespace stripped."""
    with open(file, encoding="utf-8") as fin:
        return [raw.strip() for raw in tqdm(fin)]
|
|
|
|
def get_uid2utt(ljspeech_path, dataset, cfg):
    """Build per-utterance metadata entries for LJSpeech.

    Each element of ``dataset`` is a metadata line of the form
    ``uid|raw_text|normalized_text``. For every utterance the matching wav
    under ``<ljspeech_path>/wavs/`` is loaded to measure its duration.

    Returns:
        (entries, hours): a list of metadata dicts and the total audio
        duration in hours.
    """
    utterances = []
    total_seconds = 0

    for idx, line in enumerate(tqdm(dataset)):
        fields = line.split("|")
        uid, text = fields[0], fields[2]

        wav_file = os.path.join(ljspeech_path, "wavs/{}.wav".format(uid))
        waveform, sample_rate = torchaudio.load(wav_file)
        seconds = waveform.size(-1) / sample_rate
        total_seconds += seconds

        utterances.append(
            {
                "Dataset": "LJSpeech",
                "index": idx,
                "Singer": "LJSpeech",
                "Uid": uid,
                "Text": text,
                "Path": wav_file,
                "Duration": seconds,
            }
        )

    return utterances, total_seconds / 3600
|
|
|
|
def split_dataset(lines, test_rate=0.05, test_size=None):
    """Randomly split ``lines`` into train and test subsets.

    Args:
        lines: Sequence of items to split.
        test_rate: Fraction assigned to the test split when ``test_size``
            is None.
        test_size: Explicit number of test items; overrides ``test_rate``.

    Returns:
        (train_set, test_set) as new lists. The input sequence is left
        unmodified (shuffling happens on a copy).
    """
    if test_size is None:  # `is None`, not `== None` (PEP 8)
        test_size = int(len(lines) * test_rate)

    # Shuffle a copy so the caller's list is not reordered as a side effect.
    shuffled = list(lines)
    random.shuffle(shuffled)

    test_set = shuffled[:test_size]
    train_set = shuffled[test_size:]
    return train_set, test_set
|
|
|
|
# Full-scale amplitude of 16-bit PCM audio; used to rescale peak-normalized
# float waveforms before casting to np.int16 in prepare_align.
max_wav_value = 32768.0
|
|
|
|
def prepare_align(dataset, dataset_path, cfg, output_path):
    """Prepare LJSpeech wav/lab pairs and run MFA forced alignment.

    For each metadata entry, writes a resampled, peak-normalized 16-bit PCM
    wav plus a matching .lab file containing the cleaned transcript into
    ``<output_path>/<dataset>/<cfg.raw_data>/LJSpeech/``, then invokes MFA to
    produce TextGrid files.

    Args:
        dataset: Dataset name used in output paths (e.g. "LJSpeech").
        dataset_path: Raw corpus root containing metadata.csv and wavs/.
        cfg: Config providing ``raw_data``, ``sample_rate`` and
            ``text_cleaners``.
        output_path: Root directory for processed output.
    """
    in_dir = dataset_path
    out_dir = os.path.join(output_path, dataset, cfg.raw_data)
    sampling_rate = cfg.sample_rate
    cleaners = cfg.text_cleaners
    speaker = "LJSpeech"
    with open(os.path.join(dataset_path, "metadata.csv"), encoding="utf-8") as f:
        for line in tqdm(f):
            parts = line.strip().split("|")
            base_name = parts[0]
            text = parts[2]
            text = _clean_text(text, cleaners)

            output_wav_path = os.path.join(out_dir, speaker, "{}.wav".format(base_name))
            output_lab_path = os.path.join(out_dir, speaker, "{}.lab".format(base_name))

            # Skip entries already produced by a previous (interrupted) run.
            if os.path.exists(output_wav_path) and os.path.exists(output_lab_path):
                continue

            wav_path = os.path.join(in_dir, "wavs", "{}.wav".format(base_name))
            if os.path.exists(wav_path):
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                # librosa >= 0.10 requires the target sample rate as a
                # keyword argument; passing it positionally raises TypeError.
                wav, _ = librosa.load(wav_path, sr=sampling_rate)
                # Peak-normalize to full scale, then clip: a sample at the
                # positive peak would otherwise become 32768.0, which wraps
                # to -32768 when cast to np.int16.
                wav = wav / max(abs(wav)) * max_wav_value
                wav = np.clip(wav, -max_wav_value, max_wav_value - 1)

                wavfile.write(
                    output_wav_path,
                    sampling_rate,
                    wav.astype(np.int16),
                )

                # Write the transcript as UTF-8 explicitly, matching the
                # encoding used to read metadata.csv above.
                with open(output_lab_path, "w", encoding="utf-8") as f1:
                    f1.write(text)

    textgird_extract(
        corpus_directory=out_dir,
        output_directory=os.path.join(output_path, dataset, "TextGrid"),
    )
|
|
|
|
def _dump_split(split_name, output_file, ljspeech_path, subset, cfg):
    """Serialize one split's utterance metadata to JSON and print its hours."""
    res, hours = get_uid2utt(ljspeech_path, subset, cfg)
    with open(output_file, "w") as f:
        json.dump(res, f, indent=4, ensure_ascii=False)
    print("{}_hours= {}".format(split_name, hours))


def main(output_path, dataset_path, cfg):
    """Generate LJSpeech train/test split JSONs and the singer lookup table.

    Writes singers.json unconditionally, then (unless both split files
    already exist) shuffles metadata.csv into train/test sets and dumps
    per-utterance metadata for each.
    """
    print("-" * 10)
    print("Dataset splits for {}...\n".format("LJSpeech"))

    dataset = "LJSpeech"

    save_dir = os.path.join(output_path, dataset)
    os.makedirs(save_dir, exist_ok=True)
    ljspeech_path = dataset_path

    train_output_file = os.path.join(save_dir, "train.json")
    test_output_file = os.path.join(save_dir, "test.json")
    singer_dict_file = os.path.join(save_dir, "singers.json")

    # The singer lookup table is (re)written even when the splits already
    # exist, matching the original control flow.
    speaker = "LJSpeech"
    speakers = [dataset + "_" + speaker]
    singer_lut = {name: i for i, name in enumerate(sorted(speakers))}
    with open(singer_dict_file, "w") as f:
        json.dump(singer_lut, f, indent=4, ensure_ascii=False)

    if has_existed(train_output_file) and has_existed(test_output_file):
        return

    meta_file = os.path.join(ljspeech_path, "metadata.csv")
    lines = get_lines(meta_file)

    train_set, test_set = split_dataset(lines)

    _dump_split("Train", train_output_file, ljspeech_path, train_set, cfg)
    _dump_split("Test", test_output_file, ljspeech_path, test_set, cfg)
|
|