""" Single source for pooled train/val/test data and splits. - Data: load_all_pooled() / load_per_person() from data/collected_*/*.npz (same pattern everywhere). - Splits: get_numpy_splits() / get_dataloaders() use stratified train/val/test with a fixed seed from config. - Test is held out before any preprocessing; StandardScaler is fit on train only, then applied to val and test. """ import os import glob import numpy as np from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split torch = None Dataset = object # type: ignore DataLoader = None # Defaults for stratified split (overridden by config when available) _DEFAULT_SPLIT_RATIOS = (0.7, 0.15, 0.15) _DEFAULT_SPLIT_SEED = 42 def _require_torch(): global torch, Dataset, DataLoader if torch is None: try: import torch as _torch from torch.utils.data import Dataset as _Dataset, DataLoader as _DataLoader except ImportError as exc: # pragma: no cover raise ImportError("PyTorch not installed") from exc torch = _torch Dataset = _Dataset # type: ignore DataLoader = _DataLoader # type: ignore return torch, Dataset, DataLoader DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") SELECTED_FEATURES = { "face_orientation": [ 'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch', 'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos' ], "eye_behaviour": [ 'ear_left', 'ear_right', 'ear_avg', 'mar', 'blink_rate', 'closure_duration', 'perclos', 'yawn_duration' ] } class FeatureVectorDataset(Dataset): def __init__(self, features: np.ndarray, labels: np.ndarray): torch_mod, _, _ = _require_torch() self.features = torch_mod.tensor(features, dtype=torch_mod.float32) self.labels = torch_mod.tensor(labels, dtype=torch_mod.long) def __len__(self): return len(self.labels) def __getitem__(self, idx): return self.features[idx], self.labels[idx] # ── Low-level helpers ──────────────────────────────────────────────────── def _clean_npz(raw, names): """Apply clipping rules in-place. Shared by all loaders.""" for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]: if col in names: raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi) for feat in ['ear_left', 'ear_right', 'ear_avg']: if feat in names: raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85) return raw def _load_one_npz(npz_path, target_features): """Load a single .npz file, clean and select features. Returns (X, y, selected_feature_names).""" data = np.load(npz_path, allow_pickle=True) raw = data['features'].astype(np.float32) labels = data['labels'].astype(np.int64) names = list(data['feature_names']) raw = _clean_npz(raw, names) selected = [f for f in target_features if f in names] idx = [names.index(f) for f in selected] return raw[:, idx], labels, selected # ── Public data loaders ────────────────────────────────────────────────── def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None): """Load all collected_*/*.npz, clean, select features, concatenate. Returns (X_all, y_all, all_feature_names). """ data_dir = data_dir or DATA_DIR target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"]) pattern = os.path.join(data_dir, "collected_*", "*.npz") npz_files = sorted(glob.glob(pattern)) if not npz_files: raise FileNotFoundError( f"No .npz files matching {pattern}. " "Collect data first with `python -m models.collect_features --name `." ) all_X, all_y = [], [] all_names = None for npz_path in npz_files: X, y, names = _load_one_npz(npz_path, target_features) if all_names is None: all_names = names all_X.append(X) all_y.append(y) print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples") X_all = np.concatenate(all_X, axis=0) y_all = np.concatenate(all_y, axis=0) print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': " f"{X_all.shape[0]} total samples, {X_all.shape[1]} features") return X_all, y_all, all_names def load_per_person(model_name: str = "face_orientation", data_dir: str = None): """Load collected_*/*.npz grouped by person (folder name). Returns dict { person_name: (X, y) } where X/y are per-person numpy arrays. Also returns (X_all, y_all) as pooled data. """ data_dir = data_dir or DATA_DIR target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"]) pattern = os.path.join(data_dir, "collected_*", "*.npz") npz_files = sorted(glob.glob(pattern)) if not npz_files: raise FileNotFoundError(f"No .npz files matching {pattern}") by_person = {} all_X, all_y = [], [] for npz_path in npz_files: folder = os.path.basename(os.path.dirname(npz_path)) person = folder.replace("collected_", "", 1) X, y, _ = _load_one_npz(npz_path, target_features) all_X.append(X) all_y.append(y) if person not in by_person: by_person[person] = [] by_person[person].append((X, y)) print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples") for person, chunks in by_person.items(): by_person[person] = ( np.concatenate([c[0] for c in chunks], axis=0), np.concatenate([c[1] for c in chunks], axis=0), ) X_all = np.concatenate(all_X, axis=0) y_all = np.concatenate(all_y, axis=0) print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features") return by_person, X_all, y_all def load_raw_npz(npz_path): """Load a single .npz without cleaning or feature selection. For exploration notebooks.""" data = np.load(npz_path, allow_pickle=True) features = data['features'].astype(np.float32) labels = data['labels'].astype(np.int64) names = list(data['feature_names']) return features, labels, names # ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─ def _load_real_data(model_name: str): X, y, _ = load_all_pooled(model_name) return X, y def _generate_synthetic_data(model_name: str): target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"]) n = 500 d = len(target_features) c = 2 rng = np.random.RandomState(42) features = rng.randn(n, d).astype(np.float32) labels = rng.randint(0, c, size=n).astype(np.int64) print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes") return features, labels def get_default_split_config(): """Return (split_ratios, seed) from config so all scripts use the same split. Reproducible and consistent.""" try: from config import get data = get("data") or {} ratios = data.get("split_ratios", list(_DEFAULT_SPLIT_RATIOS)) seed = get("mlp.seed") or _DEFAULT_SPLIT_SEED return (tuple(ratios), int(seed)) except Exception: return (_DEFAULT_SPLIT_RATIOS, _DEFAULT_SPLIT_SEED) def _split_and_scale(features, labels, split_ratios, seed, scale): """Stratified train/val/test split. Test is held out first; val is split from the rest. No training data is used for validation or test. Scaler is fit on train only, then applied to val and test (no leakage from val/test into scaling). """ test_ratio = split_ratios[2] val_ratio = split_ratios[1] / (split_ratios[0] + split_ratios[1]) X_train_val, X_test, y_train_val, y_test = train_test_split( features, labels, test_size=test_ratio, random_state=seed, stratify=labels, ) X_train, X_val, y_train, y_val = train_test_split( X_train_val, y_train_val, test_size=val_ratio, random_state=seed, stratify=y_train_val, ) scaler = None if scale: scaler = StandardScaler() X_train = scaler.fit_transform(X_train) X_val = scaler.transform(X_val) X_test = scaler.transform(X_test) print("[DATA] Applied StandardScaler (fitted on training split only)") splits = { "X_train": X_train, "y_train": y_train, "X_val": X_val, "y_val": y_val, "X_test": X_test, "y_test": y_test, } print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}") return splits, scaler def get_numpy_splits(model_name: str, split_ratios=None, seed=None, scale: bool = True): """Return train/val/test numpy arrays. Uses config defaults for split_ratios/seed when None. Same dataset and split logic as get_dataloaders for consistent evaluation.""" if split_ratios is None or seed is None: _ratios, _seed = get_default_split_config() split_ratios = split_ratios if split_ratios is not None else _ratios seed = seed if seed is not None else _seed features, labels = _load_real_data(model_name) num_features = features.shape[1] num_classes = int(labels.max()) + 1 if num_classes < 2: raise ValueError("Dataset has only one class; need at least 2 for classification.") splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale) return splits, num_features, num_classes, scaler def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=None, seed=None, scale: bool = True): """Return PyTorch DataLoaders. Uses config defaults for split_ratios/seed when None. Test set is held out before preprocessing; scaler fit on train only.""" if split_ratios is None or seed is None: _ratios, _seed = get_default_split_config() split_ratios = split_ratios if split_ratios is not None else _ratios seed = seed if seed is not None else _seed _, _, dataloader_cls = _require_torch() features, labels = _load_real_data(model_name) num_features = features.shape[1] num_classes = int(labels.max()) + 1 if num_classes < 2: raise ValueError("Dataset has only one class; need at least 2 for classification.") splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale) train_ds = FeatureVectorDataset(splits["X_train"], splits["y_train"]) val_ds = FeatureVectorDataset(splits["X_val"], splits["y_val"]) test_ds = FeatureVectorDataset(splits["X_test"], splits["y_test"]) train_loader = dataloader_cls(train_ds, batch_size=batch_size, shuffle=True) val_loader = dataloader_cls(val_ds, batch_size=batch_size, shuffle=False) test_loader = dataloader_cls(test_ds, batch_size=batch_size, shuffle=False) return train_loader, val_loader, test_loader, num_features, num_classes, scaler