import math

import pandas as pd
import torch
from torch import nn

from d2l import torch as d2l
from transformers import PretrainedConfig, PreTrainedModel

class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network."""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
                 **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        # The same MLP is applied independently at every position.
        return self.dense2(self.relu(self.dense1(X)))
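
# A minimal usage sketch (shapes below are illustrative assumptions, not
# values from this file): the FFN acts on the last dimension only, so the
# batch and step dimensions pass through unchanged.
def _demo_position_wise_ffn():
    ffn = PositionWiseFFN(4, 8, 4)
    ffn.eval()
    out = ffn(torch.ones((2, 3, 4)))
    assert out.shape == (2, 3, 4)
    return out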

def transpose_qkv(X, num_heads):
    """Transposition for parallel computation of multiple attention heads.

    Defined in :numref:`sec_multihead-attention`"""
    # Shape of input X: (batch_size, no. of queries or key-value pairs,
    # num_hiddens). After reshaping: (batch_size, no. of queries or
    # key-value pairs, num_heads, num_hiddens / num_heads).
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)

    # After permuting: (batch_size, num_heads, no. of queries or key-value
    # pairs, num_hiddens / num_heads).
    X = X.permute(0, 2, 1, 3)

    # Final output: (batch_size * num_heads, no. of queries or key-value
    # pairs, num_hiddens / num_heads).
    return X.reshape(-1, X.shape[2], X.shape[3])


def transpose_output(X, num_heads):
    """Reverse the operation of transpose_qkv.

    Defined in :numref:`sec_multihead-attention`"""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    X = X.permute(0, 2, 1, 3)
    return X.reshape(X.shape[0], X.shape[1], -1)
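
# A minimal round-trip sketch (sizes assumed): transpose_qkv folds the head
# axis into the batch axis, and transpose_output inverts it exactly.
def _demo_transpose_round_trip():
    X = torch.arange(2 * 3 * 8, dtype=torch.float32).reshape(2, 3, 8)
    Y = transpose_qkv(X, num_heads=4)      # (2 * 4, 3, 8 / 4) = (8, 3, 2)
    assert Y.shape == (8, 3, 2)
    Z = transpose_output(Y, num_heads=4)   # back to (2, 3, 8)
    assert torch.equal(X, Z)
    return Z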

def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences.

    Defined in :numref:`sec_seq2seq_decoder`"""
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


def masked_softmax(X, valid_lens):
    """Perform softmax operation by masking elements on the last axis.

    Defined in :numref:`sec_attention-scoring-functions`"""
    # X: 3D tensor, valid_lens: 1D or 2D tensor.
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # Masked elements on the last axis are replaced with a very large
        # negative value, whose softmax output is (effectively) 0.
        X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens,
                          value=-1e4)
        return nn.functional.softmax(X.reshape(shape), dim=-1)
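
# A minimal usage sketch (values assumed): with valid_lens = [2, 3], the
# first example keeps probability mass only on its first 2 positions and
# the second only on its first 3.
def _demo_masked_softmax():
    X = torch.rand(2, 1, 4)
    w = masked_softmax(X, torch.tensor([2, 3]))
    # Masked positions receive (near-)zero probability after the softmax.
    assert torch.allclose(w[0, 0, 2:].sum(), torch.tensor(0.0), atol=1e-3)
    return w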

class DotProductAttention(nn.Module):
    """Scaled dot product attention.

    Defined in :numref:`subsec_additive-attention`"""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # transpose(1, 2) swaps the last two dimensions of keys.
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
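
# A minimal usage sketch (all shapes assumed): 1 query per example against
# 10 key-value pairs, with per-example valid lengths.
def _demo_dot_product_attention():
    attention = DotProductAttention(dropout=0.0)
    attention.eval()
    queries = torch.normal(0, 1, (2, 1, 2))
    keys = torch.ones((2, 10, 2))
    values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(
        2, 1, 1)
    out = attention(queries, keys, values, valid_lens=torch.tensor([2, 6]))
    assert out.shape == (2, 1, 4)  # (batch, no. of queries, value dimension)
    return out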

class MultiHeadAttention(nn.Module):
    """Multi-head attention.

    Defined in :numref:`sec_multihead-attention`"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens).
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries).
        # After transposing, shape of queries, keys, values:
        # (batch_size * num_heads, no. of queries or key-value pairs,
        # num_hiddens / num_heads).
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) num_heads
            # times, then copy the next item, and so on.
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads).
        output = self.attention(queries, keys, values, valid_lens)

        # Shape of output_concat: (batch_size, no. of queries, num_hiddens).
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)
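
# A minimal self-attention sketch (all sizes assumed for illustration).
def _demo_multi_head_attention():
    num_hiddens, num_heads = 100, 5
    attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                                   num_hiddens, num_heads, dropout=0.0)
    attention.eval()
    X = torch.ones((2, 4, num_hiddens))
    out = attention(X, X, X, valid_lens=torch.tensor([3, 2]))
    assert out.shape == (2, 4, num_hiddens)
    return out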

class AddNorm(nn.Module):
    """Residual connection followed by layer normalization."""
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
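
# A minimal usage sketch (normalized_shape assumed): the output shape
# matches the input shape.
def _demo_add_norm():
    add_norm = AddNorm([3, 4], dropout=0.5)
    add_norm.eval()
    out = add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4)))
    assert out.shape == (2, 3, 4)
    return out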

class EncoderBlock(nn.Module):
    """Transformer encoder block."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout,
            use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))
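
# A minimal usage sketch (all sizes assumed): an encoder block preserves
# the shape of its input.
def _demo_encoder_block():
    X = torch.ones((2, 100, 24))
    valid_lens = torch.tensor([3, 2])
    blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
    blk.eval()
    assert blk(X, valid_lens).shape == X.shape
    return blk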

class PositionalEncoding(nn.Module):
    """Positional encoding.

    Defined in :numref:`sec_self-attention-and-positional-encoding`"""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long enough P.
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)
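
# A minimal usage sketch (sizes assumed): with a zero input and dropout
# disabled, the output is the positional encoding itself.
def _demo_positional_encoding():
    pos_encoding = PositionalEncoding(num_hiddens=20, dropout=0.0)
    pos_encoding.eval()
    X = pos_encoding(torch.zeros((1, 60, 20)))
    assert X.shape == (1, 60, 20)
    return X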

class Encoder(nn.Module):
    """The base encoder interface for the encoder-decoder architecture."""
    def __init__(self, **kwargs):
        super(Encoder, self).__init__(**kwargs)

    def forward(self, X, *args):
        raise NotImplementedError

class DecoderBlock(nn.Module):
    """The i-th block in the decoder."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, i, **kwargs):
        super(DecoderBlock, self).__init__(**kwargs)
        self.i = i
        self.attention1 = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.attention2 = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
                                   num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # During training, all the tokens of any output sequence are
        # processed at the same time, so state[2][self.i] is None as
        # initialized. During prediction, the output sequence is decoded one
        # token at a time, so state[2][self.i] contains representations of
        # the decoded output of the i-th block up to the current time step.
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Shape of dec_valid_lens: (batch_size, num_steps), where every
            # row is [1, 2, ..., num_steps].
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None

        # Self-attention.
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention. Shape of enc_outputs:
        # (batch_size, num_steps, num_hiddens).
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state
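
# A minimal usage sketch (all sizes assumed): the encoder outputs in `state`
# are faked with ones of matching shape; the block preserves the input
# shape.
def _demo_decoder_block():
    X = torch.ones((2, 100, 24))
    valid_lens = torch.tensor([3, 2])
    blk = DecoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5, 0)
    blk.eval()
    state = [torch.ones((2, 100, 24)), valid_lens, [None]]
    out, state = blk(X, state)
    assert out.shape == X.shape
    return out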

class Decoder(nn.Module):
    """The base decoder interface for the encoder-decoder architecture.

    Defined in :numref:`sec_encoder-decoder`"""
    def __init__(self, **kwargs):
        super(Decoder, self).__init__(**kwargs)

    def init_state(self, enc_outputs, *args):
        raise NotImplementedError

    def forward(self, X, state):
        raise NotImplementedError


class AttentionDecoder(Decoder):
    """The base attention-based decoder interface.

    Defined in :numref:`sec_seq2seq_attention`"""
    def __init__(self, **kwargs):
        super(AttentionDecoder, self).__init__(**kwargs)

    @property
    def attention_weights(self):
        raise NotImplementedError

class TransformerEncoder(Encoder):
    """Transformer encoder."""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module("block" + str(i),
                EncoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, use_bias))

    def forward(self, X, valid_lens, *args):
        # Since the positional encoding values are between -1 and 1, the
        # embedding values are multiplied by the square root of the embedding
        # dimension to rescale them before they are summed up.
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[
                i] = blk.attention.attention.attention_weights
        return X
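
# A minimal usage sketch (all hyperparameters assumed): a 2-layer encoder
# over a (2, 100) batch of token indices.
def _demo_transformer_encoder():
    encoder = TransformerEncoder(
        200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
    encoder.eval()
    out = encoder(torch.ones((2, 100), dtype=torch.long),
                  torch.tensor([3, 2]))
    assert out.shape == (2, 100, 24)
    return out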

class TransformerDecoder(AttentionDecoder):
    """Transformer decoder."""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module("block" + str(i),
                DecoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, i))
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        return [enc_outputs, enc_valid_lens, [None] * self.num_layers]

    def forward(self, X, state):
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            # Decoder self-attention weights.
            self._attention_weights[0][
                i] = blk.attention1.attention.attention_weights
            # Encoder-decoder attention weights.
            self._attention_weights[1][
                i] = blk.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        return self._attention_weights

class transformerConfig(PretrainedConfig):
    model_type = "custom_transformer"

    def __init__(
        self,
        src_vocab_len: int = 184,
        tgt_vocab: int = 201,
        num_hiddens: int = 32,
        num_layers: int = 2,
        dropout: float = 0.1,
        batch_size: int = 64,
        num_steps: int = 10,
        lr: float = 0.005,
        num_epochs: int = 200,
        ffn_num_input: int = 32,
        ffn_num_hiddens: int = 64,
        num_heads: int = 4,
        key_size: int = 32,
        query_size: int = 32,
        value_size: int = 32,
        norm_shape: list = None,
        **kwargs,
    ):
        self.src_vocab_len = src_vocab_len
        self.tgt_vocab = tgt_vocab
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.dropout = dropout
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.lr = lr
        self.num_epochs = num_epochs
        self.ffn_num_input = ffn_num_input
        self.ffn_num_hiddens = ffn_num_hiddens
        self.num_heads = num_heads
        self.key_size = key_size
        self.query_size = query_size
        self.value_size = value_size
        # Use a None sentinel instead of a mutable default argument;
        # [32] keeps the original default value.
        self.norm_shape = norm_shape if norm_shape is not None else [32]

        super().__init__(**kwargs)

class transformerModel(PreTrainedModel):

    config_class = transformerConfig

    def __init__(self, config):
        super().__init__(config)
        self.encoder = TransformerEncoder(
            config.src_vocab_len, config.key_size, config.query_size,
            config.value_size, config.num_hiddens, config.norm_shape,
            config.ffn_num_input, config.ffn_num_hiddens, config.num_heads,
            config.num_layers, config.dropout)

        self.decoder = TransformerDecoder(
            config.tgt_vocab, config.key_size, config.query_size,
            config.value_size, config.num_hiddens, config.norm_shape,
            config.ffn_num_input, config.ffn_num_hiddens, config.num_heads,
            config.num_layers, config.dropout)

    def forward(self, enc_X, dec_X, *args):
        enc_outputs = self.encoder(enc_X, *args)
        dec_state = self.decoder.init_state(enc_outputs, *args)
        return self.decoder(dec_X, dec_state)
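
# A minimal end-to-end sketch (token values and valid lengths assumed): one
# forward pass through the untrained wrapper with the default config.
def _demo_transformer_model():
    config = transformerConfig()
    model = transformerModel(config)
    model.eval()
    enc_X = torch.ones((2, config.num_steps), dtype=torch.long)
    dec_X = torch.ones((2, config.num_steps), dtype=torch.long)
    valid_lens = torch.tensor([10, 7])
    Y, state = model(enc_X, dec_X, valid_lens)
    # Per-token logits over the target vocabulary.
    assert Y.shape == (2, config.num_steps, config.tgt_vocab)
    return Y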

def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """Predict for sequence to sequence.

    Defined in :numref:`sec_seq2seq_training`"""
    # Set net to eval mode for inference.
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    # Add the batch axis.
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    # Add the batch axis.
    dec_X = torch.unsqueeze(torch.tensor(
        [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
    output_seq, attention_weight_seq = [], []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        # Use the token with the highest prediction likelihood as the input
        # of the decoder at the next time step.
        dec_X = Y.argmax(dim=2)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        # Save attention weights if requested.
        if save_attention_weights:
            attention_weight_seq.append(net.decoder.attention_weights)
        # Once the end-of-sequence token is predicted, the generation of the
        # output sequence is complete.
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
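
# A minimal calling-convention sketch (hypothetical: the tiny d2l.Vocab
# vocabularies, sentence, and hyperparameters below are assumptions, and an
# untrained model will emit arbitrary tokens).
def _demo_predict_seq2seq():
    src_vocab = d2l.Vocab([['hello', 'world', '.']],
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    tgt_vocab = d2l.Vocab([['bonjour', 'le', 'monde', '.']],
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    config = transformerConfig(src_vocab_len=len(src_vocab),
                               tgt_vocab=len(tgt_vocab))
    net = transformerModel(config)
    return predict_seq2seq(net, 'hello world .', src_vocab, tgt_vocab,
                           num_steps=10, device=torch.device('cpu'))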