| """ |
| Vehicle IDS Inference Script |
| Usage: python inference.py --input <can_messages.csv> |
| """ |
| import pickle |
| import numpy as np |
| import pandas as pd |
| from collections import Counter |
|
|
def load_model(model_path='vehicle_ids_model.pkl'):
    """Load the trained IDS artifact bundle from a pickle file.

    Args:
        model_path: Path to the pickled artifact dict (feature columns,
            scaler, base/meta learners, isolation forest, label encoder).

    Returns:
        The unpickled artifact dict.

    Raises:
        FileNotFoundError: If *model_path* does not exist.
    """
    # SECURITY: pickle.load executes arbitrary code embedded in the file.
    # Only load model files obtained from a trusted source.
    with open(model_path, 'rb') as f:
        return pickle.load(f)
|
|
def preprocess(df, model_artifacts):
    """Engineer per-message features from raw CAN frames, then scale and select.

    NOTE: mutates *df* in place by adding the derived feature columns.

    Args:
        df: DataFrame with 'timestamp', 'can_id' and byte columns 'd0'..'d7'
            (hex strings or already-numeric values).
        model_artifacts: Dict providing 'feature_cols', 'selected_indices'
            and a fitted 'scaler'.

    Returns:
        float32 array of scaled, feature-selected rows.
    """
    feature_cols = model_artifacts['feature_cols']
    selected_indices = model_artifacts['selected_indices']
    scaler = model_artifacts['scaler']

    def _to_dec(value):
        # CSV cells may arrive as hex strings; numeric cells pass through.
        return int(value, 16) if isinstance(value, str) else value

    df['can_id_dec'] = df['can_id'].apply(_to_dec)
    for idx in range(8):
        df[f'd{idx}_dec'] = df[f'd{idx}'].apply(_to_dec)

    data_cols = [f'd{idx}_dec' for idx in range(8)]
    payload = df[data_cols]
    df['data_mean'] = payload.mean(axis=1)
    df['data_std'] = payload.std(axis=1)
    df['data_min'] = payload.min(axis=1)
    df['data_max'] = payload.max(axis=1)
    df['data_range'] = df['data_max'] - df['data_min']
    df['data_sum'] = payload.sum(axis=1)
    # Inter-arrival time clipped to [0, 1] s; the first row has no predecessor.
    df['iat'] = df['timestamp'].diff().fillna(0).clip(0, 1.0)

    # Relative frequency of each CAN ID within this batch of messages.
    id_freq = df['can_id_dec'].value_counts(normalize=True)
    df['can_id_freq'] = df['can_id_dec'].map(id_freq)

    def _row_entropy(row):
        # Shannon entropy over the 8 payload bytes (epsilon guards log2 input).
        byte_vals = [row[f'd{k}_dec'] for k in range(8)]
        n_bytes = len(byte_vals)
        return -sum(
            (cnt / n_bytes) * np.log2(cnt / n_bytes + 1e-10)
            for cnt in Counter(byte_vals).values()
        )

    df['data_entropy'] = df.apply(_row_entropy, axis=1)

    X = df[feature_cols].values.astype(np.float32)
    return scaler.transform(X)[:, selected_indices]
|
|
def predict(X, model_artifacts):
    """Run the two-tier IDS on preprocessed features.

    Tier 1: a stacked ensemble (base learners -> meta learner) classifies
    each message into known attack types. Tier 2: an Isolation Forest
    re-checks messages Tier 1 labeled 'Normal' and escalates anomalies
    as unknown attacks.

    Args:
        X: Scaled, feature-selected array of shape (n_samples, n_features).
        model_artifacts: Dict with 'base_learners' (ordered dict of fitted
            classifiers), 'meta_learner', 'isolation_forest',
            'label_encoder' and optional 'iforest_threshold' (default 0.0).

    Returns:
        DataFrame with columns 'attack_type', 'anomaly_score',
        'is_anomaly' and 'alert' (NORMAL / KNOWN_ATTACK / UNKNOWN_ATTACK).
    """
    base_learners = model_artifacts['base_learners']
    meta_learner = model_artifacts['meta_learner']
    iforest = model_artifacts['isolation_forest']
    iforest_thresh = model_artifacts.get('iforest_threshold', 0.0)
    le = model_artifacts['label_encoder']

    # Tier 1: concatenate each base learner's class probabilities as
    # meta-features. Keys are unused, so iterate values() directly.
    meta_features = np.column_stack([
        est.predict_proba(X) for est in base_learners.values()
    ])
    tier1_preds = meta_learner.predict(meta_features)
    tier1_labels = le.inverse_transform(tier1_preds)

    # Tier 2: lower decision_function scores mean more anomalous, so a
    # score below the threshold flags the message.
    tier2_scores = iforest.decision_function(X)
    tier2_anomaly = (tier2_scores < iforest_thresh)

    results = pd.DataFrame({
        'attack_type': tier1_labels,
        'anomaly_score': tier2_scores,
        'is_anomaly': tier2_anomaly,
        'alert': 'NORMAL',
    })

    # Known attack: Tier 1 identified a non-Normal class.
    results.loc[results['attack_type'] != 'Normal', 'alert'] = 'KNOWN_ATTACK'
    # Unknown attack: Tier 1 saw Normal but Tier 2 flagged an anomaly.
    results.loc[(results['attack_type'] == 'Normal') & results['is_anomaly'], 'alert'] = 'UNKNOWN_ATTACK'

    return results
|
|
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Vehicle IDS inference')
    parser.add_argument('--input', required=True, help='Path to CAN messages CSV')
    parser.add_argument('--model', default='vehicle_ids_model.pkl', help='Model path')
    # Backward-compatible generalization: the output path was hard-coded
    # to 'predictions.csv'; it is now configurable with the same default.
    parser.add_argument('--output', default='predictions.csv',
                        help='Where to write the predictions CSV')
    args = parser.parse_args()

    model = load_model(args.model)
    df = pd.read_csv(args.input)
    X = preprocess(df, model)
    results = predict(X, model)

    # Console summary of alert counts, then persist the full results.
    print(results['alert'].value_counts())
    results.to_csv(args.output, index=False)
    print(f"Saved predictions to {args.output}")
|
|