# nids-deployment / app.py
# Uploaded by Alaudeen via huggingface_hub (commit 70fd4ab, verified)
"""Hugging Face Spaces app for NIDS deployment.
This app downloads pre-trained models from Hugging Face Hub and serves
a Gradio interface for real-time network intrusion detection.
"""
import os
import sys
import json
import numpy as np
import joblib
import gradio as gr
# Hugging Face Hub repository that stores the pre-trained detectors.
MODELS_REPO = "Alaudeen/nids-models"
# Local directory models are downloaded into and loaded from.
MODELS_DIR = "outputs/models"
os.makedirs(MODELS_DIR, exist_ok=True)

# Global model registry, populated by load_models():
# name -> fitted sklearn/xgboost estimator (.joblib) or checkpoint path (.pt).
models = {}
def download_models():
    """Fetch any missing model files from the Hugging Face Hub.

    Files already present under ``MODELS_DIR`` are skipped. When
    ``huggingface_hub`` is not installed, the function logs a notice and
    returns, assuming the models already exist locally. Per-file download
    failures are logged and do not abort the remaining downloads.
    """
    try:
        from huggingface_hub import hf_hub_download
    except ImportError:
        print("huggingface_hub not installed, models must exist locally")
        return

    wanted = (
        "XGBoost.joblib",
        "RandomForest.joblib",
        "IsolationForest_Unsupervised.joblib",
        "MLP.pt",
        "LSTM.pt",
        "Transformer.pt",
        "Autoencoder.pt",
    )
    for fname in wanted:
        # Skip files that are already cached locally.
        if os.path.exists(os.path.join(MODELS_DIR, fname)):
            continue
        try:
            print(f"Downloading {fname} from {MODELS_REPO}...")
            hf_hub_download(
                repo_id=MODELS_REPO,
                filename=fname,
                repo_type="model",
                local_dir=MODELS_DIR,
                local_dir_use_symlinks=False
            )
            print(f" Downloaded: {fname}")
        except Exception as e:
            # Best-effort: a missing model simply won't appear in the UI.
            print(f" Failed to download {fname}: {e}")
def load_models():
    """Rebuild the global ``models`` registry from files in ``MODELS_DIR``.

    ``.joblib`` files are deserialized immediately into estimators;
    ``.pt`` checkpoints are registered as bare file paths to be loaded
    on demand. Files that fail to deserialize are logged and skipped.
    """
    global models
    models.clear()
    for fname in sorted(os.listdir(MODELS_DIR)):
        full_path = os.path.join(MODELS_DIR, fname)
        if fname.endswith(".joblib"):
            key = fname.replace(".joblib", "")
            try:
                models[key] = joblib.load(full_path)
            except Exception as err:
                print(f"Failed to load {key}: {err}")
            else:
                print(f"Loaded: {key}")
        elif fname.endswith(".pt"):
            key = fname.replace(".pt", "")
            # Torch checkpoints are not deserialized here; store the path only.
            models[key] = full_path
            print(f"Found: {key}")
# Startup: fetch any missing model files, then load everything found on disk.
print("Initializing NIDS Space...")
download_models()
load_models()
print(f"Models available: {list(models.keys())}")

# Sample NSL-KDD flows used to pre-fill the UI textboxes.
SAMPLE_NORMAL = [0, 1, 45, 0, 491, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2, 2, 0.0, 0.0,
0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.17, 0.03]
SAMPLE_ANOMALY = [0, 1, 44, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1, 1, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 123, 6, 1.0, 1.0,
0.0, 0.0, 0.05, 0.07, 0.0, 0.0, 0.1, 0.05]
# Truncate to exactly 41 features (the literals above contain 42 values;
# the slice drops the trailing extra so they match the model input width).
SAMPLE_NORMAL = SAMPLE_NORMAL[:41]
SAMPLE_ANOMALY = SAMPLE_ANOMALY[:41]
def detect_single(features_text: str, model_name: str) -> str:
    """Classify one network flow and return a human-readable alert string.

    Args:
        features_text: 41 comma-separated numeric NSL-KDD feature values.
        model_name: Key into the global ``models`` registry (populated at
            startup by ``load_models``).

    Returns:
        An alert-level string on success, or an error message when the
        input cannot be parsed or the model is unavailable.
    """
    try:
        features = [float(x.strip()) for x in features_text.split(",")]
    except ValueError as e:  # float() on a non-numeric token
        return f"❌ Error parsing features: {e}"
    if len(features) != 41:
        return f"❌ Expected 41 features, got {len(features)}"
    if model_name not in models:
        return f"❌ Model '{model_name}' not available. Loaded: {list(models.keys())}"

    model = models[model_name]
    # Torch checkpoints are registered as bare file paths, not estimators.
    if not hasattr(model, "predict"):
        return f"⚠️ Model {model_name} not loaded (path only)"

    X = np.array(features).reshape(1, -1)
    pred = int(model.predict(X)[0])
    # BUGFIX: sklearn's IsolationForest predicts +1 = inlier / -1 = outlier.
    # The previous code treated its inliers (+1) as threats and indexed
    # proba[-1] for outliers. Normalize to 0 = normal / 1 = anomaly.
    if "IsolationForest" in model_name:
        pred = 1 if pred == -1 else 0
    proba = model.predict_proba(X)[0] if hasattr(model, "predict_proba") else [0.5, 0.5]
    confidence = float(proba[pred])

    if pred == 0:
        return "✅ SAFE — Normal Traffic"
    if confidence > 0.9:
        return f"🔴 CRITICAL THREAT — Anomaly Detected (Confidence: {confidence:.1%})"
    if confidence > 0.75:
        return f"🟠 HIGH THREAT — Anomaly Detected (Confidence: {confidence:.1%})"
    return f"🟡 MEDIUM THREAT — Suspicious Activity (Confidence: {confidence:.1%})"
def detect_batch(batch_text: str, model_name: str) -> str:
    """Run detection over many flows and summarize the results as Markdown.

    Args:
        batch_text: Newline-separated flows; each line must contain exactly
            41 comma-separated numeric values. Malformed or wrong-length
            lines are silently skipped (best-effort batch behavior).
        model_name: Key into the global ``models`` registry.

    Returns:
        A Markdown summary of normal vs anomalous counts, or an error
        message when no valid flows or no usable model is available.
    """
    flows = []
    for line in batch_text.strip().split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            vals = [float(x.strip()) for x in line.split(",")]
        except ValueError:  # narrow: only float() parsing can fail here
            continue
        if len(vals) == 41:
            flows.append(vals)
    if not flows:
        return "❌ No valid 41-feature flows found."
    if model_name not in models or not hasattr(models[model_name], "predict"):
        return f"❌ Model '{model_name}' not available."

    model = models[model_name]
    preds = np.asarray(model.predict(np.array(flows)))
    # BUGFIX: sklearn's IsolationForest predicts +1 = inlier / -1 = outlier,
    # so the previous `preds == 0` / `preds == 1` counts were wrong for it.
    # Normalize to 0 = normal / 1 = anomaly.
    if "IsolationForest" in model_name:
        preds = np.where(preds == -1, 1, 0)
    total = len(flows)
    normals = int(np.sum(preds == 0))
    anomalies = int(np.sum(preds == 1))
    return (
        f"**Batch Detection Results**\n\n"
        f"- Total Flows: {total}\n"
        f"- ✅ Normal: {normals} ({normals/total*100:.1f}%)\n"
        f"- 🚨 Anomalies: {anomalies} ({anomalies/total*100:.1f}%)\n"
        f"- Model: {model_name}"
    )
def show_results() -> str:
    """Return a static Markdown table of offline evaluation metrics.

    The numbers are hard-coded results from training on NSL-KDD; this
    function performs no computation at call time.
    """
    report = """
## Model Performance (NSL-KDD Dataset)
| Model | Accuracy | Macro F1 | AUC-ROC | Type |
|-------|----------|----------|---------|------|
| **XGBoost** | 76.18% | 76.04% | 95.75% | Supervised |
| RandomForest | 73.10% | 73.05% | 95.34% | Supervised |
| MLP | 73.28% | 73.21% | 89.33% | Supervised |
| Autoencoder | 71.84% | 71.34% | 73.60% | Unsupervised |
| LSTM | 70.65% | 70.58% | 87.80% | Unsupervised |
| Transformer | 57.94% | 57.26% | 80.29% | Supervised |
| IsolationForest | 56.55% | 55.96% | 65.24% | Unsupervised |
**Key Insight:** XGBoost achieves the best performance (95.75% AUC-ROC) and runs at ~1ms latency per flow, making it ideal for real-time deployment.
"""
    return report
# ---------------------------------------------------------------------------
# Build the Gradio interface: four tabs (single flow, batch, performance,
# API docs) plus a footer, then launch when run as a script.
# NOTE(review): several labels below contain mojibake (e.g. "πŸ›‘οΈ") from a
# prior encoding round-trip; they are runtime strings and are left untouched.
# ---------------------------------------------------------------------------
with gr.Blocks(title="πŸ›‘οΈ Network Intrusion Detection System") as demo:
    gr.Markdown("""
# πŸ›‘οΈ Network Intrusion Detection System (NIDS)
Detect network intrusions in real-time using ML models trained on the **NSL-KDD** dataset.
Enter 41 comma-separated network flow features to classify as **Normal** or **Anomaly**.
**Models from:** [Alaudeen/nids-models](https://huggingface.co/Alaudeen/nids-models)
""")
    with gr.Tab("πŸ” Single Flow Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                # Pre-filled with the anomalous sample so the demo fires an alert.
                feature_input = gr.Textbox(
                    label="Flow Features (41 comma-separated values)",
                    value=",".join(map(str, SAMPLE_ANOMALY)),
                    lines=2,
                    placeholder="Enter 41 NSL-KDD features..."
                )
                model_choice = gr.Dropdown(
                    # Fall back to default names if no models loaded at startup.
                    choices=list(models.keys()) if models else ["XGBoost", "RandomForest"],
                    value="XGBoost",
                    label="Detection Model",
                    info="XGBoost is recommended (best accuracy + speed)"
                )
                detect_btn = gr.Button("πŸ” Detect Intrusion", variant="primary", size="lg")
            with gr.Column(scale=1):
                result = gr.Textbox(
                    label="Detection Result",
                    lines=4,
                    interactive=False
                )
                gr.Markdown("""
**Alert Levels:**
- 🟒 **Safe** β€” Normal traffic
- 🟑 **Medium** β€” Suspicious activity
- 🟠 **High** β€” Likely intrusion
- πŸ”΄ **Critical** β€” Confirmed attack
""")
        # Wire the detect button to the single-flow classifier.
        detect_btn.click(
            detect_single,
            inputs=[feature_input, model_choice],
            outputs=result
        )
        with gr.Row():
            # Convenience buttons: overwrite the textbox with a sample flow.
            gr.Button("πŸ“‹ Load Normal Sample").click(
                lambda: ",".join(map(str, SAMPLE_NORMAL)),
                outputs=feature_input
            )
            gr.Button("⚠️ Load Anomaly Sample").click(
                lambda: ",".join(map(str, SAMPLE_ANOMALY)),
                outputs=feature_input
            )
    with gr.Tab("πŸ“Š Batch Detection"):
        with gr.Row():
            with gr.Column(scale=2):
                batch_input = gr.Textbox(
                    label="Batch Flows (one per line, 41 values each)",
                    value=",".join(map(str, SAMPLE_NORMAL)) + "\n" +
                    ",".join(map(str, SAMPLE_ANOMALY)),
                    lines=8
                )
                batch_model = gr.Dropdown(
                    choices=list(models.keys()) if models else ["XGBoost", "RandomForest"],
                    value="XGBoost",
                    label="Model"
                )
                batch_btn = gr.Button("πŸ“Š Batch Detect", variant="primary")
            with gr.Column(scale=1):
                batch_result = gr.Markdown(label="Results")
        batch_btn.click(detect_batch, inputs=[batch_input, batch_model], outputs=batch_result)
    with gr.Tab("πŸ“ˆ Model Performance"):
        # Static metrics table rendered once at interface-build time.
        gr.Markdown(show_results())
    with gr.Tab("πŸ“– API Documentation"):
        gr.Markdown("""
## REST API Usage
Deploy the FastAPI server locally:
```bash
pip install fastapi uvicorn
uvicorn api:app --host 0.0.0.0 --port 8000
```
### Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/models` | GET | List available models |
| `/predict` | POST | Single flow detection |
| `/predict/batch` | POST | Batch detection |
| `/stats` | GET | Usage statistics |
| `/sample` | GET | Sample flows |
### Example Request
```bash
curl -X POST http://localhost:8000/predict \\
-H "Content-Type: application/json" \\
-d '{
"features": [0,1,45,0,491,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,1,0,0,0,2,2,0,0,0,0,1,0,0,0,0.17],
"model": "XGBoost"
}'
```
### Example Response
```json
{
"flow_id": "flow_1",
"prediction": 1,
"confidence": 0.9634,
"model": "XGBoost",
"latency_ms": 2.77,
"alert_level": "critical",
"timestamp": 1778206436.82
}
```
""")
    gr.Markdown("""
---
**Project:** [github.com/Alaudeen/nids](https://huggingface.co/Alaudeen/nids-models) |
**Dataset:** [Mireu-Lab/NSL-KDD](https://huggingface.co/datasets/Mireu-Lab/NSL-KDD) |
**License:** MIT
""")

if __name__ == "__main__":
    demo.launch()