# -*- coding: utf-8 -*-
"""Username_Transformer
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1iae8ZzCuKYOPmMyTibAh7hVzwjbrW4Pe
"""
# Commented out IPython magic to ensure Python compatibility.
# Install PyTorch
# %pip install torch torchvision torchaudio
# Install other dependencies
# %pip install numpy pandas nltk elevenlabs requests
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import nltk
import re
from collections import Counter
from tqdm import tqdm
import requests
from nltk.corpus import cmudict
import os
import pandas as pd
# Let cuDNN benchmark and pick the fastest kernels for our fixed-size batches
import torch.backends.cudnn as cudnn
cudnn.benchmark = True
nltk.download('cmudict')
cmu_dict = cmudict.dict()
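# cmu_dict maps lowercase words to one or more ARPAbet pronunciations,
# e.g. cmu_dict['cat'] -> [['K', 'AE1', 'T']]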
url = "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Usernames/xato-net-10-million-usernames.txt"
try:
    response = requests.get(url, timeout=60)  # don't hang forever on a stalled connection
response.raise_for_status() # Raise an exception for bad status codes
usernames = response.text.splitlines()
print(f"Downloaded {len(usernames)} usernames.")
except requests.exceptions.RequestException as e:
print(f"Error downloading usernames: {e}")
usernames = []
def normalize_username(username):
# Convert to lowercase
username = username.lower()
# Replace numbers with words
num_to_word = {
'0': ' zero ', '1': ' one ', '2': ' two ', '3': ' three ',
'4': ' four ', '5': ' five ', '6': ' six ', '7': ' seven ',
'8': ' eight ', '9': ' nine '
}
for num, word in num_to_word.items():
username = username.replace(num, word)
# Replace special characters with spaces
username = re.sub(r'[\W_]+', ' ', username)
# Remove extra spaces
username = re.sub(r'\s+', ' ', username).strip()
return username
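# e.g. normalize_username('xXCool_Dude42Xx') -> 'xxcool dude four two xx'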
def get_phonemes(word):
phonemes_list = cmu_dict.get(word)
if phonemes_list:
return phonemes_list[0] # Use the first pronunciation
    else:
        return None  # Word not in CMUdict; callers drop these words
def username_to_phonemes(username):
normalized = normalize_username(username)
words = normalized.split()
phonemes = []
for word in words:
phoneme = get_phonemes(word)
if phoneme:
phonemes.extend(phoneme)
# else:
# print(f"Warning: Unable to find phonemes for word: {word}")
return phonemes
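# Uses the first CMUdict pronunciation of each recognized word, e.g.
# username_to_phonemes('hello2world')
#   -> ['HH', 'AH0', 'L', 'OW1', 'T', 'UW1', 'W', 'ER1', 'L', 'D']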
input_sequences = []
target_sequences = []
for username in usernames:
input_seq = list(normalize_username(username))
target_seq = username_to_phonemes(username)
if target_seq:
input_sequences.append(input_seq)
target_sequences.append(target_seq)
# Character Vocabulary
char_counter = Counter([char for seq in input_sequences for char in seq])
char_list = ['<pad>'] + sorted(char_counter.keys())
char_vocab = {char: idx for idx, char in enumerate(char_list)}
# Phoneme Vocabulary
phoneme_counter = Counter([phoneme for seq in target_sequences for phoneme in seq])
phoneme_list = ['<pad>', '<sos>', '<eos>'] + sorted(phoneme_counter.keys())
phoneme_vocab = {phoneme: idx for idx, phoneme in enumerate(phoneme_list)}
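# Index 0 is <pad> in both vocabularies; the phoneme vocabulary also
# reserves 1 and 2 for <sos>/<eos> so the decoder has explicit start/stop symbols.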
def encode_sequence(seq, vocab, max_len, add_special_tokens=False):
    encoded = [vocab.get(token, vocab['<pad>']) for token in seq]  # unknown tokens fall back to <pad>
if add_special_tokens:
encoded = [vocab['<sos>']] + encoded + [vocab['<eos>']]
# Trim or pad the sequence to max_len
encoded = encoded[:max_len] + [vocab['<pad>']] * max(0, max_len - len(encoded))
return encoded
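# e.g. encode_sequence(['a', 'b'], char_vocab, 4)
#   -> [char_vocab['a'], char_vocab['b'], 0, 0]   (0 is <pad>)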
max_input_len = max(len(seq) for seq in input_sequences)
max_target_len = max(len(seq) for seq in target_sequences) + 2 # For <sos> and <eos>
encoded_inputs = [encode_sequence(seq, char_vocab, max_input_len) for seq in input_sequences]
encoded_targets = [encode_sequence(seq, phoneme_vocab, max_target_len, True) for seq in target_sequences]
class UsernameDataset(Dataset):
def __init__(self, inputs, targets):
self.inputs = torch.tensor(inputs, dtype=torch.long)
self.targets = torch.tensor(targets, dtype=torch.long)
def __len__(self):
return len(self.inputs)
def __getitem__(self, idx):
return self.inputs[idx], self.targets[idx]
dataset = UsernameDataset(encoded_inputs, encoded_targets)
data_loader = DataLoader(dataset, batch_size=512, shuffle=True)
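# Quick sanity check (uncomment to inspect one batch):
# sample_src, sample_trg = next(iter(data_loader))
# print(sample_src.shape, sample_trg.shape)  # (512, max_input_len), (512, max_target_len)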
# Function to decode sequences
def decode_sequence(encoded_seq, vocab):
idx_to_token = {idx: token for token, idx in vocab.items()}
decoded_seq = [idx_to_token.get(idx, '<unk>') for idx in encoded_seq]
return decoded_seq
# Create lists to store decoded usernames and pronunciations
decoded_usernames = []
pronunciations = []
# Iterate through the dataset and decode sequences, dropping <pad> tokens
# so they don't leak into the CSV (also avoids shadowing `usernames`)
for input_seq, target_seq in dataset:
    username = ''.join(t for t in decode_sequence(input_seq.tolist(), char_vocab) if t != '<pad>')
    pronunciation = ' '.join(t for t in decode_sequence(target_seq.tolist(), phoneme_vocab) if t != '<pad>')
    decoded_usernames.append(username)
    pronunciations.append(pronunciation)
# Create a Pandas DataFrame
df = pd.DataFrame({'username': decoded_usernames, 'pronunciation': pronunciations})
# Export to CSV
df.to_csv('username_pronunciation.csv', index=False)
class Encoder(nn.Module):
def __init__(self, input_dim, emb_dim, hid_dim):
super().__init__()
self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=char_vocab['<pad>'])
self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)
def forward(self, src):
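        # src: (batch, src_len) character ids
        # outputs: (batch, src_len, hid_dim); hidden: (1, batch, hid_dim)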
embedded = self.embedding(src)
outputs, hidden = self.gru(embedded)
return outputs, hidden
class Attention(nn.Module):
def __init__(self, hid_dim):
super().__init__()
self.attn = nn.Linear(hid_dim * 2, hid_dim)
self.v = nn.Linear(hid_dim, 1, bias=False)
def forward(self, hidden, encoder_outputs):
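        # Bahdanau-style additive attention: hidden arrives already permuted
        # to (batch, 1, hid_dim), encoder_outputs is (batch, src_len, hid_dim);
        # returns (batch, src_len) weights that sum to 1 over source positions.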
src_len = encoder_outputs.shape[1]
hidden = hidden.repeat(1, src_len, 1)
energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
attention = self.v(energy).squeeze(2)
return torch.softmax(attention, dim=1)
class Decoder(nn.Module):
def __init__(self, output_dim, emb_dim, hid_dim, attention):
super().__init__()
self.output_dim = output_dim
self.attention = attention
self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=phoneme_vocab['<pad>'])
self.gru = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
self.fc_out = nn.Linear(hid_dim * 2, output_dim)
def forward(self, input, hidden, encoder_outputs):
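        # input: (batch,) previous token ids; hidden: (1, batch, hid_dim)
        # returns (batch, output_dim) logits and the updated hidden state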
input = input.unsqueeze(1)
embedded = self.embedding(input)
a = self.attention(hidden.permute(1, 0, 2), encoder_outputs)
a = a.unsqueeze(1)
weighted = torch.bmm(a, encoder_outputs)
rnn_input = torch.cat((embedded, weighted), dim=2)
output, hidden = self.gru(rnn_input, hidden)
output = torch.cat((output.squeeze(1), weighted.squeeze(1)), dim=1)
prediction = self.fc_out(output)
return prediction, hidden
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
def forward(self, src, trg, teacher_forcing_ratio=0.5):
batch_size = src.shape[0]
trg_len = trg.shape[1]
trg_vocab_size = self.decoder.output_dim
outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
encoder_outputs, hidden = self.encoder(src)
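        # Seed the decoder with <sos>; at each step feed back either the
        # gold token (teacher forcing) or the model's own best guess.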
input = trg[:, 0]
for t in range(1, trg_len):
output, hidden = self.decoder(input, hidden, encoder_outputs)
outputs[:, t] = output
top1 = output.argmax(1)
teacher_force = np.random.random() < teacher_forcing_ratio
input = trg[:, t] if teacher_force else top1
return outputs
def get_latest_checkpoint(directory):
# Get a list of all files in the directory
files = os.listdir(directory)
# Filter the list to only include g2p{n}.pth files
    checkpoint_files = [f for f in files if re.fullmatch(r'g2p\d+\.pth', f)]
# Extract the numbers from the filenames
checkpoint_numbers = [int(re.search(r'g2p(\d+)\.pth', f).group(1)) for f in checkpoint_files]
print(checkpoint_numbers)
# Sort the files by their numbers
sorted_files = sorted(zip(checkpoint_numbers, checkpoint_files))
# Get the latest file (last element in the sorted list)
if sorted_files:
latest_file = sorted_files[-1][1]
latest_checkpoint_path = os.path.join(directory, latest_file)
return latest_checkpoint_path
else:
return None
def get_next_version(directory):
files = os.listdir(directory)
# Filter the list to only include g2p{n}.pth files
    checkpoint_files = [f for f in files if re.fullmatch(r'g2p\d+\.pth', f)]
# Extract the numbers from the filenames
checkpoint_numbers = [int(re.search(r'g2p(\d+)\.pth', f).group(1)) for f in checkpoint_files]
print(checkpoint_numbers)
# Sort the files by their numbers
sorted_files = sorted(zip(checkpoint_numbers, checkpoint_files))
if sorted_files:
latest_version = sorted_files[-1][0]
print(f"Latest version: {sorted_files[-1]}")
return latest_version + 1
else:
return 1 # Start with version 1 if no checkpoints exist
def save_checkpoint(model, directory, version):
filename = f"g2p{version}.pth"
filepath = os.path.join(directory, filename)
torch.save(model.state_dict(), filepath)
print(f"Model saved to {filepath}")
# Get the latest checkpoint file path (assumes Google Drive is already mounted in Colab)
directory = '/content/drive/MyDrive/AI/username_g2p/'
latest_checkpoint_file = get_latest_checkpoint(directory)
if latest_checkpoint_file:
print(f"Latest checkpoint file: {latest_checkpoint_file}")
else:
print("No checkpoint files found.")
print(get_next_version(directory))
INPUT_DIM = len(char_vocab)
OUTPUT_DIM = len(phoneme_vocab)
ENC_EMB_DIM = 64
DEC_EMB_DIM = 64
HID_DIM = 128
attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, attn)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Seq2Seq(enc, dec, device).to(device)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=phoneme_vocab['<pad>'])
# Path to your checkpoint file
checkpoint_file = latest_checkpoint_file if latest_checkpoint_file else 'g2p1.pth'
# Check if the checkpoint file exists
if os.path.exists(checkpoint_file):
# Load the checkpoint
print(f"Loading checkpoint from {checkpoint_file}")
    model.load_state_dict(torch.load(checkpoint_file, map_location=device))
else:
    print("Checkpoint file not found. Using default initialization.")
print(device)
# Verify input sequences
max_input_idx = max([max(seq) for seq in encoded_inputs])
print(f'Max input index: {max_input_idx}, Input vocab size: {INPUT_DIM}')
# Verify target sequences
max_target_idx = max([max(seq) for seq in encoded_targets])
print(f'Max target index: {max_target_idx}, Output vocab size: {OUTPUT_DIM}')
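# The embedding layers index into tables of size INPUT_DIM/OUTPUT_DIM, so
# every id must be strictly smaller than its vocab size:
assert max_input_idx < INPUT_DIM and max_target_idx < OUTPUT_DIM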
def train(model, loader, optimizer, criterion, clip):
model.train()
epoch_loss = 0
for src, trg in tqdm(loader, desc="Training Batches"):
src, trg = src.to(device), trg.to(device)
optimizer.zero_grad()
output = model(src, trg)
output_dim = output.shape[-1]
output = output[:, 1:].reshape(-1, output_dim)
trg = trg[:, 1:].reshape(-1)
loss = criterion(output, trg)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
optimizer.step()
epoch_loss += loss.item()
return epoch_loss / len(loader)
N_EPOCHS = 1
CLIP = 1
for epoch in range(N_EPOCHS):
loss = train(model, data_loader, optimizer, criterion, CLIP)
print(f'Epoch: {epoch+1}, Loss: {loss:.4f}')
# Get the next version number
next_version = get_next_version(directory)
# Save the model with the new version number
save_checkpoint(model, directory, next_version)
def predict(model, username):
model.eval()
with torch.no_grad():
normalized = normalize_username(username)
input_seq = encode_sequence(list(normalized), char_vocab, max_input_len)
src = torch.tensor([input_seq], dtype=torch.long).to(device)
encoder_outputs, hidden = model.encoder(src)
input_token = torch.tensor([phoneme_vocab['<sos>']], dtype=torch.long).to(device)
outputs = []
for _ in range(max_target_len):
output, hidden = model.decoder(input_token, hidden, encoder_outputs)
top1 = output.argmax(1)
if top1.item() == phoneme_vocab['<eos>']:
break
outputs.append(top1.item())
input_token = top1
idx_to_phoneme = {idx: phoneme for phoneme, idx in phoneme_vocab.items()}
predicted_phonemes = [idx_to_phoneme[idx] for idx in outputs]
return ' '.join(predicted_phonemes)
# Alternative long test case, kept for reference:
# test_username = 'supercalafragalisticexpialadocous'
test_username = 'barnabassacket'
pronunciation = predict(model, test_username)
print(f'Username: {test_username}')
print(f'Pronunciation: {pronunciation}')
# from https://github.com/margonaut/CMU-to-IPA-Converter/blob/master/cmu_ipa_mapping.rb
CMU_IPA_MAPPING = {
"B": "b",
"CH": "ʧ",
"D": "d",
"DH": "ð",
"F": "f",
"G": "g",
"HH": "h",
"JH": "ʤ",
"K": "k",
"L": "l",
"M": "m",
"N": "n",
"NG": "ŋ",
"P": "p",
"R": "r",
"S": "s",
"SH": "ʃ",
"T": "t",
"TH": "θ",
"V": "v",
"W": "w",
"Y": "j",
"Z": "z",
"ZH": "ʒ",
"AA0": "ɑ",
"AA1": "ɑ",
"AA2": "ɑ",
"AE0": "æ",
"AE1": "æ",
"AE2": "æ",
"AH0": "ə",
"AH1": "ʌ",
"AH2": "ʌ",
"AO0": "ɔ",
"AO1": "ɔ",
"AO2": "ɔ",
"EH0": "ɛ",
"EH1": "ɛ",
"EH2": "ɛ",
"ER0": "ɚ",
"ER1": "ɝ",
"ER2": "ɝ",
"IH0": "ɪ",
"IH1": "ɪ",
"IH2": "ɪ",
"IY0": "i",
"IY1": "i",
"IY2": "i",
"UH0": "ʊ",
"UH1": "ʊ",
"UH2": "ʊ",
"UW0": "u",
"UW1": "u",
"UW2": "u",
"AW0": "aʊ",
"AW1": "aʊ",
"AW2": "aʊ",
"AY0": "aɪ",
"AY1": "aɪ",
"AY2": "aɪ",
"EY0": "eɪ",
"EY1": "eɪ",
"EY2": "eɪ",
"OW0": "oʊ",
"OW1": "oʊ",
"OW2": "oʊ",
"OY0": "ɔɪ",
"OY1": "ɔɪ",
"OY2": "ɔɪ"
}
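# e.g. 'HH AH0 L OW1' -> 'həloʊ'. Stress digits mostly collapse to a single
# IPA symbol per vowel; AH and ER are the exceptions (AH0 -> ə, AH1/AH2 -> ʌ).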
pronunciation = predict(model, test_username)
ipa_sequence = ''.join([CMU_IPA_MAPPING.get(phoneme, phoneme) for phoneme in pronunciation.split()])
print(f'Username: {test_username}')
print(f'Pronunciation: {ipa_sequence}')
ssml_template = """<phoneme alphabet="{alphabet}" ph="{phonetics}">{text}</phoneme>"""
class Alphabets:
IPA = "ipa"
CMU = "cmu-arpabet"
print(ssml_template.format(alphabet=Alphabets.IPA, phonetics="ˈæktʃuəli", text="actually"))
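# Expected output:
# <phoneme alphabet="ipa" ph="ˈæktʃuəli">actually</phoneme>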
from google.colab import userdata
eleven_labs_key = userdata.get('ELEVENLABS')
from elevenlabs import save
from elevenlabs.client import ElevenLabs
from IPython.display import Audio, display
sound_file = 'test.mp3'
def build_eleven_labs_query(username: str):
client = ElevenLabs(
api_key=eleven_labs_key,
)
audio = client.generate(
text=ssml_template.format(
alphabet=Alphabets.CMU,
phonetics=predict(model, username),
text=username
),
voice="Rachel",
model="eleven_flash_v2"
)
save(audio, sound_file)
build_eleven_labs_query(test_username)
display(Audio(sound_file, autoplay=True))
# prompt: get the parameters of a pytorch model
# Assuming 'model' is your Seq2Seq model instance
# Replace with your actual model if named differently
# Method 1: Using model.named_parameters()
for name, param in model.named_parameters():
print(f"Parameter Name: {name}, Shape: {param.shape}")
# Method 2: Using model.parameters() (without parameter names)
for param in model.parameters():
print(f"Parameter Shape: {param.shape}")
print(f"Model Parameters: {sum(p.numel() for p in model.parameters())}")
# prompt: visualize the weights
import matplotlib.pyplot as plt
# Assuming 'model' is your Seq2Seq model instance
# Replace with your actual model if named differently
# Collect parameter shapes and names
parameter_shapes = []
parameter_names = []
for name, param in model.named_parameters():
parameter_shapes.append(np.prod(param.shape))
parameter_names.append(name)
# Create a bar chart
plt.figure(figsize=(10, 6))
plt.bar(parameter_names, parameter_shapes)
plt.xlabel("Parameter Name")
plt.ylabel("Number of Weights")
plt.title("Distribution of Weights in the Model")
plt.xticks(rotation=90) # Rotate x-axis labels for better readability
plt.tight_layout()
plt.show()