# -*- coding: utf-8 -*- """250405_Eum_lstm.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1-ABY8VONWw9g6ESYhz0Cir0qcrT-OfIW """ import torch import os from google.colab import files import pandas as pd from sklearn.model_selection import train_test_split import re import nltk from nltk.tokenize import word_tokenize from collections import Counter from torch.utils.data import Dataset, DataLoader import torch.nn as nn # 1. GPU 사용 가능 여부 확인 및 device 설정 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print("사용 가능한 device:", device) os.environ["CUDA_LAUNCH_BLOCKING"] = "1" # 2. 내 PC의 엑셀 파일 업로드 (Colab에서 파일 업로드 창이 뜹니다) uploaded = files.upload() # 여기서 'spam_data.xlsx' 등 파일을 선택하세요 # 3. 업로드한 CSV 파일을 pandas DataFrame으로 읽어오기 # CSV 파일 이름은 업로드한 파일명과 동일하게 사용하세요. data = pd.read_csv('lgaidataset (1).csv') # pd.read_excel 대신 pd.read_csv 사용 # 1. index 열에서 현재 가장 높은 값 찾기 max_index = data['index'].max() # 예: 33436.0 # 2. NaN 값의 개수 파악 nan_count = data['index'].isna().sum() # 예: 1 # 3. NaN을 채울 새로운 값 생성 (최대값 + 1부터 순차적으로) new_values = range(int(max_index) + 1, int(max_index) + 1 + nan_count) # 예: [33437] # 4. NaN이 있는 위치에 새로운 값 할당 data.loc[data['index'].isna(), 'index'] = new_values data.loc[:, 'class'] = data['class'] + 1 # 4. Train, Test 데이터 분할 (전체 데이터의 20%를 테스트셋으로 사용) train_df, test_df = train_test_split(data, test_size=0.2, random_state=42) print("Train 데이터 크기:", train_df.shape) print("Test 데이터 크기:", test_df.shape) # 5. 텍스트 전처리 및 토큰화 함수 정의 nltk.download('punkt') # 처음 한 번만 실행하면 됩니다 nltk.download('punkt_tab') def preprocess_text(text): """ 입력된 텍스트를 소문자화, 특수문자 제거 후 단어 단위로 토큰화합니다. """ text = text.lower() # 모두 소문자로 변환 text = re.sub(r'[^가-힣a-z0-9\s]', '', text) tokens = word_tokenize(text) # 단어 단위 토큰화 return tokens """아무 데이터에 관해 preprocess_text 함수 실행해서 한국어 토큰화 되는지 확인하기""" # 6. Vocabulary(단어 집합) 구축 all_tokens = [] # train 데이터의 모든 메시지에 대해 토큰을 추출합니다. for msg in train_df['content']: tokens = preprocess_text(msg) all_tokens.extend(tokens) # 각 단어의 빈도수를 계산합니다. word_counts = Counter(all_tokens) # 자주 등장하는 단어만 사용 (여기서는 빈도수 1 이상인 단어 모두 사용) # 인덱스 0은 용, 1은 (알 수 없는 단어) 용으로 예약합니다. vocab = {word: i+2 for i, (word, count) in enumerate(word_counts.items()) if count >= 1} vocab[""] = 0 vocab[""] = 1 vocab_size = len(vocab)+1 print("단어 집합 크기:", vocab_size) # 7. 텍스트를 숫자 시퀀스로 변환하는 함수 (최대 길이 max_len으로 패딩 또는 자르기) def text_to_sequence(text, vocab, max_len=50): """ 텍스트를 토큰화한 후 단어를 해당 인덱스로 변환합니다. max_len보다 짧으면 패딩(), 길면 자릅니다. """ tokens = preprocess_text(text) seq = [vocab.get(token, vocab[""]) for token in tokens] # 단어가 없으면 사용 if len(seq) < max_len: seq = seq + [vocab[""]] * (max_len - len(seq)) # 부족한 길이만큼 패딩 else: seq = seq[:max_len] # max_len까지만 사용 return seq # 8. PyTorch Dataset 클래스 정의 (메시지와 라벨을 숫자 시퀀스로 변환) class SpamDataset(Dataset): def __init__(self, df, vocab, max_len=50): self.messages = df['content'].tolist() # 메시지 리스트 self.labels = df['class'].tolist() # 라벨 리스트 (1~5) self.vocab = vocab self.max_len = max_len def __len__(self): return len(self.messages) def __getitem__(self, idx): message = self.messages[idx] label = self.labels[idx] # 메시지를 숫자 시퀀스로 변환 seq = text_to_sequence(message, self.vocab, self.max_len) # 라벨은 0부터 시작하도록 변환 (예: 1→0, 2→1, …) return torch.tensor(seq, dtype=torch.long), torch.tensor(label-1, dtype=torch.long) # 9. DataLoader 생성 (배치 처리 및 셔플) batch_size = 64 train_dataset = SpamDataset(train_df, vocab) test_dataset = SpamDataset(test_df, vocab) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # 10. LSTM 기반 분류 모델 정의 (임베딩, LSTM, 선형 계층 포함) class LSTMClassifier(nn.Module): def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, num_layers=1): super(LSTMClassifier, self).__init__() # 임베딩 레이어: 단어 인덱스를 임베딩 벡터로 변환 (padding_idx=0은 를 위한 설정) self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0) # LSTM 레이어: 시퀀스 데이터를 처리 (batch_first=True로 배치 차원이 첫번째) self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True) # 분류를 위한 선형(fully-connected) 레이어 self.fc = nn.Linear(hidden_dim, output_dim) def forward(self, x): # x: [batch_size, seq_len] x = self.embedding(x) # [batch_size, seq_len, embed_dim] out, (hn, cn) = self.lstm(x) # LSTM 처리, out: 모든 타임스텝 출력 out = out[:, -1, :] # 마지막 타임스텝의 출력만 사용 out = self.fc(out) # 선형 계층 통과 → [batch_size, output_dim] return out # 모델 하이퍼파라미터 설정 embed_dim = 100 # 임베딩 차원 hidden_dim = 128 # LSTM hidden state 차원 output_dim = 5 # 분류할 클래스 개수 (1~5) num_layers = 1 #dropout = 0.2 # 0.2 ~ 0.5 # 모델 초기화 및 device(GPU 또는 CPU)로 이동 model = LSTMClassifier(vocab_size, embed_dim, hidden_dim, output_dim, num_layers) model.to(device) # 11. 손실 함수와 옵티마이저 정의 criterion = nn.CrossEntropyLoss() # CrossEntropyLoss는 softmax 포함 (라벨은 0~4) #optimizer = torch.optim.Adam(model.parameters(), lr=0.001) optimizer = torch.optim.AdamW(model.parameters(), lr=0.0005) # 12. 학습 루프 실행 num_epochs = 3 # 에폭 수는 필요에 따라 조정 가능 for epoch in range(num_epochs): model.train() # 학습 모드로 전환 epoch_loss = 0 for batch_idx, (inputs, labels) in enumerate(train_loader): inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() # 기울기 초기화 outputs = model(inputs) # 모델 예측 loss = criterion(outputs, labels) # 손실 계산 loss.backward() # 역전파 수행 optimizer.step() # 가중치 업데이트 epoch_loss += loss.item() if (batch_idx + 1) % 100 == 0: print(f"Epoch {epoch+1}/{num_epochs}, Iteration {batch_idx+1}/{len(train_loader)}, Loss: {loss.item():.4f}") print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(train_loader):.4f}") # 13. 테스트 데이터로 모델 평가 model.eval() # 평가 모드 전환 (dropout 등 비활성화) correct = 0 total = 0 with torch.no_grad(): # 평가 시에는 기울기 계산 불필요 for inputs, labels in test_loader: inputs, labels = inputs.to(device), labels.to(device) outputs = model(inputs) _, predicted = torch.max(outputs, 1) # 가장 높은 확률의 클래스를 선택 total += labels.size(0) correct += (predicted == labels).sum().item() print(f"Test Accuracy: {100 * correct / total:.2f}%") """내가 넣은 입력을 분류하는 코드 추가""" def predict_message(text): """ 입력된 텍스트에 대해 전처리, 숫자 시퀀스 변환 후 모델 예측을 수행합니다. 반환값은 예측된 클래스 번호(원래 라벨, 1~5)입니다. """ model.eval() # 평가 모드로 전환 # 입력 텍스트를 숫자 시퀀스로 변환 (max_len 길이로 패딩 또는 자르기) seq = text_to_sequence(text, vocab, max_len=50) # 모델 입력으로 사용하기 위해 텐서로 변환하고 배치 차원 추가 input_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device) with torch.no_grad(): output = model(input_tensor) # 가장 높은 확률을 가진 클래스를 선택 (0~4로 예측되었으므로, 실제 라벨은 +1) _, predicted = torch.max(output, 1) predicted_class = predicted.item() + 1 return predicted_class # 사용자로부터 직접 메시지 입력받기 sample_text = input("예측할 메시지를 입력하세요: ") predicted_class = predict_message(sample_text) print("예측된 클래스:", predicted_class) #여기서부터는 위에서 생성된 모델을 디바이스에 저장하는 코드 # Google Drive 마운트 from google.colab import drive drive.mount('/content/drive') # 저장할 경로 설정 (여기에 당신이 원하는 디렉토리와 파일 이름을 넣으세요) save_dir = '/content/drive/My Drive/' # Google Drive의 기본 디렉토리, 필요하면 하위 폴더 추가 가능 model_name = 'Eum_lstm_save.pth' # 모델 파일 이름 (예: 'my_model.pth'). 당신이 원하는 이름으로 변경하세요. # 전체 저장 경로 model_path = os.path.join(save_dir, model_name) # 모델 저장 torch.save(model.state_dict(), model_path) print(f"Model saved to {model_path}") # (선택) 저장된 파일을 로컬로 다운로드하려면 from google.colab import files files.download(model_path)