"""Vehicle IDS inference script.

Usage:
    python inference.py --input <can_messages.csv> \
        [--model vehicle_ids_model.pkl] [--output predictions.csv]
"""
import argparse
import pickle
from collections import Counter

import numpy as np
import pandas as pd

# Number of payload byte columns (d0 .. d7) read from every input row.
_NUM_DATA_BYTES = 8

# Added inside log2() by the entropy feature; kept exactly as in the original
# formula so the feature values stay numerically identical.
_ENTROPY_EPS = 1e-10


def load_model(model_path='vehicle_ids_model.pkl'):
    """Load the pickled model artifacts stored at *model_path*.

    Returns whatever object the pickle contains. The rest of this script
    indexes it like a dict (``feature_cols``, ``selected_indices``,
    ``scaler``, ``base_learners``, ``meta_learner``, ``isolation_forest``,
    ``label_encoder`` and optionally ``iforest_threshold``).
    """
    # SECURITY: unpickling can execute arbitrary code embedded in the file.
    # Only load model files that come from a trusted source.
    with open(model_path, 'rb') as f:
        return pickle.load(f)


def _hex_to_int(value):
    """Parse *value* as base-16 when it is a string; otherwise pass it through."""
    return int(value, 16) if isinstance(value, str) else value


def _byte_entropy(values):
    """Return the entropy-style score of *values* used as ``data_entropy``.

    Computes ``-sum(p * log2(p + 1e-10))`` over the relative frequency ``p``
    of each distinct value in *values*.
    """
    total = len(values)
    return -sum(
        (count / total) * np.log2(count / total + _ENTROPY_EPS)
        for count in Counter(values).values()
    )


def preprocess(df, model_artifacts):
    """Build the scaled, feature-selected matrix for the rows of *df*.

    *df* must provide the columns ``can_id``, ``d0`` .. ``d7`` and
    ``timestamp``. The caller's frame is left untouched: all derived columns
    are added to an internal copy.

    Args:
        df: Input messages, one row per message.
        model_artifacts: Mapping providing ``feature_cols`` (column names to
            extract, in order), ``scaler`` (object with ``transform``) and
            ``selected_indices`` (column indices kept after scaling).

    Returns:
        2-D array ``scaler.transform(features)[:, selected_indices]`` where
        ``features`` is ``df[feature_cols]`` cast to ``float32``.
    """
    feature_cols = model_artifacts['feature_cols']
    selected_indices = model_artifacts['selected_indices']
    scaler = model_artifacts['scaler']

    # Work on a copy so the derived columns never leak into the caller's frame.
    df = df.copy()

    # Hex strings -> integers. Non-string values are kept as they are.
    # NOTE(review): pandas.read_csv may parse a column that contains only
    # digit characters (e.g. "10") as integers, in which case the value is
    # NOT re-interpreted as hex here - confirm this matches training.
    df['can_id_dec'] = df['can_id'].apply(_hex_to_int)
    for i in range(_NUM_DATA_BYTES):
        df[f'd{i}_dec'] = df[f'd{i}'].apply(_hex_to_int)

    # Per-row statistics over the payload columns.
    data_cols = [f'd{i}_dec' for i in range(_NUM_DATA_BYTES)]
    payload = df[data_cols]
    df['data_mean'] = payload.mean(axis=1)
    df['data_std'] = payload.std(axis=1)
    df['data_min'] = payload.min(axis=1)
    df['data_max'] = payload.max(axis=1)
    df['data_range'] = df['data_max'] - df['data_min']
    df['data_sum'] = payload.sum(axis=1)

    # Difference between consecutive timestamps; the first row gets 0 and the
    # result is clipped to the interval [0, 1.0].
    df['iat'] = df['timestamp'].diff().fillna(0).clip(0, 1.0)

    # Relative frequency of each CAN ID within THIS input.
    # NOTE(review): the value therefore depends on the batch being scored -
    # confirm the training pipeline computed it the same way.
    id_freq = df['can_id_dec'].value_counts(normalize=True)
    df['can_id_freq'] = df['can_id_dec'].map(id_freq)

    # Row-wise entropy of the payload values. Building a plain list (instead
    # of DataFrame.apply(axis=1)) also yields a well-formed empty column when
    # the input has no rows.
    df['data_entropy'] = [
        _byte_entropy(row.tolist()) for row in payload.to_numpy()
    ]

    X = df[feature_cols].values.astype(np.float32)
    X_scaled = scaler.transform(X)
    return X_scaled[:, selected_indices]


def predict(X, model_artifacts):
    """Run the two-tier detector on *X* and return one result row per sample.

    Tier 1 concatenates ``predict_proba(X)`` of every base learner column-wise,
    feeds that to ``meta_learner.predict`` and maps the predictions back to
    labels with ``label_encoder.inverse_transform``.
    Tier 2 takes ``isolation_forest.decision_function(X)`` and marks a sample
    as anomalous when its score is below ``iforest_threshold`` (0.0 when the
    artifacts do not contain that key).

    Args:
        X: Feature matrix as returned by :func:`preprocess`.
        model_artifacts: Mapping providing ``base_learners`` (dict of
            estimators), ``meta_learner``, ``isolation_forest``,
            ``label_encoder`` and optionally ``iforest_threshold``.

    Returns:
        DataFrame with the columns ``attack_type``, ``anomaly_score``,
        ``is_anomaly`` and ``alert``. ``alert`` is ``'KNOWN_ATTACK'`` when the
        tier-1 label is not ``'Normal'``, ``'UNKNOWN_ATTACK'`` when the label
        is ``'Normal'`` but tier 2 flags an anomaly, and ``'NORMAL'`` otherwise.
    """
    base_learners = model_artifacts['base_learners']
    meta_learner = model_artifacts['meta_learner']
    iforest = model_artifacts['isolation_forest']
    iforest_thresh = model_artifacts.get('iforest_threshold', 0.0)
    le = model_artifacts['label_encoder']

    # Tier 1: multi-class stacking. Dict order fixes the column order of the
    # meta features, exactly as in the original .items() iteration.
    meta_features = np.column_stack(
        [est.predict_proba(X) for est in base_learners.values()]
    )
    tier1_preds = meta_learner.predict(meta_features)
    tier1_labels = le.inverse_transform(tier1_preds)

    # Tier 2: anomaly detection, score-based with a stored threshold.
    tier2_scores = iforest.decision_function(X)
    tier2_anomaly = tier2_scores < iforest_thresh

    results = pd.DataFrame({
        'attack_type': tier1_labels,
        'anomaly_score': tier2_scores,
        'is_anomaly': tier2_anomaly,
        'alert': 'NORMAL',
    })

    is_normal = results['attack_type'] == 'Normal'
    # Known attacks: tier 1 predicted something other than 'Normal'.
    results.loc[~is_normal, 'alert'] = 'KNOWN_ATTACK'
    # Unknown attacks: tier 1 says 'Normal' but tier 2 flags an anomaly.
    results.loc[is_normal & results['is_anomaly'], 'alert'] = 'UNKNOWN_ATTACK'
    return results


def main():
    """Command-line entry point: score a CSV and write the predictions CSV."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help='Path to CAN messages CSV')
    parser.add_argument('--model', default='vehicle_ids_model.pkl', help='Model path')
    parser.add_argument('--output', default='predictions.csv',
                        help='Path of the predictions CSV to write')
    args = parser.parse_args()

    model = load_model(args.model)
    df = pd.read_csv(args.input)
    X = preprocess(df, model)
    results = predict(X, model)

    print(results['alert'].value_counts())
    results.to_csv(args.output, index=False)
    print(f"Saved predictions to {args.output}")


if __name__ == '__main__':
    main()