"""Hugging Face Spaces app for NIDS deployment.

This app downloads pre-trained models from Hugging Face Hub and serves
a Gradio interface for real-time network intrusion detection.
"""

import os
import sys
import json

import numpy as np
import joblib
import gradio as gr

MODELS_REPO = "Alaudeen/nids-models"
MODELS_DIR = "outputs/models"

# Number of features every flow must carry.
EXPECTED_FEATURES = 41
# Model pre-selected in the UI when it is available.
DEFAULT_MODEL = "XGBoost"

os.makedirs(MODELS_DIR, exist_ok=True)

# Registry of available models, keyed by file stem. Values are either a
# loaded estimator (``.joblib`` files) or a file path (``.pt`` files).
models = {}


def download_models():
    """Download models from HF Hub if they don't exist locally.

    Best-effort: a missing ``huggingface_hub`` package or a failed download
    is reported on stdout and does not raise, so the app can still start
    with whatever model files are already present in ``MODELS_DIR``.
    """
    try:
        from huggingface_hub import hf_hub_download
    except ImportError:
        print("huggingface_hub not installed, models must exist locally")
        return

    model_files = [
        "XGBoost.joblib",
        "RandomForest.joblib",
        "IsolationForest_Unsupervised.joblib",
        "MLP.pt",
        "LSTM.pt",
        "Transformer.pt",
        "Autoencoder.pt",
    ]

    for fname in model_files:
        local_path = os.path.join(MODELS_DIR, fname)
        if os.path.exists(local_path):
            continue
        try:
            print(f"Downloading {fname} from {MODELS_REPO}...")
            hf_hub_download(
                repo_id=MODELS_REPO,
                filename=fname,
                repo_type="model",
                local_dir=MODELS_DIR,
                local_dir_use_symlinks=False,
            )
            print(f" Downloaded: {fname}")
        except Exception as e:
            # Keep going: one missing model should not block the others.
            print(f" Failed to download {fname}: {e}")


def load_models():
    """Load all available models from ``MODELS_DIR`` into ``models``.

    ``.joblib`` files are deserialized immediately; ``.pt`` files are only
    registered by path. The registry is cleared first, so calling this
    again rescans the directory.
    """
    models.clear()
    for fname in sorted(os.listdir(MODELS_DIR)):
        path = os.path.join(MODELS_DIR, fname)
        # splitext only strips the trailing extension, unlike str.replace,
        # which would also remove ".joblib"/".pt" from the middle of a name.
        name, ext = os.path.splitext(fname)
        if ext == ".joblib":
            try:
                # NOTE(security): joblib.load unpickles the file and can run
                # arbitrary code; only load artifacts from a trusted repo.
                models[name] = joblib.load(path)
                print(f"Loaded: {name}")
            except Exception as e:
                print(f"Failed to load {name}: {e}")
        elif ext == ".pt":
            models[name] = path  # Store path, load on-demand
            print(f"Found: {name}")


# Download and load
print("Initializing NIDS Space...")
download_models()
load_models()
print(f"Models available: {list(models.keys())}")

# Sample flows
SAMPLE_NORMAL = [
    0, 1, 45, 0, 491, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,
    2, 2, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,
    0.17, 0.03,
]
SAMPLE_ANOMALY = [
    0, 1, 44, 4, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,
    123, 6, 1.0, 1.0, 0.0, 0.0, 0.05, 0.07, 0.0, 0.0,
    0.1, 0.05,
]

# The literals above hold more than 41 values; trim them to the feature
# count the detectors accept (slicing truncates, it does not pad).
SAMPLE_NORMAL = SAMPLE_NORMAL[:EXPECTED_FEATURES]
SAMPLE_ANOMALY = SAMPLE_ANOMALY[:EXPECTED_FEATURES]


def _is_outlier_detector(model) -> bool:
    """Return True when *model* duck-types as a scikit-learn outlier detector.

    Heuristic: such estimators expose ``fit_predict`` and
    ``decision_function`` but no ``predict_proba``.
    NOTE(review): assumes the unsupervised ``.joblib`` artifact is a plain
    scikit-learn estimator (e.g. IsolationForest) - confirm against the
    training code.
    """
    return (
        not hasattr(model, "predict_proba")
        and hasattr(model, "decision_function")
        and hasattr(model, "fit_predict")
    )


def _predict_labels(model, X):
    """Run ``model.predict`` and return labels as 0 (normal) / 1 (anomaly).

    scikit-learn outlier detectors answer +1 for inliers and -1 for
    outliers; those are remapped so the rest of the app can rely on a
    single 0/1 convention.
    """
    raw = np.asarray(model.predict(X))
    if _is_outlier_detector(model):
        return (raw == -1).astype(int)
    return raw.astype(int)


def detect_single(features_text: str, model_name: str) -> str:
    """Detect intrusion on a single flow.

    Args:
        features_text: 41 comma-separated numeric feature values.
        model_name: Key of the model in ``models`` to use.

    Returns:
        A human-readable alert string, or an error/warning message when the
        input cannot be parsed, the model is unavailable, or prediction
        fails. This function never raises for bad user input.
    """
    try:
        features = [float(x.strip()) for x in features_text.split(",")]
    except (AttributeError, ValueError) as e:
        return f"❌ Error parsing features: {e}"

    if len(features) != EXPECTED_FEATURES:
        return f"❌ Expected 41 features, got {len(features)}"

    if model_name not in models:
        return f"❌ Model '{model_name}' not available. Loaded: {list(models.keys())}"

    model = models[model_name]
    # ``.pt`` entries are stored as plain paths and cannot predict.
    if not hasattr(model, "predict"):
        return f"⚠️ Model {model_name} not loaded (path only)"

    X = np.array(features).reshape(1, -1)
    try:
        pred = int(_predict_labels(model, X)[0])
        if hasattr(model, "predict_proba"):
            proba = model.predict_proba(X)[0]
        else:
            # No probability estimate available: report a neutral 50%.
            proba = [0.5, 0.5]
        confidence = float(proba[pred])
    except Exception as e:
        # Surface model errors in the UI instead of an unhandled traceback.
        return f"❌ Prediction failed: {e}"

    if pred == 0:
        return "✅ SAFE — Normal Traffic"
    if confidence > 0.9:
        return f"🔴 CRITICAL THREAT — Anomaly Detected (Confidence: {confidence:.1%})"
    if confidence > 0.75:
        return f"🟠 HIGH THREAT — Anomaly Detected (Confidence: {confidence:.1%})"
    return f"🟡 MEDIUM THREAT — Suspicious Activity (Confidence: {confidence:.1%})"


def detect_batch(batch_text: str, model_name: str) -> str:
    """Batch detection on multiple flows.

    Args:
        batch_text: One flow per line, each with 41 comma-separated values.
            Blank lines are ignored; lines that do not parse or have the
            wrong number of values are skipped and counted.
        model_name: Key of the model in ``models`` to use.

    Returns:
        A Markdown summary of normal/anomalous counts, or an error message
        when no valid flow is found or the model cannot predict.
    """
    lines = [ln.strip() for ln in batch_text.strip().splitlines() if ln.strip()]

    flows = []
    skipped = 0
    for line in lines:
        try:
            vals = [float(x.strip()) for x in line.split(",")]
        except ValueError:
            skipped += 1
            continue
        if len(vals) == EXPECTED_FEATURES:
            flows.append(vals)
        else:
            skipped += 1

    if not flows:
        return "❌ No valid 41-feature flows found."

    model = models.get(model_name)
    if model is None or not hasattr(model, "predict"):
        return f"❌ Model '{model_name}' not available."

    try:
        preds = _predict_labels(model, np.array(flows))
    except Exception as e:
        return f"❌ Prediction failed: {e}"

    total = len(flows)
    normals = int(np.sum(preds == 0))
    anomalies = int(np.sum(preds == 1))

    report = (
        f"**Batch Detection Results**\n\n"
        f"- Total Flows: {total}\n"
        f"- ✅ Normal: {normals} ({normals/total*100:.1f}%)\n"
        f"- 🚨 Anomalies: {anomalies} ({anomalies/total*100:.1f}%)\n"
        f"- Model: {model_name}"
    )
    if skipped:
        # Tell the user when some input lines were ignored.
        report += f"\n- ⚠️ Skipped invalid lines: {skipped}"
    return report


def show_results() -> str:
    """Show model performance table (static Markdown)."""
    return """
## Model Performance (NSL-KDD Dataset)

| Model | Accuracy | Macro F1 | AUC-ROC | Type |
|-------|----------|----------|---------|------|
| **XGBoost** | 76.18% | 76.04% | 95.75% | Supervised |
| RandomForest | 73.10% | 73.05% | 95.34% | Supervised |
| MLP | 73.28% | 73.21% | 89.33% | Supervised |
| Autoencoder | 71.84% | 71.34% | 73.60% | Unsupervised |
| LSTM | 70.65% | 70.58% | 87.80% | Unsupervised |
| Transformer | 57.94% | 57.26% | 80.29% | Supervised |
| IsolationForest | 56.55% | 55.96% | 65.24% | Unsupervised |

**Key Insight:** XGBoost achieves the best performance (95.75% AUC-ROC) and runs at ~1ms latency per flow, making it ideal for real-time deployment.
"""


def _model_choices():
    """Return the model names offered in the UI dropdowns."""
    return list(models.keys()) if models else ["XGBoost", "RandomForest"]


def _default_model(choices):
    """Pick a dropdown default that is guaranteed to be one of *choices*.

    Prefers ``DEFAULT_MODEL``, then the first model that can actually
    predict, then simply the first entry.
    """
    if DEFAULT_MODEL in choices:
        return DEFAULT_MODEL
    for name in choices:
        if hasattr(models.get(name), "predict"):
            return name
    return choices[0]


def _sample_text(sample) -> str:
    """Render a sample flow as the comma-separated text the UI expects."""
    return ",".join(map(str, sample))


# Markdown shown in the UI. Kept at column 0 so rendering does not depend
# on how the Markdown component treats leading indentation.
_INTRO_MD = """
# 🛡️ Network Intrusion Detection System (NIDS)

Detect network intrusions in real-time using ML models trained on the **NSL-KDD** dataset.
Enter 41 comma-separated network flow features to classify as **Normal** or **Anomaly**.

**Models from:** [Alaudeen/nids-models](https://huggingface.co/Alaudeen/nids-models)
"""

_ALERT_LEVELS_MD = """
**Alert Levels:**
- 🟢 **Safe** — Normal traffic
- 🟡 **Medium** — Suspicious activity
- 🟠 **High** — Likely intrusion
- 🔴 **Critical** — Confirmed attack
"""

_API_DOCS_MD = """
## REST API Usage

Deploy the FastAPI server locally:

```bash
pip install fastapi uvicorn
uvicorn api:app --host 0.0.0.0 --port 8000
```

### Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/models` | GET | List available models |
| `/predict` | POST | Single flow detection |
| `/predict/batch` | POST | Batch detection |
| `/stats` | GET | Usage statistics |
| `/sample` | GET | Sample flows |

### Example Request

```bash
curl -X POST http://localhost:8000/predict \\
  -H "Content-Type: application/json" \\
  -d '{
    "features": [0,1,45,0,491,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,1,0,0,0,2,2,0,0,0,0,1,0,0,0,0.17],
    "model": "XGBoost"
  }'
```

### Example Response

```json
{
  "flow_id": "flow_1",
  "prediction": 1,
  "confidence": 0.9634,
  "model": "XGBoost",
  "latency_ms": 2.77,
  "alert_level": "critical",
  "timestamp": 1778206436.82
}
```
"""

_FOOTER_MD = """
---

**Project:** [github.com/Alaudeen/nids](https://huggingface.co/Alaudeen/nids-models) | **Dataset:** [Mireu-Lab/NSL-KDD](https://huggingface.co/datasets/Mireu-Lab/NSL-KDD) | **License:** MIT
"""

# Computed once so both dropdowns agree and the default is always valid.
_CHOICES = _model_choices()
_DEFAULT_CHOICE = _default_model(_CHOICES)

# Build Gradio interface
with gr.Blocks(title="🛡️ Network Intrusion Detection System") as demo:
    gr.Markdown(_INTRO_MD)

    with gr.Tab("🔍 Single Flow Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                feature_input = gr.Textbox(
                    label="Flow Features (41 comma-separated values)",
                    value=_sample_text(SAMPLE_ANOMALY),
                    lines=2,
                    placeholder="Enter 41 NSL-KDD features...",
                )
                model_choice = gr.Dropdown(
                    choices=_CHOICES,
                    value=_DEFAULT_CHOICE,
                    label="Detection Model",
                    info="XGBoost is recommended (best accuracy + speed)",
                )
                detect_btn = gr.Button(
                    "🔍 Detect Intrusion", variant="primary", size="lg"
                )
            with gr.Column(scale=1):
                result = gr.Textbox(
                    label="Detection Result",
                    lines=4,
                    interactive=False,
                )
                gr.Markdown(_ALERT_LEVELS_MD)

        detect_btn.click(
            detect_single,
            inputs=[feature_input, model_choice],
            outputs=result,
        )

        with gr.Row():
            gr.Button("📋 Load Normal Sample").click(
                lambda: _sample_text(SAMPLE_NORMAL),
                outputs=feature_input,
            )
            gr.Button("⚠️ Load Anomaly Sample").click(
                lambda: _sample_text(SAMPLE_ANOMALY),
                outputs=feature_input,
            )

    with gr.Tab("📊 Batch Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                batch_input = gr.Textbox(
                    label="Batch Flows (one per line, 41 values each)",
                    value=_sample_text(SAMPLE_NORMAL)
                    + "\n"
                    + _sample_text(SAMPLE_ANOMALY),
                    lines=8,
                )
                batch_model = gr.Dropdown(
                    choices=_CHOICES,
                    value=_DEFAULT_CHOICE,
                    label="Model",
                )
                batch_btn = gr.Button("📊 Batch Detect", variant="primary")
            with gr.Column(scale=1):
                batch_result = gr.Markdown(label="Results")

        batch_btn.click(
            detect_batch,
            inputs=[batch_input, batch_model],
            outputs=batch_result,
        )

    with gr.Tab("📈 Model Performance"):
        gr.Markdown(show_results())

    with gr.Tab("📖 API Documentation"):
        gr.Markdown(_API_DOCS_MD)

    gr.Markdown(_FOOTER_MD)

if __name__ == "__main__":
    demo.launch()