| import os |
| import torch |
| import pandas as pd |
| import gradio as gr |
| import shutil |
| import zipfile |
| from pathlib import Path |
| from torch.utils.data import DataLoader, Dataset |
| from torch.optim import AdamW |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
|
|
| |
| |
| |
# The eight emotion labels; the list order defines the label-vector layout
# shared by the dataset, the model head, and the prediction output.
LIST_LABEL = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']


# Working directories: extracted user-uploaded model ZIPs, and models
# trained in this session.
DIR_UPLOADED = Path("temp_models/uploaded_zip")
DIR_TRAINED = Path("temp_models/trained_cloud")

# Create both up front so later extract/save calls cannot fail on a
# missing parent directory.
DIR_UPLOADED.mkdir(parents=True, exist_ok=True)
DIR_TRAINED.mkdir(parents=True, exist_ok=True)


# Path (str) of the model currently selected for inference; None means
# "fall back to model_default/ or the base IndoBERT checkpoint".
active_model_path = None
|
|
| |
| |
| |
class EmosiDataset(Dataset):
    """Torch dataset pairing cleaned text with multi-label emotion targets.

    Expects *df* to carry a ``text_clean`` column plus one numeric column
    per entry in ``LIST_LABEL``.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Snapshot labels as an array and texts as plain strings once, so
        # __getitem__ does no per-sample pandas work.
        self.labels = df[LIST_LABEL].values
        self.texts = df["text_clean"].astype(str).tolist()

    def __len__(self):
        return len(self.df)

    def __getitem__(self, item):
        encoded = self.tokenizer(
            self.texts[item],
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt',
        )
        # The tokenizer returns (1, max_len) tensors; flatten to (max_len,)
        # so default DataLoader collation stacks samples into a batch.
        sample = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(self.labels[item], dtype=torch.float)
        return sample
|
|
def clean_data(df):
    """Normalise a raw dataframe in place for training/inference.

    - Ensures every emotion column in LIST_LABEL exists and is numeric
      (decimal commas become dots; unparseable values become 0.0).
    - Adds a ``text_clean`` column from the first recognised text column
      (text/kalimat/content/tweet, case-insensitive), with newlines
      replaced by spaces and surrounding whitespace stripped.

    Returns the same (mutated) dataframe for convenience.
    """
    for label in LIST_LABEL:
        if label not in df.columns:
            df[label] = 0
        # Accept locale-style decimal commas ("0,5" -> "0.5") before parsing.
        df[label] = df[label].astype(str).str.replace(',', '.', regex=False)
        df[label] = pd.to_numeric(df[label], errors='coerce').fillna(0).astype(float)

    # First column whose lower-cased name matches a known text column.
    # Note: this already matches a literal "text" column, so the original
    # `elif "text" in df.columns` fallback was dead code and is removed.
    col_text = next((c for c in df.columns if c.lower() in ['text', 'kalimat', 'content', 'tweet']), None)
    if col_text:
        df["text_clean"] = df[col_text].astype(str).str.replace("\n", " ").str.strip()
    return df
|
|
| |
| |
| |
def handle_zip_upload(file_obj):
    """Extract an uploaded model ZIP and select it as the active model.

    Returns a (log message, status label) tuple for the Gradio outputs;
    the status element is None when no model was loaded.
    """
    global active_model_path

    if file_obj is None:
        return "β Tidak ada file.", None
    try:
        # Start from a clean extraction directory so stale files from a
        # previous upload cannot leak into the newly selected model.
        if DIR_UPLOADED.exists():
            shutil.rmtree(DIR_UPLOADED)
        DIR_UPLOADED.mkdir(parents=True, exist_ok=True)

        # SECURITY NOTE(review): extractall() trusts archive member paths;
        # a malicious ZIP could write outside DIR_UPLOADED ("zip slip").
        # Acceptable only if uploads come from trusted users — confirm.
        with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
            zip_ref.extractall(DIR_UPLOADED)

        # The model may be nested inside a folder in the archive; locate it
        # via its config.json and point at that file's directory.
        config_path = list(DIR_UPLOADED.rglob("config.json"))
        if not config_path:
            return "β Error: Tidak ditemukan config.json dalam ZIP.", None

        active_model_path = str(config_path[0].parent)

        # Fix: this success string was split across two source lines
        # (invalid f-string syntax); rejoined into a single literal.
        return f"β Model ZIP Berhasil Dimuat!\nLokasi: {active_model_path}", "Status: Memakai Model Upload ZIP"
    except Exception as e:
        return f"β Error unzip: {str(e)}", None
|
|
| |
| |
| |
def train_model_cloud(file_obj, sep, epochs, batch_size, lr, progress=gr.Progress()):
    """Fine-tune IndoBERT on an uploaded CSV and activate the result.

    Generator for Gradio streaming: yields (log text, status label) tuples;
    the status stays None until training finishes successfully.

    Args:
        file_obj: Gradio file wrapper for the training CSV.
        sep: CSV field separator.
        epochs, batch_size, lr: hyper-parameters from Gradio Number inputs
            (may arrive as floats, so they are coerced below).
        progress: Gradio progress bar, injected by Gradio.
    """
    global active_model_path

    yield "β³ Membaca dataset...", None
    if file_obj is None:
        yield "β File CSV belum diupload!", None
        return

    try:
        df = pd.read_csv(file_obj.name, sep=sep)
        df = clean_data(df)
        if "text_clean" not in df.columns:
            yield "β Kolom teks tidak ditemukan.", None
            return

        MODEL_NAME = "indobenchmark/indobert-base-p1"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL_NAME, num_labels=len(LIST_LABEL), problem_type="multi_label_classification"
        )

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)

        dataset = EmosiDataset(df, tokenizer)
        loader = DataLoader(dataset, batch_size=int(batch_size), shuffle=True)
        optimizer = AdamW(model.parameters(), lr=float(lr))

        log_text = f"π Mulai Training di {device}...\nData: {len(df)} baris.\n"
        yield log_text, None

        model.train()
        n_epochs = int(epochs)  # hoisted: Gradio may pass epochs as float
        for ep in range(n_epochs):
            total_loss = 0
            steps = len(loader)
            for i, batch in enumerate(loader):
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                # Passing labels makes the model compute BCE-with-logits
                # loss internally (problem_type="multi_label_classification").
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                # Update the progress bar every 5 steps to limit UI chatter.
                if i % 5 == 0:
                    progress((ep * steps + i) / (n_epochs * steps), desc=f"Ep {ep+1} Loss: {total_loss/(i+1):.4f}")

            avg_loss = total_loss / steps
            # Fix: this log line was split across two source lines (invalid
            # f-string syntax); rejoined into a single literal.
            log_text += f"β Epoch {ep+1}/{epochs} | Loss: {avg_loss:.4f}\n"
            yield log_text, None

        yield log_text + "\nπΎ Menyimpan model...", None
        # Replace any previously trained model wholesale before saving.
        if DIR_TRAINED.exists():
            shutil.rmtree(DIR_TRAINED)
        DIR_TRAINED.mkdir(parents=True, exist_ok=True)

        model.save_pretrained(DIR_TRAINED)
        tokenizer.save_pretrained(DIR_TRAINED)

        active_model_path = str(DIR_TRAINED)
        yield log_text + "\nπ Selesai! Model training aktif.", "Status: Memakai Model Hasil Training"

    except Exception as e:
        # Broad catch is deliberate at this UI boundary: surface any
        # failure in the log textbox instead of crashing the worker.
        yield f"β Error: {str(e)}", None
|
|
| |
| |
| |
def load_model_inference():
    """Load the model and tokenizer for inference, in priority order.

    1. The currently active model (uploaded ZIP or freshly trained).
    2. A local ``model_default/`` directory containing a config.json.
    3. The base IndoBERT checkpoint with a fresh (untrained) label head.

    Returns a (model, tokenizer) tuple.
    """

    def _base_fallback():
        # Untrained classification head: predictions are only meaningful
        # after fine-tuning, but this keeps the UI functional. Was
        # hard-coded num_labels=8; now tied to LIST_LABEL for consistency.
        model = AutoModelForSequenceClassification.from_pretrained(
            "indobenchmark/indobert-base-p1", num_labels=len(LIST_LABEL)
        )
        tokenizer = AutoTokenizer.from_pretrained("indobenchmark/indobert-base-p1")
        return model, tokenizer

    # `active_model_path` is only read here, so no `global` declaration is
    # needed (the original declared one redundantly).
    if active_model_path and os.path.exists(active_model_path):
        target_path = active_model_path
    elif os.path.exists("model_default/config.json"):
        target_path = "model_default"
    else:
        return _base_fallback()

    try:
        tokenizer = AutoTokenizer.from_pretrained(target_path)
        model = AutoModelForSequenceClassification.from_pretrained(target_path)
        model.eval()
        return model, tokenizer
    except Exception:
        # Was a bare `except:` (would even swallow KeyboardInterrupt);
        # narrowed. A corrupt/incomplete local model falls back gracefully.
        return _base_fallback()
|
|
def predict_text(text):
    """Score one sentence; returns {label: probability} for the Gradio Label."""
    if not text:
        return None
    try:
        model, tokenizer = load_model_inference()
        encoded = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
        with torch.no_grad():
            logits = model(**encoded).logits
        # Per-label sigmoid: independent multi-label probabilities,
        # not a softmax distribution.
        scores = torch.sigmoid(logits).numpy()[0]
        return {label: float(score) for label, score in zip(LIST_LABEL, scores)}
    except Exception as e:
        return {"Error": str(e)}
|
|
def predict_csv(file_obj, sep):
    """Score every row of an uploaded CSV and summarise the emotions.

    Returns a JSON-serialisable dict with the row count, the three
    dominant emotions, and per-label averages — or {"Error": ...} on
    any failure.
    """
    try:
        try:
            df = pd.read_csv(file_obj.name, sep=sep)
        except Exception:
            # Was a bare `except:`; narrowed. A wrong separator is the
            # common failure mode, so retry with a comma.
            df = pd.read_csv(file_obj.name, sep=",")
        df = clean_data(df)

        # Check the text column before loading the (potentially large)
        # model — same error result as before, without the wasted load.
        if "text_clean" not in df.columns:
            return {"Error": "Kolom teks tidak ditemukan"}

        model, tokenizer = load_model_inference()

        results = []
        for txt in df["text_clean"]:
            inputs = tokenizer(txt, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
            with torch.no_grad():
                out = model(**inputs)
            probs = torch.sigmoid(out.logits).numpy()[0]
            results.append({LIST_LABEL[i]: float(probs[i]) for i in range(len(LIST_LABEL))})

        # Fix: the original divided by len(results) unconditionally and
        # raised ZeroDivisionError on a CSV with no data rows.
        if not results:
            return {"Error": "CSV tidak berisi data"}

        avg = {l: sum(r[l] for r in results) / len(results) for l in LIST_LABEL}
        top3 = sorted(avg.items(), key=lambda x: x[1], reverse=True)[:3]
        return {"Info": f"Total {len(results)} data", "Dominan": {k: round(v, 4) for k, v in top3}, "Detail": avg}
    except Exception as e:
        return {"Error": str(e)}
|
|
| |
| |
| |
# ---------------------------------------------------------------------------
# Gradio UI: a configuration tab (upload a model ZIP / train a new model)
# and a testing tab (single sentence / batch CSV). `lbl_status` is shared so
# both configuration handlers can report which model is currently active.
# ---------------------------------------------------------------------------
with gr.Blocks(title="IndoBERT Emotion Cloud") as app:
    gr.Markdown("# βοΈ IndoBERT Emotion Classifier")

    # Read-only status banner, updated by the upload/train click handlers.
    lbl_status = gr.Textbox(label="Status Model Aktif", value="Default (IndoBERT Base / Uploaded Manual)", interactive=False)

    with gr.Tabs():

        with gr.Tab("βοΈ Konfigurasi Model"):
            with gr.Tabs():

                with gr.Tab("π Unggah Model"):
                    gr.Markdown("Upload file `.zip` berisi model yang sudah dilatih (dari Komputer).")
                    in_zip = gr.File(label="File ZIP Model")
                    btn_upload = gr.Button("Ekstrak & Pakai Model", variant="primary")
                    out_log_upload = gr.Textbox(label="Log Sistem")

                    btn_upload.click(handle_zip_upload, inputs=in_zip, outputs=[out_log_upload, lbl_status])

                with gr.Tab("ποΈββοΈ Latih Model"):
                    gr.Markdown("Latih model baru menggunakan Dataset CSV sendiri di Cloud.")
                    with gr.Row():
                        in_csv = gr.File(label="Dataset CSV")
                        in_sep = gr.Textbox(label="Separator", value=";")
                    with gr.Row():
                        in_ep = gr.Number(label="Epoch", value=1, precision=0)
                        in_bs = gr.Number(label="Batch Size", value=4, precision=0)
                        in_lr = gr.Number(label="Learning Rate", value=2e-5)
                    btn_train = gr.Button("Mulai Training", variant="stop")
                    out_log_train = gr.Textbox(label="Log Training", lines=5)

                    # train_model_cloud is a generator, so the log textbox
                    # streams incremental training updates.
                    btn_train.click(train_model_cloud, inputs=[in_csv, in_sep, in_ep, in_bs, in_lr], outputs=[out_log_train, lbl_status])

        with gr.Tab("π§ͺ Testing"):
            gr.Markdown("Uji model yang sedang aktif.")

            with gr.Tabs():
                with gr.Tab("π Uji Satu Kalimat"):
                    in_txt = gr.Textbox(label="Masukkan Kalimat", lines=2, placeholder="Contoh: Saya sangat bahagia hari ini...")
                    btn_pred = gr.Button("Prediksi Emosi")
                    out_lbl = gr.Label(label="Hasil Prediksi")
                    btn_pred.click(predict_text, inputs=in_txt, outputs=out_lbl)

                with gr.Tab("π Uji Batch (CSV)"):
                    in_csv_test = gr.File(label="Upload CSV Test")
                    btn_batch = gr.Button("Analisis Batch")
                    out_json = gr.JSON(label="Hasil Analisis")
                    # NOTE(review): this reuses `in_sep` from the training
                    # tab — batch prediction silently depends on the
                    # separator set on a different tab. Confirm intended.
                    btn_batch.click(predict_csv, inputs=[in_csv_test, in_sep], outputs=out_json)


if __name__ == "__main__":
    app.launch()