""" Feature importance and leave-one-feature-out ablation for the 10 face_orientation features. Run: python -m evaluation.feature_importance Outputs: - XGBoost gain-based importance (from trained checkpoint) - Leave-one-feature-out LOPO F1 (ablation): drop each feature in turn, report mean LOPO F1. - Writes evaluation/feature_selection_justification.md """ import os import sys import argparse import numpy as np from sklearn.preprocessing import StandardScaler from sklearn.metrics import f1_score from xgboost import XGBClassifier _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) from data_preparation.prepare_dataset import get_default_split_config, load_per_person, SELECTED_FEATURES from models.xgboost.config import XGB_BASE_PARAMS, build_xgb_classifier, get_xgb_params _, SEED = get_default_split_config() FEATURES = SELECTED_FEATURES["face_orientation"] def _resolve_xgb_path(): return os.path.join(_PROJECT_ROOT, "checkpoints", "xgboost_face_orientation_best.json") def xgb_feature_importance(): """Load trained XGBoost and return gain-based importance for the 10 features.""" path = _resolve_xgb_path() if not os.path.isfile(path): print(f"[WARN] No XGBoost checkpoint at {path}; skip importance.") return None model = XGBClassifier() model.load_model(path) imp = model.get_booster().get_score(importance_type="gain") # Booster uses f0, f1, ...; we use same order as FEATURES (training order) by_idx = {int(k.replace("f", "")): v for k, v in imp.items() if k.startswith("f")} order = [by_idx.get(i, 0.0) for i in range(len(FEATURES))] return dict(zip(FEATURES, order)) def _make_eval_model(seed: int, quick: bool): if not quick: return build_xgb_classifier(seed, verbosity=0) params = get_xgb_params() params["n_estimators"] = 200 params["random_state"] = seed params["verbosity"] = 0 return XGBClassifier(**params) def run_ablation_lopo(by_person, persons, quick: bool): """Leave-one-feature-out: for each feature, train XGBoost on the other 9 with LOPO, report mean F1.""" results = {} for drop_feat in FEATURES: print(f" -> dropping {drop_feat} ({len(results)+1}/{len(FEATURES)})") idx_keep = [i for i, f in enumerate(FEATURES) if f != drop_feat] f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] X_tr = train_X[:, idx_keep] X_te = X_test[:, idx_keep] scaler = StandardScaler().fit(X_tr) X_tr_sc = scaler.transform(X_tr) X_te_sc = scaler.transform(X_te) xgb = _make_eval_model(SEED, quick) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) results[drop_feat] = np.mean(f1s) return results def run_baseline_lopo_f1(by_person, persons, quick: bool): """Full 10-feature LOPO mean F1 for reference.""" f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] scaler = StandardScaler().fit(train_X) X_tr_sc = scaler.transform(train_X) X_te_sc = scaler.transform(X_test) xgb = _make_eval_model(SEED, quick) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) return np.mean(f1s) # Channel subsets for ablation (subset name -> list of feature names) CHANNEL_SUBSETS = { "head_pose": ["head_deviation", "s_face", "pitch"], "eye_state": ["ear_left", "ear_avg", "ear_right", "perclos"], "gaze": ["h_gaze", "gaze_offset", "s_eye"], } def run_channel_ablation(by_person, persons, quick: bool, baseline: float): """LOPO XGBoost with head-only, eye-only, gaze-only, and all 10. Returns dict subset_name -> mean F1.""" results = {} for subset_name, feat_list in CHANNEL_SUBSETS.items(): print(f" -> channel {subset_name}") idx_keep = [FEATURES.index(f) for f in feat_list] f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] X_tr = train_X[:, idx_keep] X_te = X_test[:, idx_keep] scaler = StandardScaler().fit(X_tr) X_tr_sc = scaler.transform(X_tr) X_te_sc = scaler.transform(X_te) xgb = _make_eval_model(SEED, quick) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) results[subset_name] = np.mean(f1s) results["all_10"] = baseline return results def _parse_args(): parser = argparse.ArgumentParser(description="Feature importance + LOPO ablation") parser.add_argument( "--quick", action="store_true", help="Use fewer trees (200) for faster iteration.", ) parser.add_argument( "--skip-lofo", action="store_true", help="Skip leave-one-feature-out ablation.", ) parser.add_argument( "--skip-channel", action="store_true", help="Skip channel ablation.", ) return parser.parse_args() def main(): args = _parse_args() print("=== Feature importance (XGBoost gain) ===") if args.quick: print("Running in quick mode (n_estimators=200).") imp = xgb_feature_importance() if imp: for name in FEATURES: print(f" {name}: {imp.get(name, 0):.2f}") order = sorted(imp.items(), key=lambda x: -x[1]) print(" Top-5 by gain:", [x[0] for x in order[:5]]) print("\n[DATA] Loading per-person splits once...") by_person, _, _ = load_per_person("face_orientation") persons = sorted(by_person.keys()) print("\n=== Baseline LOPO (all 10 features) ===") baseline = run_baseline_lopo_f1(by_person, persons, quick=args.quick) print(f" Baseline (all 10 features) mean LOPO F1: {baseline:.4f}") ablation = None worst_drop = None if args.skip_lofo: print("\n=== Leave-one-feature-out ablation (LOPO mean F1) ===") print(" skipped (--skip-lofo)") else: print("\n=== Leave-one-feature-out ablation (LOPO mean F1) ===") ablation = run_ablation_lopo(by_person, persons, quick=args.quick) for feat in FEATURES: delta = baseline - ablation[feat] print(f" drop {feat}: F1={ablation[feat]:.4f} (Δ={delta:+.4f})") worst_drop = min(ablation.items(), key=lambda x: x[1]) print(f" Largest F1 drop when dropping: {worst_drop[0]} (F1={worst_drop[1]:.4f})") channel_f1 = None if args.skip_channel: print("\n=== Channel ablation (LOPO mean F1) ===") print(" skipped (--skip-channel)") else: print("\n=== Channel ablation (LOPO mean F1) ===") channel_f1 = run_channel_ablation(by_person, persons, quick=args.quick, baseline=baseline) for name, f1 in channel_f1.items(): print(f" {name}: {f1:.4f}") out_dir = os.path.join(_PROJECT_ROOT, "evaluation") out_path = os.path.join(out_dir, "feature_selection_justification.md") lines = [ "# Feature selection justification", "", "The face_orientation model uses 10 of 17 extracted features. This document summarises empirical support.", "", "## 1. Domain rationale", "", "The 10 features were chosen to cover three channels:", "- **Head pose:** head_deviation, s_face, pitch", "- **Eye state:** ear_left, ear_right, ear_avg, perclos", "- **Gaze:** h_gaze, gaze_offset, s_eye", "", "Excluded: v_gaze (noisy), mar (rare events), yaw/roll (redundant with head_deviation/s_face), blink_rate/closure_duration/yawn_duration (temporal overlap with perclos).", "", "## 2. XGBoost feature importance (gain)", "", f"Config used: `{XGB_BASE_PARAMS}`.", "Quick mode: " + ("yes (200 trees)" if args.quick else "no (full config)"), "", "From the trained XGBoost checkpoint (gain on the 10 features):", "", "| Feature | Gain |", "|---------|------|", ] if imp: for name in FEATURES: lines.append(f"| {name} | {imp.get(name, 0):.2f} |") order = sorted(imp.items(), key=lambda x: -x[1]) lines.append("") lines.append(f"**Top 5 by gain:** {', '.join(x[0] for x in order[:5])}.") else: lines.append("(Run with XGBoost checkpoint to populate.)") lines.extend([ "", "## 3. Leave-one-feature-out ablation (LOPO)", "", f"Baseline (all 10 features) mean LOPO F1: **{baseline:.4f}**.", "", ]) if ablation is None: lines.append("Skipped in this run (`--skip-lofo`).") else: lines.extend([ "| Feature dropped | Mean LOPO F1 | Δ vs baseline |", "|------------------|--------------|---------------|", ]) for feat in FEATURES: delta = baseline - ablation[feat] lines.append(f"| {feat} | {ablation[feat]:.4f} | {delta:+.4f} |") lines.append("") lines.append(f"Dropping **{worst_drop[0]}** hurts most (F1={worst_drop[1]:.4f}), consistent with it being important.") lines.append("") lines.append("## 4. Channel ablation (LOPO)") lines.append("") if channel_f1 is None: lines.append("Skipped in this run (`--skip-channel`).") else: lines.append("| Subset | Mean LOPO F1 |") lines.append("|--------|--------------|") for name in ["head_pose", "eye_state", "gaze", "all_10"]: lines.append(f"| {name} | {channel_f1[name]:.4f} |") lines.append("") lines.append("## 5. Conclusion") lines.append("") if ablation is None: lines.append("Selection is supported by (1) domain rationale (three attention channels), (2) XGBoost gain importance, and (3) channel ablation. Run without `--skip-lofo` for full leave-one-out ablation.") else: lines.append("Selection is supported by (1) domain rationale (three attention channels), (2) XGBoost gain importance, and (3) leave-one-out ablation. SHAP or correlation-based pruning can be added in future work.") lines.append("") with open(out_path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) print(f"\nReport written to {out_path}") if __name__ == "__main__": main()