|
|
| import time |
| from datasets import load_dataset |
| import numpy as np |
| import transformers |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from torch.utils.data import DataLoader, TensorDataset |
| |
class custom_RNNCell(nn.Module):
    """Single-step Elman RNN cell.

    Computes ``h_t = tanh(x_t @ Wx + h_prev @ Wh + bh)`` for one time step.

    Attributes:
        Wx: Input-to-hidden weights, shape ``(input_size, hidden_size)``.
        Wh: Hidden-to-hidden weights, shape ``(hidden_size, hidden_size)``.
        bh: Hidden bias, shape ``(hidden_size,)``, initialised to zero.
    """

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        """Create the cell parameters on ``device``.

        Args:
            input_size: Number of features in each input vector.
            hidden_size: Number of features in the hidden state.
            device: Device on which the parameters are allocated.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.device = device

        # Xavier/Glorot uniform bound, sqrt(6 / (fan_in + fan_out)), computed
        # separately for each weight matrix.
        bound_x = np.sqrt(6 / (input_size + hidden_size))
        bound_h = np.sqrt(6 / (hidden_size + hidden_size))

        self.Wx = nn.Parameter(torch.empty(input_size, hidden_size, device=device))
        self.Wh = nn.Parameter(torch.empty(hidden_size, hidden_size, device=device))
        self.bh = nn.Parameter(torch.zeros(hidden_size, device=device))

        # Fill the weights in a fixed order (Wx, then Wh) so that seeded runs
        # stay reproducible.
        for weight, bound in ((self.Wx, bound_x), (self.Wh, bound_h)):
            nn.init.uniform_(weight, -bound, bound)

    def forward(self, input_t: torch.Tensor, h_prev: torch.Tensor) -> torch.Tensor:
        """Advance the hidden state by one time step.

        Args:
            input_t: Input at the current step, shape ``(batch, input_size)``.
            h_prev: Previous hidden state, shape ``(batch, hidden_size)``.

        Returns:
            The new hidden state, shape ``(batch, hidden_size)``.
        """
        pre_activation = torch.mm(input_t, self.Wx) + torch.mm(h_prev, self.Wh)
        return torch.tanh(pre_activation + self.bh)
|
|
class custom_GRUCell(nn.Module):
    """Single-step GRU cell operating on the concatenation ``[x_t, h_prev]``.

    Each gate owns one weight matrix of shape
    ``(input_size + hidden_size, hidden_size)`` and a zero-initialised bias.
    The new state is ``(1 - z_t) * h_prev + z_t * h_hat_t``, i.e. the update
    gate ``z_t`` weights the candidate state.
    """

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        """Create the gate parameters on ``device``.

        Args:
            input_size: Number of features in each input vector.
            hidden_size: Number of features in the hidden state.
            device: Device on which the parameters are allocated.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.device = device

        joint_size = input_size + hidden_size
        # Xavier/Glorot uniform bound shared by all three gate matrices.
        bound = (6 / (joint_size + hidden_size)) ** 0.5

        def make_weight() -> nn.Parameter:
            weight = nn.Parameter(torch.empty(joint_size, hidden_size, device=device))
            nn.init.uniform_(weight, -bound, bound)
            return weight

        def make_bias() -> nn.Parameter:
            return nn.Parameter(torch.zeros(hidden_size, device=device))

        # Weights are drawn in the order update, reset, candidate.
        self.Wz = make_weight()
        self.Wr = make_weight()
        self.Wh = make_weight()

        self.bz = make_bias()
        self.br = make_bias()
        self.bh = make_bias()

    def forward(self, input_t: torch.Tensor, h_prev: torch.Tensor) -> torch.Tensor:
        """Advance the hidden state by one time step.

        Args:
            input_t: Input at the current step, shape ``(batch, input_size)``.
            h_prev: Previous hidden state, shape ``(batch, hidden_size)``.

        Returns:
            The new hidden state, shape ``(batch, hidden_size)``.
        """
        gate_input = torch.cat((input_t, h_prev), dim=1)

        update_gate = torch.sigmoid(gate_input @ self.Wz + self.bz)
        reset_gate = torch.sigmoid(gate_input @ self.Wr + self.br)

        # The candidate state sees the previous state only through the reset gate.
        candidate_input = torch.cat((input_t, reset_gate * h_prev), dim=1)
        candidate = torch.tanh(candidate_input @ self.Wh + self.bh)

        return (1 - update_gate) * h_prev + update_gate * candidate
|
|
class custom_LSTMCell(nn.Module):
    """Single-step LSTM cell operating on the concatenation ``[x_t, h_prev]``.

    The forget, input, candidate and output transforms each own a weight
    matrix of shape ``(input_size + hidden_size, hidden_size)`` initialised
    with Xavier-uniform values. All biases start at zero except the forget
    bias, which starts at one.
    """

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        """Create the gate parameters on ``device``.

        Args:
            input_size: Number of features in each input vector.
            hidden_size: Number of features in the hidden and cell states.
            device: Device on which the parameters are allocated.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.device = device

        joint_size = input_size + hidden_size

        def make_weight() -> nn.Parameter:
            weight = nn.Parameter(torch.empty(joint_size, hidden_size, device=device))
            nn.init.xavier_uniform_(weight)
            return weight

        # Weights are drawn in the order forget, input, candidate, output.
        self.Wf = make_weight()
        self.Wi = make_weight()
        self.Wc = make_weight()
        self.Wo = make_weight()

        # Forget bias starts at 1.0; every other bias starts at 0.
        self.bf = nn.Parameter(torch.ones(hidden_size, device=device))
        self.bi = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bc = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bo = nn.Parameter(torch.zeros(hidden_size, device=device))

    def forward(self, x_t: torch.Tensor, h_prev: torch.Tensor, c_prev: torch.Tensor) -> tuple:
        """Advance the hidden and cell states by one time step.

        Args:
            x_t: Input at the current step, shape ``(batch, input_size)``.
            h_prev: Previous hidden state, shape ``(batch, hidden_size)``.
            c_prev: Previous cell state, shape ``(batch, hidden_size)``.

        Returns:
            Tuple ``(h_t, c_t)`` with the new hidden and cell states, each of
            shape ``(batch, hidden_size)``.
        """
        gate_input = torch.cat((x_t, h_prev), dim=1)

        forget_gate = torch.sigmoid(gate_input @ self.Wf + self.bf)
        input_gate = torch.sigmoid(gate_input @ self.Wi + self.bi)
        output_gate = torch.sigmoid(gate_input @ self.Wo + self.bo)
        candidate = torch.tanh(gate_input @ self.Wc + self.bc)

        # Keep part of the old memory and write part of the candidate.
        c_t = forget_gate * c_prev + input_gate * candidate
        h_t = output_gate * torch.tanh(c_t)
        return h_t, c_t
|
|
class RecurrentLayer(nn.Module):
    """Bidirectional recurrent layer over a batch-first sequence.

    ``cell_type`` selects the recurrent unit:

    * ``'RNN'`` / ``'GRU'`` wrap the bidirectional ``nn.RNN`` / ``nn.GRU``
      modules, and ``forward`` delegates the whole sequence to them.
    * ``'LSTM'``, ``'custom_RNN'``, ``'custom_GRU'`` and ``'custom_LSTM'`` use
      a single-step cell that is unrolled manually, once left-to-right and
      once right-to-left.

    NOTE(review): in the manually unrolled path the same cell (and therefore
    the same parameters) is used for both directions, unlike the built-in
    bidirectional modules, which keep separate weights per direction.
    """

    def __init__(self, input_size: int, hidden_size: int, cell_type: str = 'RNN', device='cpu'):
        """Build the recurrent unit selected by ``cell_type``.

        Args:
            input_size: Number of features in each input vector.
            hidden_size: Number of features in the hidden state per direction.
            cell_type: One of ``'RNN'``, ``'custom_RNN'``, ``'GRU'``,
                ``'custom_GRU'``, ``'LSTM'`` or ``'custom_LSTM'``.
            device: Device on which the parameters are allocated.

        Raises:
            ValueError: If ``cell_type`` is not one of the supported names.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.device = device
        self.cell_type = cell_type

        if cell_type == 'RNN':
            self.cell = nn.RNN(input_size, hidden_size, batch_first=True,
                               bidirectional=True, device=device)
        elif cell_type == 'custom_RNN':
            # Pass ``device`` through, like the other custom cells.
            self.cell = custom_RNNCell(input_size, hidden_size, device)
        elif cell_type == 'GRU':
            self.cell = nn.GRU(input_size, hidden_size, batch_first=True,
                               bidirectional=True, device=device)
        elif cell_type == 'custom_GRU':
            self.cell = custom_GRUCell(input_size, hidden_size, device)
        elif cell_type == 'LSTM':
            self.cell = nn.LSTMCell(input_size, hidden_size, device=device)
        elif cell_type == 'custom_LSTM':
            self.cell = custom_LSTMCell(input_size, hidden_size, device)
        else:
            raise ValueError("Unsupported cell type")

    def _run_direction(self, inputs: torch.Tensor, time_steps) -> list:
        """Unroll the cell over ``time_steps`` and return the visited hidden states in order."""
        batch_size = inputs.shape[0]
        # Allocate the initial state next to the inputs so the layer keeps
        # working after the module has been moved with ``.to(...)``.
        h = inputs.new_zeros(batch_size, self.hidden_size)
        c = None
        if self.cell_type in ('LSTM', 'custom_LSTM'):
            c = inputs.new_zeros(batch_size, self.hidden_size)

        states = []
        for t in time_steps:
            x_t = inputs[:, t]
            if self.cell_type == 'custom_LSTM':
                h, c = self.cell(x_t, h, c)
            elif self.cell_type == 'LSTM':
                # nn.LSTMCell takes the state as one (h, c) tuple.
                h, c = self.cell(x_t, (h, c))
            else:
                h = self.cell(x_t, h)
            states.append(h)
        return states

    def forward(self, inputs: torch.Tensor) -> tuple:
        """Run the layer over a whole sequence in both directions.

        Args:
            inputs: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tuple ``(output, final_hidden)`` where ``output`` has shape
            ``(batch, seq_len, 2 * hidden_size)`` (forward features first,
            backward features second) and ``final_hidden`` has shape
            ``(2, batch, hidden_size)``: index 0 is the forward state after
            the last step, index 1 the backward state after it has consumed
            the sequence down to step 0.

        Raises:
            ValueError: If the sequence length is zero.
        """
        _, seq_len, _ = inputs.shape
        if seq_len == 0:
            raise ValueError("RecurrentLayer requires a non-empty sequence")

        # nn.RNN / nn.GRU are full bidirectional sequence modules, not
        # single-step cells, so they handle the whole sequence themselves.
        if isinstance(self.cell, (nn.RNN, nn.GRU)):
            output, final_hidden = self.cell(inputs)
            return output, final_hidden

        forward_states = self._run_direction(inputs, range(seq_len))
        backward_states = self._run_direction(inputs, range(seq_len - 1, -1, -1))
        # Re-align the backward states with the time axis.
        backward_states.reverse()

        output = torch.cat(
            (torch.stack(forward_states, dim=1), torch.stack(backward_states, dim=1)),
            dim=2,
        )
        # The backward pass ends at t = 0, so its final state is the first
        # entry once the list is time-aligned.
        final_hidden = torch.stack((forward_states[-1], backward_states[0]), dim=0)
        return output, final_hidden
| |
class Attention(nn.Module):
    """Additive attention over a sequence of encoder outputs.

    Scores are ``v^T tanh(W1 e_t + W2 q)`` for each encoder output ``e_t`` and
    query ``q``; they are softmax-normalised over the sequence axis and used
    to form a weighted sum of the encoder outputs.
    """

    def __init__(self, hidden_size):
        """Create the projection layers.

        Args:
            hidden_size: Feature size shared by the query and the encoder outputs.
        """
        super().__init__()
        self.W1 = nn.Linear(hidden_size, hidden_size)
        self.W2 = nn.Linear(hidden_size, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Attend over ``encoder_outputs`` using ``hidden`` as the query.

        Args:
            hidden: Query tensor of shape ``(batch, hidden_size)``.
            encoder_outputs: Tensor of shape ``(batch, seq_len, hidden_size)``.

        Returns:
            Tuple ``(context, attention_weights)`` where ``context`` has shape
            ``(batch, hidden_size)`` and ``attention_weights`` has shape
            ``(batch, seq_len)`` and sums to one along ``seq_len``.
        """
        # Project the query once and broadcast it over the sequence axis,
        # instead of copying it seq_len times before the projection.
        projected_query = self.W2(hidden).unsqueeze(1)

        energy = torch.tanh(self.W1(encoder_outputs) + projected_query)
        scores = self.v(energy).squeeze(-1)
        attention_weights = torch.softmax(scores, dim=1)

        # (batch, 1, seq_len) x (batch, seq_len, hidden) -> (batch, 1, hidden)
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        return context, attention_weights
| |
class SimpleRecurrentNetworkWithAttention(nn.Module):
    """Embedding -> bidirectional recurrent encoder -> attention -> linear head.

    The final forward and backward hidden states are concatenated and used as
    the attention query over the encoder outputs; the resulting context
    vector is mapped to ``output_size`` values.
    """

    def __init__(self, input_size, hidden_size, output_size, cell_type='RNN'):
        """Assemble the network.

        Args:
            input_size: Number of embeddings (passed to ``nn.Embedding``).
            hidden_size: Embedding size and per-direction hidden size.
            output_size: Number of output features of the final linear layer.
            cell_type: One of ``'RNN'``, ``'custom_RNN'``, ``'GRU'``,
                ``'custom_GRU'``, ``'LSTM'`` or ``'custom_LSTM'``.

        Raises:
            ValueError: If ``cell_type`` is not one of the supported names.
        """
        super().__init__()

        self.embedding = nn.Embedding(input_size, hidden_size)
        # The encoder is bidirectional, so attention works on 2 * hidden_size features.
        self.attention = Attention(hidden_size * 2)
        self.cell_type = cell_type

        if cell_type in ('RNN', 'GRU', 'LSTM'):
            # Built-in encoders share a constructor signature: nn.RNN / nn.GRU / nn.LSTM.
            builtin_cls = getattr(nn, cell_type)
            self.cell = builtin_cls(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        elif cell_type in ('custom_RNN', 'custom_GRU', 'custom_LSTM'):
            self.cell = RecurrentLayer(hidden_size, hidden_size, cell_type=cell_type)
        else:
            raise ValueError("Unsupported cell type. Choose from 'RNN', 'custom_RNN', 'GRU', 'custom_GRU', 'LSTM' or 'custom_LSTM'.")

        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, inputs):
        """Encode a batch of index sequences and produce attended outputs.

        Args:
            inputs: Index tensor fed to the embedding layer; the encoders are
                batch-first, so the expected shape is ``(batch, seq_len)``.

        Returns:
            Tuple ``(output, attention_weights)``: the linear head applied to
            the attention context, and the attention weights over the sequence.
        """
        rnn_output, state = self.cell(self.embedding(inputs))

        # nn.LSTM returns its state as a tuple (h_n, c_n); only h_n is used.
        h_n = state[0] if isinstance(state, tuple) else state

        # The last two entries are the forward and backward final states.
        query = torch.cat((h_n[-2], h_n[-1]), dim=1)

        context, attention_weights = self.attention(query, rnn_output)
        return self.fc(context), attention_weights
|
|