# Korean_message_detecting / 250405_eum_lstm.py
# (Uploaded by meal-bbang — "Upload 6 files", commit b5dc155, verified)
# -*- coding: utf-8 -*-
"""250405_Eum_lstm.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1-ABY8VONWw9g6ESYhz0Cir0qcrT-OfIW
"""
import torch
import os
from google.colab import files
import pandas as pd
from sklearn.model_selection import train_test_split
import re
import nltk
from nltk.tokenize import word_tokenize
from collections import Counter
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
# 1. GPU ์‚ฌ์šฉ ๊ฐ€๋Šฅ ์—ฌ๋ถ€ ํ™•์ธ ๋ฐ device ์„ค์ •
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ device:", device)
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
# 2. ๋‚ด PC์˜ ์—‘์…€ ํŒŒ์ผ ์—…๋กœ๋“œ (Colab์—์„œ ํŒŒ์ผ ์—…๋กœ๋“œ ์ฐฝ์ด ๋œน๋‹ˆ๋‹ค)
uploaded = files.upload() # ์—ฌ๊ธฐ์„œ 'spam_data.xlsx' ๋“ฑ ํŒŒ์ผ์„ ์„ ํƒํ•˜์„ธ์š”
# 3. ์—…๋กœ๋“œํ•œ CSV ํŒŒ์ผ์„ pandas DataFrame์œผ๋กœ ์ฝ์–ด์˜ค๊ธฐ
# CSV ํŒŒ์ผ ์ด๋ฆ„์€ ์—…๋กœ๋“œํ•œ ํŒŒ์ผ๋ช…๊ณผ ๋™์ผํ•˜๊ฒŒ ์‚ฌ์šฉํ•˜์„ธ์š”.
data = pd.read_csv('lgaidataset (1).csv') # pd.read_excel ๋Œ€์‹  pd.read_csv ์‚ฌ์šฉ
# Repair the 'index' column: rows whose index is NaN are assigned fresh
# ids continuing past the current maximum, so every row ends up uniquely
# numbered.
missing = data['index'].isna()
max_index = data['index'].max()
nan_count = missing.sum()
new_values = list(range(int(max_index) + 1, int(max_index) + 1 + nan_count))
data.loc[missing, 'index'] = new_values
# Shift class labels up by one (the Dataset later subtracts 1 again to
# produce 0-based targets for CrossEntropyLoss).
data.loc[:, 'class'] = data['class'] + 1
# 4. Hold out 20% of the rows as the test set (fixed seed for reproducibility).
train_df, test_df = train_test_split(data, test_size=0.2, random_state=42)
print("Train 데이터 크기:", train_df.shape)
print("Test 데이터 크기:", test_df.shape)
# 5. ํ…์ŠคํŠธ ์ „์ฒ˜๋ฆฌ ๋ฐ ํ† ํฐํ™” ํ•จ์ˆ˜ ์ •์˜
nltk.download('punkt') # ์ฒ˜์Œ ํ•œ ๋ฒˆ๋งŒ ์‹คํ–‰ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค
nltk.download('punkt_tab')
def preprocess_text(text):
    """Normalize and tokenize a message.

    Lowercases the text, strips every character that is not a Korean
    syllable, ASCII letter, digit or whitespace, then splits the result
    into word tokens with NLTK's word_tokenize.
    """
    cleaned = re.sub(r'[^가-힣a-z0-9\s]', '', text.lower())
    return word_tokenize(cleaned)
"""์•„๋ฌด ๋ฐ์ดํ„ฐ์— ๊ด€ํ•ด preprocess_text ํ•จ์ˆ˜ ์‹คํ–‰ํ•ด์„œ ํ•œ๊ตญ์–ด ํ† ํฐํ™” ๋˜๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ"""
# 6. Build the vocabulary from the training messages.
# Collect every token from every training message.
all_tokens = []
for message in train_df['content']:
    all_tokens += preprocess_text(message)
# Count word frequencies.
word_counts = Counter(all_tokens)
# Map each word to an integer id. Ids 0 and 1 are reserved for the
# <PAD> and <UNK> special tokens, so real words start at 2.
vocab = {}
for idx, (word, count) in enumerate(word_counts.items()):
    if count >= 1:  # keep every word; raise the threshold to prune rare words
        vocab[word] = idx + 2
vocab["<PAD>"] = 0
vocab["<UNK>"] = 1
# NOTE(review): ids only go up to len(vocab)-1, so the +1 merely leaves
# one spare (unused) embedding row — harmless.
vocab_size = len(vocab)+1
print("단어 집합 크기:", vocab_size)
# 7. ํ…์ŠคํŠธ๋ฅผ ์ˆซ์ž ์‹œํ€€์Šค๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ํ•จ์ˆ˜ (์ตœ๋Œ€ ๊ธธ์ด max_len์œผ๋กœ ํŒจ๋”ฉ ๋˜๋Š” ์ž๋ฅด๊ธฐ)
def text_to_sequence(text, vocab, max_len=50):
"""
ํ…์ŠคํŠธ๋ฅผ ํ† ํฐํ™”ํ•œ ํ›„ ๋‹จ์–ด๋ฅผ ํ•ด๋‹น ์ธ๋ฑ์Šค๋กœ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
max_len๋ณด๋‹ค ์งง์œผ๋ฉด ํŒจ๋”ฉ(<PAD>), ๊ธธ๋ฉด ์ž๋ฆ…๋‹ˆ๋‹ค.
"""
tokens = preprocess_text(text)
seq = [vocab.get(token, vocab["<UNK>"]) for token in tokens] # ๋‹จ์–ด๊ฐ€ ์—†์œผ๋ฉด <UNK> ์‚ฌ์šฉ
if len(seq) < max_len:
seq = seq + [vocab["<PAD>"]] * (max_len - len(seq)) # ๋ถ€์กฑํ•œ ๊ธธ์ด๋งŒํผ ํŒจ๋”ฉ
else:
seq = seq[:max_len] # max_len๊นŒ์ง€๋งŒ ์‚ฌ์šฉ
return seq
# 8. PyTorch Dataset: converts messages and labels to tensors on demand.
class SpamDataset(Dataset):
    """Wraps a DataFrame with a 'content' (message text) column and a
    'class' (label, 1-5) column for use with a DataLoader."""

    def __init__(self, df, vocab, max_len=50):
        self.messages = df['content'].tolist()  # raw message strings
        self.labels = df['class'].tolist()      # labels in 1..5
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.messages)

    def __getitem__(self, idx):
        # Encode the message as a fixed-length id sequence and shift the
        # label to 0-based (1..5 -> 0..4) for CrossEntropyLoss.
        encoded = text_to_sequence(self.messages[idx], self.vocab, self.max_len)
        target = self.labels[idx] - 1
        return torch.tensor(encoded, dtype=torch.long), torch.tensor(target, dtype=torch.long)
# 9. Wrap the datasets in DataLoaders (training batches are shuffled,
# test batches keep their order for reproducible evaluation).
batch_size = 64
train_dataset = SpamDataset(train_df, vocab)
test_dataset = SpamDataset(test_df, vocab)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)
# 10. LSTM-based classifier: embedding -> LSTM -> linear head.
class LSTMClassifier(nn.Module):
    """Embeds token ids, runs them through an LSTM, and classifies the
    last timestep's hidden state into `output_dim` classes (raw logits)."""

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, num_layers=1):
        super().__init__()
        # padding_idx=0 keeps the <PAD> embedding fixed at the zero vector.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        # batch_first=True: tensors are [batch, seq, feature].
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # x: [batch, seq_len] of token ids
        embedded = self.embedding(x)      # [batch, seq_len, embed_dim]
        outputs, _ = self.lstm(embedded)  # [batch, seq_len, hidden_dim]
        # NOTE(review): this reads the hidden state at the *last* timestep,
        # which for right-padded sequences is a <PAD> position; feeding true
        # lengths (pack_padded_sequence) would likely work better — confirm.
        last_step = outputs[:, -1, :]
        return self.fc(last_step)         # [batch, output_dim] logits
# ๋ชจ๋ธ ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ์„ค์ •
embed_dim = 100 # ์ž„๋ฒ ๋”ฉ ์ฐจ์›
hidden_dim = 128 # LSTM hidden state ์ฐจ์›
output_dim = 5 # ๋ถ„๋ฅ˜ํ•  ํด๋ž˜์Šค ๊ฐœ์ˆ˜ (1~5)
num_layers = 1
#dropout = 0.2 # 0.2 ~ 0.5
# ๋ชจ๋ธ ์ดˆ๊ธฐํ™” ๋ฐ device(GPU ๋˜๋Š” CPU)๋กœ ์ด๋™
model = LSTMClassifier(vocab_size, embed_dim, hidden_dim, output_dim, num_layers)
model.to(device)
# 11. ์†์‹ค ํ•จ์ˆ˜์™€ ์˜ตํ‹ฐ๋งˆ์ด์ € ์ •์˜
criterion = nn.CrossEntropyLoss() # CrossEntropyLoss๋Š” softmax ํฌํ•จ (๋ผ๋ฒจ์€ 0~4)
#optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0005)
# 12. ํ•™์Šต ๋ฃจํ”„ ์‹คํ–‰
num_epochs = 3 # ์—ํญ ์ˆ˜๋Š” ํ•„์š”์— ๋”ฐ๋ผ ์กฐ์ • ๊ฐ€๋Šฅ
for epoch in range(num_epochs):
model.train() # ํ•™์Šต ๋ชจ๋“œ๋กœ ์ „ํ™˜
epoch_loss = 0
for batch_idx, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad() # ๊ธฐ์šธ๊ธฐ ์ดˆ๊ธฐํ™”
outputs = model(inputs) # ๋ชจ๋ธ ์˜ˆ์ธก
loss = criterion(outputs, labels) # ์†์‹ค ๊ณ„์‚ฐ
loss.backward() # ์—ญ์ „ํŒŒ ์ˆ˜ํ–‰
optimizer.step() # ๊ฐ€์ค‘์น˜ ์—…๋ฐ์ดํŠธ
epoch_loss += loss.item()
if (batch_idx + 1) % 100 == 0:
print(f"Epoch {epoch+1}/{num_epochs}, Iteration {batch_idx+1}/{len(train_loader)}, Loss: {loss.item():.4f}")
print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(train_loader):.4f}")
# 13. ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ๋กœ ๋ชจ๋ธ ํ‰๊ฐ€
model.eval() # ํ‰๊ฐ€ ๋ชจ๋“œ ์ „ํ™˜ (dropout ๋“ฑ ๋น„ํ™œ์„ฑํ™”)
correct = 0
total = 0
with torch.no_grad(): # ํ‰๊ฐ€ ์‹œ์—๋Š” ๊ธฐ์šธ๊ธฐ ๊ณ„์‚ฐ ๋ถˆํ•„์š”
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, predicted = torch.max(outputs, 1) # ๊ฐ€์žฅ ๋†’์€ ํ™•๋ฅ ์˜ ํด๋ž˜์Šค๋ฅผ ์„ ํƒ
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f"Test Accuracy: {100 * correct / total:.2f}%")
"""๋‚ด๊ฐ€ ๋„ฃ์€ ์ž…๋ ฅ์„ ๋ถ„๋ฅ˜ํ•˜๋Š” ์ฝ”๋“œ ์ถ”๊ฐ€"""
def predict_message(text):
    """Classify a single message with the trained model.

    Returns the predicted class as the original 1-5 label (the model
    predicts 0-4 internally, so 1 is added back).
    """
    model.eval()
    # Encode exactly as during training: tokenize, map to ids,
    # pad/truncate to length 50, then add a batch dimension of 1.
    encoded = text_to_sequence(text, vocab, max_len=50)
    batch = torch.tensor(encoded, dtype=torch.long).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(batch)
    # Highest-logit class, shifted back to the original 1-5 labeling.
    return logits.argmax(dim=1).item() + 1

# Read a message from the user and classify it.
sample_text = input("예측할 메시지를 입력하세요: ")
predicted_class = predict_message(sample_text)
print("예측된 클래스:", predicted_class)
#์—ฌ๊ธฐ์„œ๋ถ€ํ„ฐ๋Š” ์œ„์—์„œ ์ƒ์„ฑ๋œ ๋ชจ๋ธ์„ ๋””๋ฐ”์ด์Šค์— ์ €์žฅํ•˜๋Š” ์ฝ”๋“œ
# Google Drive ๋งˆ์šดํŠธ
from google.colab import drive
drive.mount('/content/drive')
# ์ €์žฅํ•  ๊ฒฝ๋กœ ์„ค์ • (์—ฌ๊ธฐ์— ๋‹น์‹ ์ด ์›ํ•˜๋Š” ๋””๋ ‰ํ† ๋ฆฌ์™€ ํŒŒ์ผ ์ด๋ฆ„์„ ๋„ฃ์œผ์„ธ์š”)
save_dir = '/content/drive/My Drive/' # Google Drive์˜ ๊ธฐ๋ณธ ๋””๋ ‰ํ† ๋ฆฌ, ํ•„์š”ํ•˜๋ฉด ํ•˜์œ„ ํด๋” ์ถ”๊ฐ€ ๊ฐ€๋Šฅ
model_name = 'Eum_lstm_save.pth' # ๋ชจ๋ธ ํŒŒ์ผ ์ด๋ฆ„ (์˜ˆ: 'my_model.pth'). ๋‹น์‹ ์ด ์›ํ•˜๋Š” ์ด๋ฆ„์œผ๋กœ ๋ณ€๊ฒฝํ•˜์„ธ์š”.
# ์ „์ฒด ์ €์žฅ ๊ฒฝ๋กœ
model_path = os.path.join(save_dir, model_name)
# ๋ชจ๋ธ ์ €์žฅ
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")
# (์„ ํƒ) ์ €์žฅ๋œ ํŒŒ์ผ์„ ๋กœ์ปฌ๋กœ ๋‹ค์šด๋กœ๋“œํ•˜๋ ค๋ฉด
from google.colab import files
files.download(model_path)