# Provenance: Hugging Face upload by anddali ("Upload inference.py with huggingface_hub", rev 96a1efd)
"""
Vehicle IDS Inference Script
Usage: python inference.py --input <can_messages.csv>
"""
import pickle
import numpy as np
import pandas as pd
from collections import Counter
def load_model(model_path='vehicle_ids_model.pkl'):
    """Deserialize and return the saved model artifact bundle.

    Parameters
    ----------
    model_path : str
        Path to the pickled artifact dict (scaler, learners, encoder, ...).

    Returns
    -------
    dict
        The unpickled model artifacts.

    NOTE(review): pickle.load executes arbitrary code during
    deserialization — only load model files from trusted sources.
    """
    with open(model_path, 'rb') as fh:
        artifacts = pickle.load(fh)
    return artifacts
def preprocess(df, model_artifacts):
    """Build the scaled, feature-selected matrix from raw CAN messages.

    Expects columns 'timestamp', 'can_id', and 'd0'..'d7' (hex strings or
    already-decoded ints). The input frame is NOT modified: feature
    engineering happens on a private copy (the original version mutated
    the caller's DataFrame by adding ~15 derived columns).

    Parameters
    ----------
    df : pd.DataFrame
        Raw CAN messages.
    model_artifacts : dict
        Must provide 'feature_cols' (column order used at fit time),
        'selected_indices' (columns kept after scaling), and a fitted
        'scaler' exposing ``transform``.

    Returns
    -------
    np.ndarray
        float32-derived matrix of shape (n_rows, len(selected_indices)).
    """
    feature_cols = model_artifacts['feature_cols']
    selected_indices = model_artifacts['selected_indices']
    scaler = model_artifacts['scaler']
    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()
    # Convert hex strings to integers; pass non-strings through unchanged.
    df['can_id_dec'] = df['can_id'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    for i in range(8):
        df[f'd{i}_dec'] = df[f'd{i}'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    # Per-message payload statistics over the 8 data bytes.
    data_cols = [f'd{i}_dec' for i in range(8)]
    df['data_mean'] = df[data_cols].mean(axis=1)
    df['data_std'] = df[data_cols].std(axis=1)
    df['data_min'] = df[data_cols].min(axis=1)
    df['data_max'] = df[data_cols].max(axis=1)
    df['data_range'] = df['data_max'] - df['data_min']
    df['data_sum'] = df[data_cols].sum(axis=1)
    # Inter-arrival time, clipped to [0, 1] s; first row has no predecessor.
    df['iat'] = df['timestamp'].diff().fillna(0).clip(0, 1.0)
    # Relative frequency of each CAN ID within this capture.
    id_freq = df['can_id_dec'].value_counts(normalize=True)
    df['can_id_freq'] = df['can_id_dec'].map(id_freq)

    def byte_entropy(row):
        # Shannon entropy (base 2) of the 8 payload bytes; the 1e-10
        # epsilon guards log2 against a zero probability.
        vals = [row[f'd{i}_dec'] for i in range(8)]
        counts = Counter(vals)
        total = len(vals)
        return -sum((c / total) * np.log2(c / total + 1e-10) for c in counts.values())

    df['data_entropy'] = df.apply(byte_entropy, axis=1)
    # Scale in the training-time column order, then keep selected features.
    X = df[feature_cols].values.astype(np.float32)
    X_scaled = scaler.transform(X)
    return X_scaled[:, selected_indices]
def predict(X, model_artifacts):
    """Run the two-tier IDS over a preprocessed feature matrix.

    Tier 1 is a stacking ensemble that names known attack classes;
    Tier 2 is an Isolation Forest score check that catches anomalies
    the stack labels 'Normal' (reported as UNKNOWN_ATTACK).

    Parameters
    ----------
    X : np.ndarray
        Scaled, feature-selected matrix from ``preprocess``.
    model_artifacts : dict
        Needs 'base_learners', 'meta_learner', 'isolation_forest',
        'label_encoder'; 'iforest_threshold' is optional (default 0.0).

    Returns
    -------
    pd.DataFrame
        Columns: attack_type, anomaly_score, is_anomaly, alert.
    """
    artifacts = model_artifacts
    encoder = artifacts['label_encoder']
    threshold = artifacts.get('iforest_threshold', 0.0)

    # Tier 1: stack base-learner class probabilities, then meta-classify.
    probas = [clf.predict_proba(X) for _, clf in artifacts['base_learners'].items()]
    class_ids = artifacts['meta_learner'].predict(np.column_stack(probas))
    attack_labels = encoder.inverse_transform(class_ids)

    # Tier 2: isolation-forest score; below the threshold means anomalous.
    scores = artifacts['isolation_forest'].decision_function(X)
    anomalous = scores < threshold

    out = pd.DataFrame({
        'attack_type': attack_labels,
        'anomaly_score': scores,
        'is_anomaly': anomalous,
        'alert': 'NORMAL',
    })
    # Known attacks named by Tier 1; Tier-1-Normal rows flagged by Tier 2
    # are unknown attacks.
    known = out['attack_type'] != 'Normal'
    out.loc[known, 'alert'] = 'KNOWN_ATTACK'
    out.loc[~known & out['is_anomaly'], 'alert'] = 'UNKNOWN_ATTACK'
    return out
if __name__ == '__main__':
    import argparse

    def _main():
        """CLI entry point: classify a CSV of CAN messages, save predictions."""
        cli = argparse.ArgumentParser()
        cli.add_argument('--input', required=True, help='Path to CAN messages CSV')
        cli.add_argument('--model', default='vehicle_ids_model.pkl', help='Model path')
        opts = cli.parse_args()

        artifacts = load_model(opts.model)
        frame = pd.read_csv(opts.input)
        features = preprocess(frame, artifacts)
        report = predict(features, artifacts)

        print(report['alert'].value_counts())
        report.to_csv('predictions.csv', index=False)
        print("Saved predictions to predictions.csv")

    _main()