| |
| """Username_Transformer |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1iae8ZzCuKYOPmMyTibAh7hVzwjbrW4Pe |
| """ |
|
|
| |
| |
| |
|
|
| |
| |
|
|
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| import numpy as np |
| import nltk |
| import re |
| from collections import Counter |
| from tqdm import tqdm |
| import requests |
| from nltk.corpus import cmudict |
| import os |
| import pandas as pd |
|
|
| |
import torch.backends.cudnn as cudnn

# Enable cuDNN's benchmark mode for the rest of the session.
cudnn.benchmark = True

# Fetch the CMU Pronouncing Dictionary and load it as a mapping from a
# lowercase word to its list of candidate pronunciations.
nltk.download('cmudict')
cmu_dict = cmudict.dict()

# Public list of usernames used as raw training material.
url = "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Usernames/xato-net-10-million-usernames.txt"

# (connect, read) timeouts in seconds. Without a timeout ``requests`` waits
# forever on a stalled connection, which would hang the whole notebook.
DOWNLOAD_TIMEOUT = (10, 60)

try:
    response = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    # Timeouts are RequestException subclasses, so they land here as well.
    print(f"Error downloading usernames: {e}")
    usernames = []
else:
    usernames = response.text.splitlines()
    print(f"Downloaded {len(usernames)} usernames.")
|
|
def normalize_username(username):
    """Return *username* lowercased, with digits spelled out and separators collapsed.

    Each ASCII digit becomes its English name as a separate word, every run
    of non-word characters or underscores becomes a single space, and
    leading/trailing whitespace is removed.

    >>> normalize_username("John_Doe99")
    'john doe nine nine'
    """
    digit_names = ('zero', 'one', 'two', 'three', 'four',
                   'five', 'six', 'seven', 'eight', 'nine')
    # Pad each name with spaces so digits glued to letters split into words.
    digit_table = str.maketrans(
        {str(digit): f' {name} ' for digit, name in enumerate(digit_names)}
    )
    spelled_out = username.lower().translate(digit_table)
    separated = re.sub(r'[\W_]+', ' ', spelled_out)
    return re.sub(r'\s+', ' ', separated).strip()
|
|
def get_phonemes(word):
    """Return the first pronunciation listed for *word* in ``cmu_dict``.

    Returns ``None`` when the word is absent from the dictionary (or its
    entry is empty).
    """
    pronunciations = cmu_dict.get(word)
    return pronunciations[0] if pronunciations else None
|
|
def username_to_phonemes(username):
    """Return the concatenated phonemes of every dictionary word in *username*.

    The username is normalized and split on spaces; words with no
    dictionary pronunciation contribute nothing, so the result may be empty.
    """
    phonemes = []
    for word in normalize_username(username).split():
        word_phonemes = get_phonemes(word)
        if not word_phonemes:
            # Unknown word: skip it rather than guess a pronunciation.
            continue
        phonemes += word_phonemes
    return phonemes
|
|
# Parallel lists: characters of the normalized username -> its phonemes.
input_sequences = []
target_sequences = []

for username in usernames:
    phonemes = username_to_phonemes(username)
    if not phonemes:
        # Nothing pronounceable was found; the username is not usable.
        continue
    input_sequences.append(list(normalize_username(username)))
    target_sequences.append(phonemes)

# Character vocabulary; '<pad>' comes first and therefore gets index 0.
char_counter = Counter(char for seq in input_sequences for char in seq)
char_list = ['<pad>', *sorted(char_counter.keys())]
char_vocab = {token: position for position, token in enumerate(char_list)}

# Phoneme vocabulary; the three special tokens take indices 0, 1 and 2.
phoneme_counter = Counter(phoneme for seq in target_sequences for phoneme in seq)
phoneme_list = ['<pad>', '<sos>', '<eos>', *sorted(phoneme_counter.keys())]
phoneme_vocab = {token: position for position, token in enumerate(phoneme_list)}
|
|
def encode_sequence(seq, vocab, max_len, add_special_tokens=False):
    """Map *seq* to a list of exactly *max_len* vocabulary indices.

    Args:
        seq: Iterable of tokens to encode.
        vocab: Mapping token -> index. Must contain ``'<pad>'``, plus
            ``'<sos>'`` and ``'<eos>'`` when *add_special_tokens* is true.
        max_len: Length of the returned list.
        add_special_tokens: Wrap the sequence in ``<sos>`` ... ``<eos>``.

    Returns:
        The encoded sequence, truncated or right-padded with the ``<pad>``
        index. Tokens missing from *vocab* are encoded as ``<pad>``.
    """
    pad_idx = vocab['<pad>']
    encoded = [vocab.get(token, pad_idx) for token in seq]
    if add_special_tokens:
        # Truncate the payload *before* adding the markers so an over-long
        # sequence still ends with <eos> instead of having it sliced off.
        payload_room = max(0, max_len - 2)
        encoded = [vocab['<sos>'], *encoded[:payload_room], vocab['<eos>']]

    encoded = encoded[:max_len]
    return encoded + [pad_idx] * (max_len - len(encoded))
|
|
|
|
# Every sequence is padded to the longest one; targets need two extra
# slots for the <sos>/<eos> markers.
max_input_len = max(map(len, input_sequences))
max_target_len = 2 + max(map(len, target_sequences))

encoded_inputs = [
    encode_sequence(chars, char_vocab, max_input_len)
    for chars in input_sequences
]
encoded_targets = [
    encode_sequence(phonemes, phoneme_vocab, max_target_len, True)
    for phonemes in target_sequences
]
|
|
class UsernameDataset(Dataset):
    """Dataset of (encoded username, encoded phoneme sequence) pairs.

    Both arguments are converted to ``torch.long`` tensors up front, so the
    rows of each must have equal length.
    """

    def __init__(self, inputs, targets):
        self.inputs, self.targets = (
            torch.tensor(rows, dtype=torch.long) for rows in (inputs, targets)
        )

    def __len__(self):
        """Return the number of examples."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` tensor pair stored at *idx*."""
        return self.inputs[idx], self.targets[idx]
|
|
# Wrap the encoded pairs and serve them in shuffled mini-batches of 512.
dataset = UsernameDataset(inputs=encoded_inputs, targets=encoded_targets)
data_loader = DataLoader(
    dataset,
    batch_size=512,
    shuffle=True,
)
|
|
| |
def decode_sequence(encoded_seq, vocab):
    """Translate a sequence of indices back into tokens.

    *vocab* maps token -> index; it is inverted on every call. Indices not
    present in *vocab* decode to ``'<unk>'``.
    """
    token_by_index = dict(zip(vocab.values(), vocab.keys()))
    tokens = []
    for index in encoded_seq:
        tokens.append(token_by_index.get(index, '<unk>'))
    return tokens
|
|
| |
# Export a human-readable table of the training pairs.
#
# Every row of `dataset` is padded to a fixed length and targets are wrapped
# in <sos>/<eos>. Decoding those indices verbatim would write usernames such
# as "john<pad><pad>..." to the CSV, so the special tokens are dropped here.
# The inverse vocabularies are built once instead of once per row.
export_char_by_idx = {idx: char for char, idx in char_vocab.items()}
export_phoneme_by_idx = {idx: phoneme for phoneme, idx in phoneme_vocab.items()}
export_char_pad_idx = char_vocab['<pad>']
export_phoneme_special_idxs = {
    phoneme_vocab[token] for token in ('<pad>', '<sos>', '<eos>')
}

# NOTE: from here on `usernames` holds the normalized usernames that made it
# into the dataset, not the raw downloaded list.
usernames = []
pronunciations = []

for input_seq, target_seq in dataset:
    username = ''.join(
        export_char_by_idx.get(idx, '<unk>')
        for idx in input_seq.tolist()
        if idx != export_char_pad_idx
    )
    pronunciation = ' '.join(
        export_phoneme_by_idx.get(idx, '<unk>')
        for idx in target_seq.tolist()
        if idx not in export_phoneme_special_idxs
    )
    usernames.append(username)
    pronunciations.append(pronunciation)

df = pd.DataFrame({'username': usernames, 'pronunciation': pronunciations})

df.to_csv('username_pronunciation.csv', index=False)
|
|
class Encoder(nn.Module):
    """Character-level encoder: an embedding layer followed by a GRU.

    Args:
        input_dim: Size of the character vocabulary.
        emb_dim: Embedding width.
        hid_dim: GRU hidden size.
    """

    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        pad_idx = char_vocab['<pad>']
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,
            embedding_dim=emb_dim,
            padding_idx=pad_idx,
        )
        self.gru = nn.GRU(
            input_size=emb_dim,
            hidden_size=hid_dim,
            batch_first=True,
        )

    def forward(self, src):
        """Encode *src* and return the GRU's ``(outputs, hidden)`` pair.

        *src* is presumably a (batch, src_len) tensor of character indices,
        since the GRU is batch-first.
        """
        return self.gru(self.embedding(src))
|
|
class Attention(nn.Module):
    """Additive attention: score = v . tanh(W [hidden; encoder_output])."""

    def __init__(self, hid_dim):
        super().__init__()
        self.attn = nn.Linear(2 * hid_dim, hid_dim)
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return softmax-normalized attention weights over the source steps.

        Args:
            hidden: Decoder state, tiled along dim 1 to match the source
                length (so expected as (batch, 1, hid_dim)).
            encoder_outputs: (batch, src_len, hid_dim) encoder states.

        Returns:
            (batch, src_len) tensor whose rows sum to 1.
        """
        n_steps = encoder_outputs.shape[1]
        tiled_hidden = hidden.repeat(1, n_steps, 1)
        features = torch.cat((tiled_hidden, encoder_outputs), dim=2)
        scores = self.v(torch.tanh(self.attn(features))).squeeze(2)
        return torch.softmax(scores, dim=1)
|
|
class Decoder(nn.Module):
    """One-step GRU decoder that attends over the encoder outputs.

    Args:
        output_dim: Size of the phoneme vocabulary.
        emb_dim: Embedding width.
        hid_dim: GRU hidden size (must match the encoder's).
        attention: Module called as ``attention(hidden, encoder_outputs)``
            that returns per-source-step weights.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        pad_idx = phoneme_vocab['<pad>']
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=pad_idx)
        # The GRU consumes the token embedding concatenated with the context.
        self.gru = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
        # The projection sees the GRU output concatenated with the context.
        self.fc_out = nn.Linear(hid_dim * 2, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        """Run a single decoding step.

        Args:
            input: Previous token index for each example, shape (batch,).
            hidden: Current GRU state; assumed (1, batch, hid_dim).
            encoder_outputs: (batch, src_len, hid_dim) encoder states.

        Returns:
            ``(prediction, hidden)``: scores over the output vocabulary and
            the updated GRU state.
        """
        embedded = self.embedding(input.unsqueeze(1))
        # The attention module wants the batch dimension first.
        weights = self.attention(hidden.permute(1, 0, 2), encoder_outputs)
        context = torch.bmm(weights.unsqueeze(1), encoder_outputs)
        step_output, hidden = self.gru(torch.cat((embedded, context), dim=2), hidden)
        combined = torch.cat((step_output.squeeze(1), context.squeeze(1)), dim=1)
        return self.fc_out(combined), hidden
|
|
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes a target step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Return per-step vocabulary scores for the target positions.

        Args:
            src: Batch of encoded source sequences.
            trg: Batch of encoded targets; column 0 is fed as the first
                decoder input.
            teacher_forcing_ratio: Probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of
                the model's own prediction.

        Returns:
            Tensor of shape (batch, trg_len, vocab). Position 0 is never
            written and stays all zeros.
        """
        n_steps = trg.shape[1]
        outputs = torch.zeros(
            src.shape[0], n_steps, self.decoder.output_dim
        ).to(self.device)

        encoder_outputs, hidden = self.encoder(src)
        decoder_input = trg[:, 0]

        for step in range(1, n_steps):
            scores, hidden = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[:, step] = scores
            if np.random.random() < teacher_forcing_ratio:
                decoder_input = trg[:, step]
            else:
                decoder_input = scores.argmax(1)
        return outputs
|
|
def _list_checkpoints(directory):
    """Return ``(version, filename)`` for each ``g2p<N>.pth`` file in *directory*.

    The whole filename must match, so leftovers such as ``g2p3.pth.bak`` are
    not mistaken for checkpoints. Pairs come back in ``os.listdir`` order.
    A missing directory still raises, so a bad path fails before training
    rather than after it.
    """
    found = []
    for filename in os.listdir(directory):
        match = re.fullmatch(r'g2p(\d+)\.pth', filename)
        if match:
            found.append((int(match.group(1)), filename))
    return found


def get_latest_checkpoint(directory):
    """Return the path of the highest-numbered checkpoint in *directory*.

    Prints the list of version numbers found. Returns ``None`` when the
    directory holds no checkpoint.
    """
    checkpoints = _list_checkpoints(directory)
    print([version for version, _ in checkpoints])

    if not checkpoints:
        return None
    _, latest_file = max(checkpoints)
    return os.path.join(directory, latest_file)


def get_next_version(directory):
    """Return the version number the next checkpoint should be saved under.

    That is one more than the highest existing version, or 1 when
    *directory* holds no checkpoint. Prints the versions found and the
    latest ``(version, filename)`` pair.
    """
    checkpoints = _list_checkpoints(directory)
    print([version for version, _ in checkpoints])

    if not checkpoints:
        return 1
    latest = max(checkpoints)
    print(f"Latest version: {latest}")
    return latest[0] + 1
|
|
def save_checkpoint(model, directory, version):
    """Write the model's ``state_dict`` to ``<directory>/g2p<version>.pth``.

    Prints the path the weights were saved to.
    """
    filepath = os.path.join(directory, f"g2p{version}.pth")
    weights = model.state_dict()
    torch.save(weights, filepath)
    print(f"Model saved to {filepath}")
|
|
| |
# Folder where checkpoints are stored between sessions.
directory = '/content/drive/MyDrive/AI/username_g2p/'
latest_checkpoint_file = get_latest_checkpoint(directory)

print(
    f"Latest checkpoint file: {latest_checkpoint_file}"
    if latest_checkpoint_file
    else "No checkpoint files found."
)

# Show which version number the next save will use.
print(get_next_version(directory))
|
|
# Model hyper-parameters; the vocabulary sizes come from the data.
INPUT_DIM, OUTPUT_DIM = len(char_vocab), len(phoneme_vocab)
ENC_EMB_DIM = DEC_EMB_DIM = 64
HID_DIM = 128

# Modules are created in this order (attention, encoder, decoder) so that
# random initialization consumes the RNG in a fixed sequence.
attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, attn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = Seq2Seq(enc, dec, device).to(device)
optimizer = torch.optim.Adam(model.parameters())
# Padded target positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=phoneme_vocab['<pad>'])
|
|
| |
# Resume from the newest checkpoint if there is one; otherwise fall back to
# a 'g2p1.pth' in the current working directory.
checkpoint_file = latest_checkpoint_file or 'g2p1.pth'

if os.path.exists(checkpoint_file):
    print(f"Loading checkpoint from {checkpoint_file}")
    # map_location lets a checkpoint saved on a GPU runtime load on a
    # CPU-only runtime (and vice versa) instead of failing to deserialize.
    model.load_state_dict(torch.load(checkpoint_file, map_location=device))
else:
    print("Checkpoint file not found. Using default initialization.")
|
|
print(device)

# Sanity check: the largest index in the data must be below the size of the
# corresponding embedding table.
max_input_idx = max(map(max, encoded_inputs))
print(f'Max input index: {max_input_idx}, Input vocab size: {INPUT_DIM}')

max_target_idx = max(map(max, encoded_targets))
print(f'Max target index: {max_target_idx}, Output vocab size: {OUTPUT_DIM}')
|
|
def train(model, loader, optimizer, criterion, clip):
    """Train *model* for one pass over *loader* and return the mean batch loss.

    Args:
        model: Called as ``model(src, trg)``; returns per-step scores.
        loader: Iterable of ``(src, trg)`` batches.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss taking ``(scores, targets)``.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Batches are moved to the module-level ``device`` before use.
    """
    model.train()
    running_loss = 0

    for src, trg in tqdm(loader, desc="Training Batches"):
        src = src.to(device)
        trg = trg.to(device)

        optimizer.zero_grad()
        scores = model(src, trg)

        # Drop position 0 (the <sos> slot) and flatten batch and time so the
        # criterion sees one row per predicted token.
        vocab_size = scores.shape[-1]
        flat_scores = scores[:, 1:].reshape(-1, vocab_size)
        flat_targets = trg[:, 1:].reshape(-1)

        batch_loss = criterion(flat_scores, flat_targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += batch_loss.item()

    return running_loss / len(loader)
|
|
N_EPOCHS = 1  # passes over the dataset in this session
CLIP = 1      # gradient-norm ceiling handed to train()

for epoch in range(N_EPOCHS):
    loss = train(
        model,
        loader=data_loader,
        optimizer=optimizer,
        criterion=criterion,
        clip=CLIP,
    )
    print(f'Epoch: {epoch+1}, Loss: {loss:.4f}')

# Store the trained weights under the next free version number.
next_version = get_next_version(directory)
save_checkpoint(model, directory=directory, version=next_version)
|
|
def predict(model, username):
    """Greedily decode the phoneme sequence for *username*.

    The username is normalized and encoded like the training inputs, then
    decoded one token at a time, starting from ``<sos>``. Decoding stops at
    ``<eos>`` or after ``max_target_len`` steps.

    Returns:
        The predicted phonemes joined by single spaces.
    """
    model.eval()
    sos_idx = phoneme_vocab['<sos>']
    eos_idx = phoneme_vocab['<eos>']
    predicted = []

    with torch.no_grad():
        chars = list(normalize_username(username))
        encoded = encode_sequence(chars, char_vocab, max_input_len)
        src = torch.tensor([encoded], dtype=torch.long).to(device)
        encoder_outputs, hidden = model.encoder(src)
        input_token = torch.tensor([sos_idx], dtype=torch.long).to(device)

        for _ in range(max_target_len):
            scores, hidden = model.decoder(input_token, hidden, encoder_outputs)
            # Greedy choice; it is also the input of the next step.
            input_token = scores.argmax(1)
            token_idx = input_token.item()
            if token_idx == eos_idx:
                break
            predicted.append(token_idx)

    idx_to_phoneme = {idx: phoneme for phoneme, idx in phoneme_vocab.items()}
    return ' '.join(idx_to_phoneme[idx] for idx in predicted)
|
|
# Username used for the demo prediction. A longer alternative to try by
# hand is 'supercalafragalisticexpialadocous'; it used to be assigned here
# and immediately overwritten, so that dead assignment was removed.
test_username = 'barnabassacket'
pronunciation = predict(model, test_username)
print(f'Username: {test_username}')
print(f'Pronunciation: {pronunciation}')
|
|
| |
# Consonant symbols of the CMU dictionary and their IPA spelling.
_CONSONANT_IPA = {
    "B": "b", "CH": "ʧ", "D": "d", "DH": "ð", "F": "f", "G": "g",
    "HH": "h", "JH": "ʤ", "K": "k", "L": "l", "M": "m", "N": "n",
    "NG": "ŋ", "P": "p", "R": "r", "S": "s", "SH": "ʃ", "T": "t",
    "TH": "θ", "V": "v", "W": "w", "Y": "j", "Z": "z", "ZH": "ʒ",
}

# (vowel, IPA for stress digit 0, IPA for stress digits 1 and 2).
# Only AH and ER change their IPA symbol with stress.
_VOWEL_IPA = (
    ("AA", "ɑ", "ɑ"),
    ("AE", "æ", "æ"),
    ("AH", "ə", "ʌ"),
    ("AO", "ɔ", "ɔ"),
    ("EH", "ɛ", "ɛ"),
    ("ER", "ɚ", "ɝ"),
    ("IH", "ɪ", "ɪ"),
    ("IY", "i", "i"),
    ("UH", "ʊ", "ʊ"),
    ("UW", "u", "u"),
    ("AW", "aʊ", "aʊ"),
    ("AY", "aɪ", "aɪ"),
    ("EY", "eɪ", "eɪ"),
    ("OW", "oʊ", "oʊ"),
    ("OY", "ɔɪ", "ɔɪ"),
)

# Mapping from CMU phoneme symbols (vowels carry a stress digit) to IPA.
CMU_IPA_MAPPING = dict(_CONSONANT_IPA)
for _vowel, _unstressed, _stressed in _VOWEL_IPA:
    CMU_IPA_MAPPING[f"{_vowel}0"] = _unstressed
    CMU_IPA_MAPPING[f"{_vowel}1"] = _stressed
    CMU_IPA_MAPPING[f"{_vowel}2"] = _stressed
|
|
# Same prediction again, rendered as IPA. Symbols missing from the mapping
# are passed through unchanged.
pronunciation = predict(model, test_username)
ipa_sequence = ''.join(
    CMU_IPA_MAPPING.get(symbol, symbol) for symbol in pronunciation.split()
)
print(f'Username: {test_username}')
print(f'Pronunciation: {ipa_sequence}')
|
|
# SSML snippet that attaches an explicit pronunciation to a piece of text.
ssml_template = '<phoneme alphabet="{alphabet}" ph="{phonetics}">{text}</phoneme>'


class Alphabets:
    """Values for the ``alphabet`` attribute of the SSML phoneme tag."""

    IPA, CMU = "ipa", "cmu-arpabet"


# Example of a fully rendered tag.
_example_tag = ssml_template.format(
    alphabet=Alphabets.IPA,
    phonetics="ˈæktʃuəli",
    text="actually",
)
print(_example_tag)
|
|
from google.colab import userdata
from elevenlabs import save
from elevenlabs.client import ElevenLabs
from IPython.display import Audio, display

# API key is read from the Colab secret named 'ELEVENLABS'.
eleven_labs_key = userdata.get('ELEVENLABS')

# File the synthesized speech is written to.
sound_file = 'test.mp3'
|
|
def build_eleven_labs_query(username: str):
    """Synthesize *username* with its predicted pronunciation and save the audio.

    The phonemes predicted by the module-level ``model`` are wrapped in an
    SSML phoneme tag (CMU alphabet), sent to ElevenLabs, and the returned
    audio is written to ``sound_file``.

    Args:
        username: Text to speak. Treated as untrusted: markup characters
            are escaped before being placed into the SSML.
    """
    # Local import keeps this block self-contained.
    from html import escape

    client = ElevenLabs(api_key=eleven_labs_key)

    # Escape both interpolated values so characters such as <, & or " in a
    # username cannot break out of the attribute or the element.
    ssml = ssml_template.format(
        alphabet=Alphabets.CMU,
        phonetics=escape(predict(model, username), quote=True),
        text=escape(username, quote=False),
    )

    audio = client.generate(
        text=ssml,
        voice="Rachel",
        model="eleven_flash_v2",
    )
    save(audio, sound_file)
|
|
# Synthesize the demo username, then play the resulting file inline.
build_eleven_labs_query(username=test_username)

_player = Audio(sound_file, autoplay=True)
display(_player)
|
|
| |
|
|
| import torch |
|
|
| |
| |
|
|
| |
# Every parameter tensor with its qualified name.
for name, param in model.named_parameters():
    shape = param.shape
    print(f"Parameter Name: {name}, Shape: {shape}")

# The same shapes again, without names.
for param in model.parameters():
    shape = param.shape
    print(f"Parameter Shape: {shape}")

# Total number of scalar weights in the model.
total_weights = 0
for param in model.parameters():
    total_weights += param.numel()
print(f"Model Parameters: {total_weights}")
|
|
| |
|
|
| import matplotlib.pyplot as plt |
| import numpy as np |
|
|
| |
| |
|
|
| |
# Number of weights held by each named parameter tensor.
_named_params = list(model.named_parameters())
parameter_names = [name for name, _ in _named_params]
parameter_shapes = [np.prod(param.shape) for _, param in _named_params]

# Bar chart: one bar per parameter tensor, bar height = element count.
plt.figure(figsize=(10, 6))
plt.bar(parameter_names, parameter_shapes)
plt.title("Distribution of Weights in the Model")
plt.xlabel("Parameter Name")
plt.ylabel("Number of Weights")
plt.xticks(rotation=90)  # parameter names are long; turn them vertical
plt.tight_layout()
plt.show()