import torch import torch.nn as nn import math # --- 1. Positional Encoding --- # Transformer tidak memiliki pemahaman inheren tentang urutan token (seperti RNN). # Positional Encoding menambahkan informasi posisi ke dalam embedding input. # Ini menggunakan fungsi sinus dan kosinus dengan frekuensi yang berbeda. class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=5000): """ Args: d_model (int): Dimensi embedding, harus sama dengan dimensi model. max_len (int): Panjang sekuens maksimum yang mungkin. """ super(PositionalEncoding, self).__init__() # Buat matriks 'pe' (positional encoding) dengan ukuran (max_len, d_model) pe = torch.zeros(max_len, d_model) # Buat tensor 'position' yang berisi posisi [0, 1, 2, ..., max_len-1] position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # Hitung pembagi untuk frekuensi sin/cos # Ini adalah implementasi dari rumus dalam paper "Attention Is All You Need" div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # Terapkan fungsi sin ke kolom genap pe[:, 0::2] = torch.sin(position * div_term) # Terapkan fungsi cos ke kolom ganjil pe[:, 1::2] = torch.cos(position * div_term) # Tambahkan dimensi batch di awal, menjadi (1, max_len, d_model) pe = pe.unsqueeze(0) # Daftarkan 'pe' sebagai buffer. Buffer adalah state dari modul # yang bukan merupakan parameter (tidak di-train), tetapi harus disimpan. self.register_buffer('pe', pe) def forward(self, x): """ Args: x (torch.Tensor): Tensor input embedding dengan shape (batch_size, seq_len, d_model) Returns: torch.Tensor: Tensor dengan informasi posisi yang ditambahkan, shape sama. """ # Tambahkan positional encoding ke tensor input x. # x.size(1) adalah panjang sekuens aktual dari input. x = x + self.pe[:, :x.size(1), :] return x # --- 2. Multi-Head Attention --- # Mekanisme ini memungkinkan model untuk bersama-sama memperhatikan informasi # dari posisi yang berbeda dalam sekuens. "Multi-head" berarti kita melakukannya # beberapa kali secara paralel, masing-masing "head" fokus pada aspek yang berbeda. class MultiHeadAttention(nn.Module): def __init__(self, d_model, num_heads): """ Args: d_model (int): Dimensi model. num_heads (int): Jumlah "attention heads". """ super(MultiHeadAttention, self).__init__() assert d_model % num_heads == 0, "d_model harus bisa dibagi dengan num_heads" self.d_model = d_model self.num_heads = num_heads self.d_k = d_model // num_heads # Dimensi per head # Lapisan linear untuk Query, Key, Value, dan output self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) def scaled_dot_product_attention(self, Q, K, V, mask=None): """ Ini adalah inti dari mekanisme attention. Rumus: Attention(Q, K, V) = softmax( (Q * K^T) / sqrt(d_k) ) * V """ # 1. Hitung skor attention (dot product antara Q dan K) attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k) # 2. Terapkan mask (jika ada) # Mask digunakan di decoder untuk mencegah model "melihat" token masa depan. if mask is not None: attn_scores = attn_scores.masked_fill(mask == 0, -1e9) # 3. Terapkan softmax untuk mendapatkan bobot attention attn_probs = torch.softmax(attn_scores, dim=-1) # 4. Kalikan bobot dengan V untuk mendapatkan output output = torch.matmul(attn_probs, V) return output def split_heads(self, x): """ Memecah tensor input menjadi beberapa head. Input: (batch_size, seq_len, d_model) Output: (batch_size, num_heads, seq_len, d_k) """ batch_size, seq_len, _ = x.size() return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2) def combine_heads(self, x): """ Menggabungkan kembali hasil dari semua head. Input: (batch_size, num_heads, seq_len, d_k) Output: (batch_size, seq_len, d_model) """ batch_size, _, seq_len, _ = x.size() return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model) def forward(self, Q, K, V, mask=None): # 1. Proyeksikan Q, K, V menggunakan lapisan linear Q = self.W_q(Q) K = self.W_k(K) V = self.W_v(V) # 2. Pecah menjadi beberapa head Q = self.split_heads(Q) K = self.split_heads(K) V = self.split_heads(V) # 3. Lakukan scaled dot-product attention attn_output = self.scaled_dot_product_attention(Q, K, V, mask) # 4. Gabungkan kembali head-head tersebut output = self.combine_heads(attn_output) # 5. Lewatkan melalui lapisan linear output akhir output = self.W_o(output) return output # --- 3. Position-wise Feed-Forward Network --- # Ini adalah jaringan neural network sederhana yang diterapkan pada setiap posisi # secara terpisah dan identik setelah attention. class PositionwiseFeedForward(nn.Module): def __init__(self, d_model, d_ff): """ Args: d_model (int): Dimensi model. d_ff (int): Dimensi lapisan tersembunyi (feed-forward). """ super(PositionwiseFeedForward, self).__init__() self.fc1 = nn.Linear(d_model, d_ff) self.fc2 = nn.Linear(d_ff, d_model) self.relu = nn.ReLU() def forward(self, x): return self.fc2(self.relu(self.fc1(x))) # --- 4. Encoder Layer --- # Satu lapisan Encoder terdiri dari Multi-Head Attention dan Feed-Forward Network. # Terdapat juga koneksi residual (Add) dan normalisasi lapisan (Norm). class EncoderLayer(nn.Module): def __init__(self, d_model, num_heads, d_ff, dropout): super(EncoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, num_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout = nn.Dropout(dropout) def forward(self, x, mask): # Sub-layer 1: Multi-Head Attention attn_output = self.self_attn(x, x, x, mask) # Add & Norm x = self.norm1(x + self.dropout(attn_output)) # Sub-layer 2: Feed-Forward ff_output = self.feed_forward(x) # Add & Norm x = self.norm2(x + self.dropout(ff_output)) return x # --- 5. Decoder Layer --- # Mirip dengan Encoder, tetapi memiliki dua sub-layer attention: # 1. Masked Multi-Head Attention: Untuk memperhatikan token sebelumnya di output. # 2. Encoder-Decoder Attention: Untuk memperhatikan output dari Encoder. class DecoderLayer(nn.Module): def __init__(self, d_model, num_heads, d_ff, dropout): super(DecoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, num_heads) self.cross_attn = MultiHeadAttention(d_model, num_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout = nn.Dropout(dropout) def forward(self, x, enc_output, src_mask, tgt_mask): # Sub-layer 1: Masked Multi-Head Attention (Self-Attention) attn_output = self.self_attn(x, x, x, tgt_mask) # Add & Norm x = self.norm1(x + self.dropout(attn_output)) # Sub-layer 2: Encoder-Decoder Attention (Cross-Attention) # Q dari decoder, K dan V dari output encoder attn_output = self.cross_attn(x, enc_output, enc_output, src_mask) # Add & Norm x = self.norm2(x + self.dropout(attn_output)) # Sub-layer 3: Feed-Forward ff_output = self.feed_forward(x) # Add & Norm x = self.norm3(x + self.dropout(ff_output)) return x # --- 6. Transformer Model --- # Menggabungkan semua komponen menjadi arsitektur Encoder-Decoder. class Transformer(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout): super(Transformer, self).__init__() # Lapisan Embedding dan Positional Encoding self.encoder_embedding = nn.Embedding(src_vocab_size, d_model) self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model) self.positional_encoding = PositionalEncoding(d_model, max_len) # Tumpukan Encoder dan Decoder Layers self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]) self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]) # Lapisan Linear akhir dan Softmax untuk prediksi kata berikutnya self.fc_out = nn.Linear(d_model, tgt_vocab_size) self.dropout = nn.Dropout(dropout) def generate_mask(self, src, tgt): # src_mask: Untuk padding pada input encoder. Shape: (batch, 1, 1, src_len) src_mask = (src != 0).unsqueeze(1).unsqueeze(2) # tgt_mask: Untuk padding dan mencegah atensi ke token masa depan pada decoder. tgt_pad_mask = (tgt != 0).unsqueeze(1).unsqueeze(3) seq_len = tgt.size(1) # Buat matriks segitiga bawah tgt_sub_mask = torch.tril(torch.ones((seq_len, seq_len), device=src.device)).bool() tgt_mask = tgt_pad_mask & tgt_sub_mask # Gabungkan keduanya return src_mask, tgt_mask def forward(self, src, tgt): src_mask, tgt_mask = self.generate_mask(src, tgt) # Proses Encoder # 1. Embedding + Positional Encoding src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src))) # 2. Lewatkan ke setiap layer encoder enc_output = src_embedded for layer in self.encoder_layers: enc_output = layer(enc_output, src_mask) # Proses Decoder # 1. Embedding + Positional Encoding tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt))) # 2. Lewatkan ke setiap layer decoder dec_output = tgt_embedded for layer in self.decoder_layers: dec_output = layer(dec_output, enc_output, src_mask, tgt_mask) # 3. Lapisan output output = self.fc_out(dec_output) return output # --- Contoh Penggunaan --- if __name__ == '__main__': # Parameter model (dibuat kecil untuk tujuan belajar) src_vocab_size = 5000 # Ukuran kosakata bahasa sumber tgt_vocab_size = 5000 # Ukuran kosakata bahasa target d_model = 128 # Dimensi model (harus genap) num_heads = 4 # Jumlah attention heads num_layers = 3 # Jumlah tumpukan Encoder/Decoder d_ff = 512 # Dimensi hidden layer di Feed Forward max_len = 100 # Panjang sekuens maksimum dropout = 0.1 # Inisialisasi model model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout) print(f"Model Transformer berhasil dibuat dengan {sum(p.numel() for p in model.parameters() if p.requires_grad):,} parameter.") # Buat data input dummy # Anggap '0' adalah token untuk padding src_data = torch.randint(1, src_vocab_size, (64, max_len)) # (batch_size, seq_len) tgt_data = torch.randint(1, tgt_vocab_size, (64, max_len)) # Jalankan model (forward pass) try: output = model(src_data, tgt_data) print("\nForward pass berhasil!") print(f"Bentuk input sumber (src): {src_data.shape}") print(f"Bentuk input target (tgt): {tgt_data.shape}") print(f"Bentuk output model: {output.shape}") # Bentuk output: (batch_size, seq_len, tgt_vocab_size) # Ini adalah probabilitas (logits) untuk setiap kata dalam kosakata target except Exception as e: print(f"\nTerjadi error saat forward pass: {e}")