test_final / data_preparation /prepare_dataset.py
k22056537
feat: sync integration updates across app and ML pipeline
eb4abb8
"""
Single source for pooled train/val/test data and splits.
- Data: load_all_pooled() / load_per_person() from data/collected_*/*.npz (same pattern everywhere).
- Splits: get_numpy_splits() / get_dataloaders() use stratified train/val/test with a fixed seed from config.
- Test is held out before any preprocessing; StandardScaler is fit on train only, then applied to val and test.
"""
import os
import glob
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
torch = None
Dataset = object # type: ignore
DataLoader = None
# Defaults for stratified split (overridden by config when available)
_DEFAULT_SPLIT_RATIOS = (0.7, 0.15, 0.15)
_DEFAULT_SPLIT_SEED = 42
def _require_torch():
global torch, Dataset, DataLoader
if torch is None:
try:
import torch as _torch
from torch.utils.data import Dataset as _Dataset, DataLoader as _DataLoader
except ImportError as exc: # pragma: no cover
raise ImportError("PyTorch not installed") from exc
torch = _torch
Dataset = _Dataset # type: ignore
DataLoader = _DataLoader # type: ignore
return torch, Dataset, DataLoader
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
SELECTED_FEATURES = {
"face_orientation": [
'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
],
"eye_behaviour": [
'ear_left', 'ear_right', 'ear_avg', 'mar',
'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
]
}
class FeatureVectorDataset(Dataset):
def __init__(self, features: np.ndarray, labels: np.ndarray):
torch_mod, _, _ = _require_torch()
self.features = torch_mod.tensor(features, dtype=torch_mod.float32)
self.labels = torch_mod.tensor(labels, dtype=torch_mod.long)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return self.features[idx], self.labels[idx]
# ── Low-level helpers ────────────────────────────────────────────────────
def _clean_npz(raw, names):
"""Apply clipping rules in-place. Shared by all loaders."""
for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
if col in names:
raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi)
for feat in ['ear_left', 'ear_right', 'ear_avg']:
if feat in names:
raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85)
return raw
def _load_one_npz(npz_path, target_features):
"""Load a single .npz file, clean and select features. Returns (X, y, selected_feature_names)."""
data = np.load(npz_path, allow_pickle=True)
raw = data['features'].astype(np.float32)
labels = data['labels'].astype(np.int64)
names = list(data['feature_names'])
raw = _clean_npz(raw, names)
selected = [f for f in target_features if f in names]
idx = [names.index(f) for f in selected]
return raw[:, idx], labels, selected
# ── Public data loaders ──────────────────────────────────────────────────
def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
"""Load all collected_*/*.npz, clean, select features, concatenate.
Returns (X_all, y_all, all_feature_names).
"""
data_dir = data_dir or DATA_DIR
target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
pattern = os.path.join(data_dir, "collected_*", "*.npz")
npz_files = sorted(glob.glob(pattern))
if not npz_files:
raise FileNotFoundError(
f"No .npz files matching {pattern}. "
"Collect data first with `python -m models.collect_features --name <name>`."
)
all_X, all_y = [], []
all_names = None
for npz_path in npz_files:
X, y, names = _load_one_npz(npz_path, target_features)
if all_names is None:
all_names = names
all_X.append(X)
all_y.append(y)
print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples")
X_all = np.concatenate(all_X, axis=0)
y_all = np.concatenate(all_y, axis=0)
print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
return X_all, y_all, all_names
def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
"""Load collected_*/*.npz grouped by person (folder name).
Returns dict { person_name: (X, y) } where X/y are per-person numpy arrays.
Also returns (X_all, y_all) as pooled data.
"""
data_dir = data_dir or DATA_DIR
target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
pattern = os.path.join(data_dir, "collected_*", "*.npz")
npz_files = sorted(glob.glob(pattern))
if not npz_files:
raise FileNotFoundError(f"No .npz files matching {pattern}")
by_person = {}
all_X, all_y = [], []
for npz_path in npz_files:
folder = os.path.basename(os.path.dirname(npz_path))
person = folder.replace("collected_", "", 1)
X, y, _ = _load_one_npz(npz_path, target_features)
all_X.append(X)
all_y.append(y)
if person not in by_person:
by_person[person] = []
by_person[person].append((X, y))
print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")
for person, chunks in by_person.items():
by_person[person] = (
np.concatenate([c[0] for c in chunks], axis=0),
np.concatenate([c[1] for c in chunks], axis=0),
)
X_all = np.concatenate(all_X, axis=0)
y_all = np.concatenate(all_y, axis=0)
print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
return by_person, X_all, y_all
def load_raw_npz(npz_path):
"""Load a single .npz without cleaning or feature selection. For exploration notebooks."""
data = np.load(npz_path, allow_pickle=True)
features = data['features'].astype(np.float32)
labels = data['labels'].astype(np.int64)
names = list(data['feature_names'])
return features, labels, names
# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─
def _load_real_data(model_name: str):
X, y, _ = load_all_pooled(model_name)
return X, y
def _generate_synthetic_data(model_name: str):
target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
n = 500
d = len(target_features)
c = 2
rng = np.random.RandomState(42)
features = rng.randn(n, d).astype(np.float32)
labels = rng.randint(0, c, size=n).astype(np.int64)
print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes")
return features, labels
def get_default_split_config():
"""Return (split_ratios, seed) from config so all scripts use the same split. Reproducible and consistent."""
try:
from config import get
data = get("data") or {}
ratios = data.get("split_ratios", list(_DEFAULT_SPLIT_RATIOS))
seed = get("mlp.seed") or _DEFAULT_SPLIT_SEED
return (tuple(ratios), int(seed))
except Exception:
return (_DEFAULT_SPLIT_RATIOS, _DEFAULT_SPLIT_SEED)
def _split_and_scale(features, labels, split_ratios, seed, scale):
"""Stratified train/val/test split. Test is held out first; val is split from the rest.
No training data is used for validation or test. Scaler is fit on train only, then
applied to val and test (no leakage from val/test into scaling).
"""
test_ratio = split_ratios[2]
val_ratio = split_ratios[1] / (split_ratios[0] + split_ratios[1])
X_train_val, X_test, y_train_val, y_test = train_test_split(
features, labels, test_size=test_ratio, random_state=seed, stratify=labels,
)
X_train, X_val, y_train, y_val = train_test_split(
X_train_val, y_train_val, test_size=val_ratio, random_state=seed, stratify=y_train_val,
)
scaler = None
if scale:
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)
print("[DATA] Applied StandardScaler (fitted on training split only)")
splits = {
"X_train": X_train, "y_train": y_train,
"X_val": X_val, "y_val": y_val,
"X_test": X_test, "y_test": y_test,
}
print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
return splits, scaler
def get_numpy_splits(model_name: str, split_ratios=None, seed=None, scale: bool = True):
"""Return train/val/test numpy arrays. Uses config defaults for split_ratios/seed when None.
Same dataset and split logic as get_dataloaders for consistent evaluation."""
if split_ratios is None or seed is None:
_ratios, _seed = get_default_split_config()
split_ratios = split_ratios if split_ratios is not None else _ratios
seed = seed if seed is not None else _seed
features, labels = _load_real_data(model_name)
num_features = features.shape[1]
num_classes = int(labels.max()) + 1
if num_classes < 2:
raise ValueError("Dataset has only one class; need at least 2 for classification.")
splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
return splits, num_features, num_classes, scaler
def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=None, seed=None, scale: bool = True):
"""Return PyTorch DataLoaders. Uses config defaults for split_ratios/seed when None.
Test set is held out before preprocessing; scaler fit on train only."""
if split_ratios is None or seed is None:
_ratios, _seed = get_default_split_config()
split_ratios = split_ratios if split_ratios is not None else _ratios
seed = seed if seed is not None else _seed
_, _, dataloader_cls = _require_torch()
features, labels = _load_real_data(model_name)
num_features = features.shape[1]
num_classes = int(labels.max()) + 1
if num_classes < 2:
raise ValueError("Dataset has only one class; need at least 2 for classification.")
splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
train_ds = FeatureVectorDataset(splits["X_train"], splits["y_train"])
val_ds = FeatureVectorDataset(splits["X_val"], splits["y_val"])
test_ds = FeatureVectorDataset(splits["X_test"], splits["y_test"])
train_loader = dataloader_cls(train_ds, batch_size=batch_size, shuffle=True)
val_loader = dataloader_cls(val_ds, batch_size=batch_size, shuffle=False)
test_loader = dataloader_cls(test_ds, batch_size=batch_size, shuffle=False)
return train_loader, val_loader, test_loader, num_features, num_classes, scaler