Spaces:
Runtime error
Runtime error
| """Hugging Face Spaces app for NIDS deployment. | |
| This app downloads pre-trained models from Hugging Face Hub and serves | |
| a Gradio interface for real-time network intrusion detection. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import numpy as np | |
| import joblib | |
| import gradio as gr | |
# Hugging Face Hub repository id that download_models() pulls model files from.
MODELS_REPO = "Alaudeen/nids-models"
# Local directory where model files are downloaded to and loaded from.
MODELS_DIR = "outputs/models"
# Create the directory at import time so os.listdir(MODELS_DIR) cannot fail later.
os.makedirs(MODELS_DIR, exist_ok=True)
# Registry filled by load_models(): model name -> object loaded with joblib,
# or (for ".pt" files) the path of the file on disk.
models = {}
def download_models():
    """Download any model file missing from MODELS_DIR from the HF Hub.

    The function is best-effort: if ``huggingface_hub`` is not installed it
    prints a notice and returns, and a failure to fetch one file is printed
    and does not prevent the remaining files from being attempted.
    Files that already exist locally are never re-downloaded.
    """
    try:
        from huggingface_hub import hf_hub_download
    except ImportError:
        print("huggingface_hub not installed, models must exist locally")
        return

    model_files = (
        "XGBoost.joblib",
        "RandomForest.joblib",
        "IsolationForest_Unsupervised.joblib",
        "MLP.pt",
        "LSTM.pt",
        "Transformer.pt",
        "Autoencoder.pt",
    )
    for fname in model_files:
        local_path = os.path.join(MODELS_DIR, fname)
        if os.path.exists(local_path):
            continue
        try:
            print(f"Downloading {fname} from {MODELS_REPO}...")
            # `local_dir_use_symlinks` is intentionally not passed: it is
            # deprecated in huggingface_hub, and `local_dir` alone places the
            # file under MODELS_DIR.
            hf_hub_download(
                repo_id=MODELS_REPO,
                filename=fname,
                repo_type="model",
                local_dir=MODELS_DIR,
            )
            print(f" Downloaded: {fname}")
        except Exception as e:
            # Deliberately broad: a missing file or network problem must not
            # stop the app from starting with whatever models are available.
            print(f" Failed to download {fname}: {e}")
def load_models():
    """Rebuild the module-level ``models`` registry from files in MODELS_DIR.

    The registry is emptied first, then the directory is scanned in sorted
    order:

    * ``<name>.joblib`` files are loaded with ``joblib.load`` and stored under
      ``name``; a file that fails to load is reported and skipped.
    * ``<name>.pt`` files are not loaded; only their path is stored under
      ``name``.
    * every other file is ignored.
    """
    # Mutated in place, so no `global` declaration is required.
    models.clear()
    for fname in sorted(os.listdir(MODELS_DIR)):
        path = os.path.join(MODELS_DIR, fname)
        # splitext removes only the trailing extension; str.replace would also
        # delete ".joblib"/".pt" occurring in the middle of a file name.
        name, ext = os.path.splitext(fname)
        if ext == ".joblib":
            try:
                # NOTE(security): joblib.load unpickles the file, which can
                # execute arbitrary code. Only load artifacts from a trusted
                # repository.
                models[name] = joblib.load(path)
            except Exception as e:
                print(f"Failed to load {name}: {e}")
            else:
                print(f"Loaded: {name}")
        elif ext == ".pt":
            models[name] = path  # Store path, load on-demand
            print(f"Found: {name}")
# Download and load. This runs at import time, before the UI is built, so the
# model dropdowns can be populated from `models`.
print("Initializing NIDS Space...")
download_models()
load_models()
print(f"Models available: {list(models.keys())}")
# Sample flows used to pre-fill the input boxes and by the "Load ... Sample" buttons.
SAMPLE_NORMAL = [0, 1, 45, 0, 491, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                 1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2, 2, 0.0, 0.0,
                 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.17, 0.03]
SAMPLE_ANOMALY = [0, 1, 44, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                  1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 123, 6, 1.0, 1.0,
                  0.0, 0.0, 0.05, 0.07, 0.0, 0.0, 0.1, 0.05]
# Keep only the first 41 values, the length the detection functions require.
# A slice truncates the extra trailing value; it would NOT pad a shorter list.
SAMPLE_NORMAL = SAMPLE_NORMAL[:41]
SAMPLE_ANOMALY = SAMPLE_ANOMALY[:41]
def detect_single(features_text: str, model_name: str) -> str:
    """Classify one network flow and return a human-readable verdict.

    Args:
        features_text: Exactly 41 comma-separated numeric feature values.
        model_name: Key of the model in the module-level ``models`` registry.

    Returns:
        A status line. Problems (unparsable input, wrong feature count,
        unknown model, a model that is only a file path, or a failure inside
        the model itself) are reported as a message rather than raised, so the
        UI always has something to display.
    """
    try:
        features = [float(x.strip()) for x in features_text.split(",")]
    except (AttributeError, ValueError) as e:
        return f"β Error parsing features: {e}"
    if len(features) != 41:
        return f"β Expected 41 features, got {len(features)}"
    if model_name not in models:
        return f"β Model '{model_name}' not available. Loaded: {list(models.keys())}"

    model = models[model_name]
    # ".pt" entries are stored as plain path strings and cannot predict.
    if not hasattr(model, "predict"):
        return f"β οΈ Model {model_name} not loaded (path only)"

    X = np.array(features).reshape(1, -1)
    try:
        pred = int(model.predict(X)[0])
        # Models without probability output get a neutral 0.5 confidence.
        proba = model.predict_proba(X)[0] if hasattr(model, "predict_proba") else [0.5, 0.5]
        confidence = float(proba[pred])
    except Exception as e:
        # UI boundary: surface the failure as text instead of an unhandled
        # exception (e.g. the model expects a different feature layout).
        return f"β Model '{model_name}' failed to score the flow: {e}"

    # NOTE(review): label 0 is treated as normal and every other label as an
    # anomaly. A model that uses a different label convention (for example
    # -1/1) would need its output mapped first. TODO confirm for the
    # unsupervised models.
    if pred == 0:
        return "β SAFE β Normal Traffic"
    if confidence > 0.9:
        return f"π΄ CRITICAL THREAT β Anomaly Detected (Confidence: {confidence:.1%})"
    if confidence > 0.75:
        return f"π  HIGH THREAT β Anomaly Detected (Confidence: {confidence:.1%})"
    return f"π‘ MEDIUM THREAT β Suspicious Activity (Confidence: {confidence:.1%})"
def detect_batch(batch_text: str, model_name: str) -> str:
    """Classify several flows at once and return a Markdown summary.

    Args:
        batch_text: One flow per line, each line holding 41 comma-separated
            numeric values. Blank lines are ignored.
        model_name: Key of the model in the module-level ``models`` registry.

    Returns:
        A Markdown summary with the number of normal and anomalous flows, or
        an error message when no valid flow was supplied, the model is not
        usable, or the model fails while predicting. Lines that cannot be
        parsed or do not contain 41 values are skipped; when any were skipped
        their count is appended to the summary.
    """
    lines = [line.strip() for line in batch_text.strip().split("\n") if line.strip()]
    flows = []
    skipped = 0
    for line in lines:
        try:
            vals = [float(x.strip()) for x in line.split(",")]
        except ValueError:
            skipped += 1
            continue
        if len(vals) == 41:
            flows.append(vals)
        else:
            skipped += 1
    if not flows:
        return "β No valid 41-feature flows found."
    if model_name not in models or not hasattr(models[model_name], 'predict'):
        return f"β Model '{model_name}' not available."

    model = models[model_name]
    try:
        preds = np.asarray(model.predict(np.array(flows)))
    except Exception as e:
        # UI boundary: report the failure instead of raising into Gradio.
        return f"β Model '{model_name}' failed to score the batch: {e}"

    total = len(flows)
    normals = int(np.count_nonzero(preds == 0))
    # Any non-zero label counts as an anomaly, matching detect_single, so the
    # two counts always add up to the total.
    anomalies = total - normals
    summary = (
        f"**Batch Detection Results**\n\n"
        f"- Total Flows: {total}\n"
        f"- β Normal: {normals} ({normals/total*100:.1f}%)\n"
        f"- π¨ Anomalies: {anomalies} ({anomalies/total*100:.1f}%)\n"
        f"- Model: {model_name}"
    )
    if skipped:
        summary += f"\n- Skipped lines: {skipped}"
    return summary
def show_results() -> str:
    """Return the static Markdown report of per-model benchmark figures."""
    report_lines = (
        "",
        "## Model Performance (NSL-KDD Dataset)",
        "| Model | Accuracy | Macro F1 | AUC-ROC | Type |",
        "|-------|----------|----------|---------|------|",
        "| **XGBoost** | 76.18% | 76.04% | 95.75% | Supervised |",
        "| RandomForest | 73.10% | 73.05% | 95.34% | Supervised |",
        "| MLP | 73.28% | 73.21% | 89.33% | Supervised |",
        "| Autoencoder | 71.84% | 71.34% | 73.60% | Unsupervised |",
        "| LSTM | 70.65% | 70.58% | 87.80% | Unsupervised |",
        "| Transformer | 57.94% | 57.26% | 80.29% | Supervised |",
        "| IsolationForest | 56.55% | 55.96% | 65.24% | Unsupervised |",
        "**Key Insight:** XGBoost achieves the best performance (95.75% AUC-ROC) and runs at ~1ms latency per flow, making it ideal for real-time deployment.",
        "",
    )
    # The leading and trailing empty entries reproduce the newline right after
    # the opening and right before the closing of the report text.
    return "\n".join(report_lines)
# Build Gradio interface
# Choices shared by both model dropdowns: every registered model, or a fixed
# fallback list when nothing could be loaded.
_model_choices = list(models.keys()) if models else ["XGBoost", "RandomForest"]
# The preselected model must be one of the choices; fall back to the first
# available entry when XGBoost is not among the loaded models.
_default_model = "XGBoost" if "XGBoost" in _model_choices else _model_choices[0]

with gr.Blocks(title="π‘οΈ Network Intrusion Detection System") as demo:
    gr.Markdown("""
# π‘οΈ Network Intrusion Detection System (NIDS)
Detect network intrusions in real-time using ML models trained on the **NSL-KDD** dataset.
Enter 41 comma-separated network flow features to classify as **Normal** or **Anomaly**.
**Models from:** [Alaudeen/nids-models](https://huggingface.co/Alaudeen/nids-models)
""")
    # Tab 1: classify a single flow typed or loaded into the text box.
    with gr.Tab("π Single Flow Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                feature_input = gr.Textbox(
                    label="Flow Features (41 comma-separated values)",
                    value=",".join(map(str, SAMPLE_ANOMALY)),
                    lines=2,
                    placeholder="Enter 41 NSL-KDD features..."
                )
                model_choice = gr.Dropdown(
                    choices=_model_choices,
                    value=_default_model,
                    label="Detection Model",
                    info="XGBoost is recommended (best accuracy + speed)"
                )
                detect_btn = gr.Button("π Detect Intrusion", variant="primary", size="lg")
            with gr.Column(scale=1):
                result = gr.Textbox(
                    label="Detection Result",
                    lines=4,
                    interactive=False
                )
                gr.Markdown("""
**Alert Levels:**
- π’ **Safe** β Normal traffic
- π‘ **Medium** β Suspicious activity
- π  **High** β Likely intrusion
- π΄ **Critical** β Confirmed attack
""")
        detect_btn.click(
            detect_single,
            inputs=[feature_input, model_choice],
            outputs=result
        )
        # Convenience buttons that overwrite the feature box with a sample flow.
        with gr.Row():
            gr.Button("π Load Normal Sample").click(
                lambda: ",".join(map(str, SAMPLE_NORMAL)),
                outputs=feature_input
            )
            gr.Button("β οΈ Load Anomaly Sample").click(
                lambda: ",".join(map(str, SAMPLE_ANOMALY)),
                outputs=feature_input
            )
    # Tab 2: classify many flows, one per line.
    with gr.Tab("π Batch Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                batch_input = gr.Textbox(
                    label="Batch Flows (one per line, 41 values each)",
                    value=",".join(map(str, SAMPLE_NORMAL)) + "\n" +
                    ",".join(map(str, SAMPLE_ANOMALY)),
                    lines=8
                )
                batch_model = gr.Dropdown(
                    choices=_model_choices,
                    value=_default_model,
                    label="Model"
                )
                batch_btn = gr.Button("π Batch Detect", variant="primary")
            with gr.Column(scale=1):
                batch_result = gr.Markdown(label="Results")
        batch_btn.click(detect_batch, inputs=[batch_input, batch_model], outputs=batch_result)
    # Tab 3: static benchmark table.
    with gr.Tab("π Model Performance"):
        gr.Markdown(show_results())
    # Tab 4: static documentation for the separate FastAPI server.
    with gr.Tab("π API Documentation"):
        gr.Markdown("""
## REST API Usage
Deploy the FastAPI server locally:
```bash
pip install fastapi uvicorn
uvicorn api:app --host 0.0.0.0 --port 8000
```
### Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/models` | GET | List available models |
| `/predict` | POST | Single flow detection |
| `/predict/batch` | POST | Batch detection |
| `/stats` | GET | Usage statistics |
| `/sample` | GET | Sample flows |
### Example Request
```bash
curl -X POST http://localhost:8000/predict \\
-H "Content-Type: application/json" \\
-d '{
"features": [0,1,45,0,491,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,1,0,0,0,2,2,0,0,0,0,1,0,0,0,0.17],
"model": "XGBoost"
}'
```
### Example Response
```json
{
"flow_id": "flow_1",
"prediction": 1,
"confidence": 0.9634,
"model": "XGBoost",
"latency_ms": 2.77,
"alert_level": "critical",
"timestamp": 1778206436.82
}
```
""")
    gr.Markdown("""
---
**Project:** [github.com/Alaudeen/nids](https://huggingface.co/Alaudeen/nids-models) |
**Dataset:** [Mireu-Lab/NSL-KDD](https://huggingface.co/datasets/Mireu-Lab/NSL-KDD) |
**License:** MIT
""")

if __name__ == "__main__":
    demo.launch()