nafsi-transformer / tranformer-1.py
Syamsuddin's picture
Upload tranformer-1.py
f9a3b96 verified
import torch
import torch.nn as nn
import math
# --- 1. Positional Encoding ---
# Transformer tidak memiliki pemahaman inheren tentang urutan token (seperti RNN).
# Positional Encoding menambahkan informasi posisi ke dalam embedding input.
# Ini menggunakan fungsi sinus dan kosinus dengan frekuensi yang berbeda.
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
"""
Args:
d_model (int): Dimensi embedding, harus sama dengan dimensi model.
max_len (int): Panjang sekuens maksimum yang mungkin.
"""
super(PositionalEncoding, self).__init__()
# Buat matriks 'pe' (positional encoding) dengan ukuran (max_len, d_model)
pe = torch.zeros(max_len, d_model)
# Buat tensor 'position' yang berisi posisi [0, 1, 2, ..., max_len-1]
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
# Hitung pembagi untuk frekuensi sin/cos
# Ini adalah implementasi dari rumus dalam paper "Attention Is All You Need"
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
# Terapkan fungsi sin ke kolom genap
pe[:, 0::2] = torch.sin(position * div_term)
# Terapkan fungsi cos ke kolom ganjil
pe[:, 1::2] = torch.cos(position * div_term)
# Tambahkan dimensi batch di awal, menjadi (1, max_len, d_model)
pe = pe.unsqueeze(0)
# Daftarkan 'pe' sebagai buffer. Buffer adalah state dari modul
# yang bukan merupakan parameter (tidak di-train), tetapi harus disimpan.
self.register_buffer('pe', pe)
def forward(self, x):
"""
Args:
x (torch.Tensor): Tensor input embedding dengan shape (batch_size, seq_len, d_model)
Returns:
torch.Tensor: Tensor dengan informasi posisi yang ditambahkan, shape sama.
"""
# Tambahkan positional encoding ke tensor input x.
# x.size(1) adalah panjang sekuens aktual dari input.
x = x + self.pe[:, :x.size(1), :]
return x
# --- 2. Multi-Head Attention ---
# Mekanisme ini memungkinkan model untuk bersama-sama memperhatikan informasi
# dari posisi yang berbeda dalam sekuens. "Multi-head" berarti kita melakukannya
# beberapa kali secara paralel, masing-masing "head" fokus pada aspek yang berbeda.
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
"""
Args:
d_model (int): Dimensi model.
num_heads (int): Jumlah "attention heads".
"""
super(MultiHeadAttention, self).__init__()
assert d_model % num_heads == 0, "d_model harus bisa dibagi dengan num_heads"
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads # Dimensi per head
# Lapisan linear untuk Query, Key, Value, dan output
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def scaled_dot_product_attention(self, Q, K, V, mask=None):
"""
Ini adalah inti dari mekanisme attention.
Rumus: Attention(Q, K, V) = softmax( (Q * K^T) / sqrt(d_k) ) * V
"""
# 1. Hitung skor attention (dot product antara Q dan K)
attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
# 2. Terapkan mask (jika ada)
# Mask digunakan di decoder untuk mencegah model "melihat" token masa depan.
if mask is not None:
attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
# 3. Terapkan softmax untuk mendapatkan bobot attention
attn_probs = torch.softmax(attn_scores, dim=-1)
# 4. Kalikan bobot dengan V untuk mendapatkan output
output = torch.matmul(attn_probs, V)
return output
def split_heads(self, x):
"""
Memecah tensor input menjadi beberapa head.
Input: (batch_size, seq_len, d_model)
Output: (batch_size, num_heads, seq_len, d_k)
"""
batch_size, seq_len, _ = x.size()
return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
def combine_heads(self, x):
"""
Menggabungkan kembali hasil dari semua head.
Input: (batch_size, num_heads, seq_len, d_k)
Output: (batch_size, seq_len, d_model)
"""
batch_size, _, seq_len, _ = x.size()
return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
def forward(self, Q, K, V, mask=None):
# 1. Proyeksikan Q, K, V menggunakan lapisan linear
Q = self.W_q(Q)
K = self.W_k(K)
V = self.W_v(V)
# 2. Pecah menjadi beberapa head
Q = self.split_heads(Q)
K = self.split_heads(K)
V = self.split_heads(V)
# 3. Lakukan scaled dot-product attention
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
# 4. Gabungkan kembali head-head tersebut
output = self.combine_heads(attn_output)
# 5. Lewatkan melalui lapisan linear output akhir
output = self.W_o(output)
return output
# --- 3. Position-wise Feed-Forward Network ---
# Ini adalah jaringan neural network sederhana yang diterapkan pada setiap posisi
# secara terpisah dan identik setelah attention.
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
"""
Args:
d_model (int): Dimensi model.
d_ff (int): Dimensi lapisan tersembunyi (feed-forward).
"""
super(PositionwiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
# --- 4. Encoder Layer ---
# Satu lapisan Encoder terdiri dari Multi-Head Attention dan Feed-Forward Network.
# Terdapat juga koneksi residual (Add) dan normalisasi lapisan (Norm).
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
# Sub-layer 1: Multi-Head Attention
attn_output = self.self_attn(x, x, x, mask)
# Add & Norm
x = self.norm1(x + self.dropout(attn_output))
# Sub-layer 2: Feed-Forward
ff_output = self.feed_forward(x)
# Add & Norm
x = self.norm2(x + self.dropout(ff_output))
return x
# --- 5. Decoder Layer ---
# Mirip dengan Encoder, tetapi memiliki dua sub-layer attention:
# 1. Masked Multi-Head Attention: Untuk memperhatikan token sebelumnya di output.
# 2. Encoder-Decoder Attention: Untuk memperhatikan output dari Encoder.
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.cross_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, src_mask, tgt_mask):
# Sub-layer 1: Masked Multi-Head Attention (Self-Attention)
attn_output = self.self_attn(x, x, x, tgt_mask)
# Add & Norm
x = self.norm1(x + self.dropout(attn_output))
# Sub-layer 2: Encoder-Decoder Attention (Cross-Attention)
# Q dari decoder, K dan V dari output encoder
attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
# Add & Norm
x = self.norm2(x + self.dropout(attn_output))
# Sub-layer 3: Feed-Forward
ff_output = self.feed_forward(x)
# Add & Norm
x = self.norm3(x + self.dropout(ff_output))
return x
# --- 6. Transformer Model ---
# Menggabungkan semua komponen menjadi arsitektur Encoder-Decoder.
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout):
super(Transformer, self).__init__()
# Lapisan Embedding dan Positional Encoding
self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, max_len)
# Tumpukan Encoder dan Decoder Layers
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
# Lapisan Linear akhir dan Softmax untuk prediksi kata berikutnya
self.fc_out = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
# src_mask: Untuk padding pada input encoder. Shape: (batch, 1, 1, src_len)
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
# tgt_mask: Untuk padding dan mencegah atensi ke token masa depan pada decoder.
tgt_pad_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
seq_len = tgt.size(1)
# Buat matriks segitiga bawah
tgt_sub_mask = torch.tril(torch.ones((seq_len, seq_len), device=src.device)).bool()
tgt_mask = tgt_pad_mask & tgt_sub_mask # Gabungkan keduanya
return src_mask, tgt_mask
def forward(self, src, tgt):
src_mask, tgt_mask = self.generate_mask(src, tgt)
# Proses Encoder
# 1. Embedding + Positional Encoding
src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
# 2. Lewatkan ke setiap layer encoder
enc_output = src_embedded
for layer in self.encoder_layers:
enc_output = layer(enc_output, src_mask)
# Proses Decoder
# 1. Embedding + Positional Encoding
tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
# 2. Lewatkan ke setiap layer decoder
dec_output = tgt_embedded
for layer in self.decoder_layers:
dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
# 3. Lapisan output
output = self.fc_out(dec_output)
return output
# --- Contoh Penggunaan ---
if __name__ == '__main__':
# Parameter model (dibuat kecil untuk tujuan belajar)
src_vocab_size = 5000 # Ukuran kosakata bahasa sumber
tgt_vocab_size = 5000 # Ukuran kosakata bahasa target
d_model = 128 # Dimensi model (harus genap)
num_heads = 4 # Jumlah attention heads
num_layers = 3 # Jumlah tumpukan Encoder/Decoder
d_ff = 512 # Dimensi hidden layer di Feed Forward
max_len = 100 # Panjang sekuens maksimum
dropout = 0.1
# Inisialisasi model
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout)
print(f"Model Transformer berhasil dibuat dengan {sum(p.numel() for p in model.parameters() if p.requires_grad):,} parameter.")
# Buat data input dummy
# Anggap '0' adalah token untuk padding
src_data = torch.randint(1, src_vocab_size, (64, max_len)) # (batch_size, seq_len)
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_len))
# Jalankan model (forward pass)
try:
output = model(src_data, tgt_data)
print("\nForward pass berhasil!")
print(f"Bentuk input sumber (src): {src_data.shape}")
print(f"Bentuk input target (tgt): {tgt_data.shape}")
print(f"Bentuk output model: {output.shape}")
# Bentuk output: (batch_size, seq_len, tgt_vocab_size)
# Ini adalah probabilitas (logits) untuk setiap kata dalam kosakata target
except Exception as e:
print(f"\nTerjadi error saat forward pass: {e}")