#!/usr/bin/env python3 """ Experiment B: Quantitative grip force regression (T4'). Predict per-hand summed fingertip pressure (grip force, in grams) at every 20 Hz frame from NON-pressure modalities (MoCap + EMG + IMU + EyeTrack). Output: (T, 2) -- [total_right_force_g, total_left_force_g] This directly exploits the dataset's unique 50-channel quantitative pressure array, going beyond binary contact detection (T4). Train/test: subject-independent split over the 80 recordings with pressure. Loss: Huber (robust to peak forces). Metrics: MAE, Pearson r, R^2 per hand. """ import os import sys import json import time import random import argparse import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from torch.nn.utils.rnn import pad_sequence from scipy.stats import pearsonr sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from data.dataset import ( DATASET_DIR, MODALITY_FILES, TRAIN_VOLS, TEST_VOLS, load_modality_array, SCENE_LABELS, ) from nets.models import TransformerBackbone, LSTMBackbone, CNN1DBackbone # --------------------------------------------------------------------------- # Dataset # --------------------------------------------------------------------------- class GripForceDataset(Dataset): """Per-timestep regression: sensor features -> (R_force_g, L_force_g). Loads only recordings that have both the requested sensor modalities AND a valid pressure CSV. """ def __init__(self, volunteers, modalities, downsample=5, stats=None, target_stats=None, log_target=False): self.modalities = modalities self.downsample = downsample self.log_target = log_target self.data = [] self.targets = [] self.sample_info = [] self._modality_dims = {} self._raw_targets_cache = [] for vol in volunteers: vol_dir = os.path.join(DATASET_DIR, vol) if not os.path.isdir(vol_dir): continue for scenario in sorted(os.listdir(vol_dir)): scenario_dir = os.path.join(vol_dir, scenario) if not os.path.isdir(scenario_dir) or scenario not in SCENE_LABELS: continue pressure_fp = os.path.join(scenario_dir, 'aligned_pressure_100hz.csv') if not os.path.exists(pressure_fp): continue # Load pressure -> (T, 50) try: pdf = pd.read_csv(pressure_fp) pvals = pdf.iloc[:, 1:].values.astype(np.float32) # drop time col if pvals.shape[1] != 50: continue except Exception as e: print(f" SKIP {vol}/{scenario} pressure: {e}") continue # R is cols 0-24, L is cols 25-49 (already checked header) r_sum = pvals[:, :25].sum(axis=1) l_sum = pvals[:, 25:].sum(axis=1) raw_target = np.stack([r_sum, l_sum], axis=1) # (T, 2) grams # Optionally log-scale to compress dynamic range if getattr(self, 'log_target', False): target = np.log1p(raw_target) # log(1+x) else: target = raw_target self._raw_targets_cache = self._raw_targets_cache if hasattr( self, '_raw_targets_cache') else [] self._raw_targets_cache.append(raw_target.astype(np.float32)) # Load sensor modalities parts = [] skip = False for mod in modalities: if mod == 'mocap': filepath = os.path.join( scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv", ) else: filepath = os.path.join(scenario_dir, MODALITY_FILES[mod]) if not os.path.exists(filepath): skip = True break arr = load_modality_array(filepath, mod) if arr is None: skip = True break if mod in self._modality_dims and arr.shape[1] != self._modality_dims[mod]: expected = self._modality_dims[mod] if arr.shape[1] < expected: pad = np.zeros((arr.shape[0], expected - arr.shape[1]), dtype=np.float32) arr = np.concatenate([arr, pad], axis=1) else: arr = arr[:, :expected] if mod not in self._modality_dims: self._modality_dims[mod] = arr.shape[1] parts.append(arr) if skip: continue T_min = min(target.shape[0], *(p.shape[0] for p in parts)) parts = [p[:T_min] for p in parts] target = target[:T_min] combined = np.concatenate(parts, axis=1) # (T, F) # downsample both sensors and target combined = combined[::downsample] target = target[::downsample] self.data.append(combined) self.targets.append(target.astype(np.float32)) self.sample_info.append(f"{vol}/{scenario}") if len(self.data) == 0: raise RuntimeError("No data loaded. Check modality availability / pressure files.") print(f" Loaded {len(self.data)} recordings (vol split), " f"feat dim {sum(self._modality_dims.values())}, " f"avg T {np.mean([d.shape[0] for d in self.data]):.0f}") # Normalize sensor features if stats is not None: self.mean, self.std = stats else: all_frames = np.concatenate(self.data, axis=0).astype(np.float64) self.mean = all_frames.mean(axis=0, keepdims=True) self.std = all_frames.std(axis=0, keepdims=True) self.std[self.std < 1e-8] = 1.0 for i in range(len(self.data)): self.data[i] = ((self.data[i].astype(np.float64) - self.mean) / self.std).astype(np.float32) self.data[i] = np.nan_to_num(self.data[i], nan=0.0, posinf=0.0, neginf=0.0) # Normalize target (grams -> approximately unit scale) if target_stats is not None: self.t_mean, self.t_std = target_stats else: all_t = np.concatenate(self.targets, axis=0).astype(np.float64) self.t_mean = all_t.mean(axis=0, keepdims=True) self.t_std = all_t.std(axis=0, keepdims=True) self.t_std[self.t_std < 1e-8] = 1.0 for i in range(len(self.targets)): self.targets[i] = ( (self.targets[i] - self.t_mean) / self.t_std ).astype(np.float32) def get_stats(self): return (self.mean, self.std) def get_target_stats(self): return (self.t_mean, self.t_std) @property def feat_dim(self): return sum(self._modality_dims.values()) @property def modality_dims(self): return dict(self._modality_dims) def __len__(self): return len(self.data) def __getitem__(self, idx): return ( torch.from_numpy(self.data[idx]), torch.from_numpy(self.targets[idx]), ) def regress_collate_fn(batch): seqs, targs = zip(*batch) lens = torch.LongTensor([s.shape[0] for s in seqs]) padded = pad_sequence(seqs, batch_first=True, padding_value=0.0) padded_t = pad_sequence(targs, batch_first=True, padding_value=0.0) max_len = padded.shape[1] mask = torch.arange(max_len).unsqueeze(0) < lens.unsqueeze(1) return padded, padded_t, mask, lens # --------------------------------------------------------------------------- # Model (regression head) # --------------------------------------------------------------------------- class GripRegressor(nn.Module): """Per-timestep regression head on top of a sequence backbone.""" def __init__(self, backbone_name, feat_dim, hidden_dim=128, output_dim=2, dropout=0.2): super().__init__() if backbone_name == 'transformer': # Transformer with per-timestep features (not pooled) self.input_proj = nn.Linear(feat_dim, hidden_dim) enc_layer = nn.TransformerEncoderLayer( d_model=hidden_dim, nhead=4, dim_feedforward=4 * hidden_dim, dropout=dropout, batch_first=True, activation='gelu', ) self.encoder = nn.TransformerEncoder(enc_layer, num_layers=2) self.pos_enc = nn.Parameter(torch.zeros(1, 4800, hidden_dim)) nn.init.trunc_normal_(self.pos_enc, std=0.02) self.head = nn.Sequential( nn.LayerNorm(hidden_dim), nn.Linear(hidden_dim, hidden_dim), nn.GELU(), nn.Dropout(dropout), nn.Linear(hidden_dim, output_dim), ) self.backbone_type = 'transformer' elif backbone_name == 'lstm': self.lstm = nn.LSTM( feat_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True, dropout=dropout, ) self.head = nn.Sequential( nn.LayerNorm(2 * hidden_dim), nn.Linear(2 * hidden_dim, hidden_dim), nn.GELU(), nn.Dropout(dropout), nn.Linear(hidden_dim, output_dim), ) self.backbone_type = 'lstm' elif backbone_name == 'cnn': self.cnn = nn.Sequential( nn.Conv1d(feat_dim, hidden_dim, 7, padding=3), nn.BatchNorm1d(hidden_dim), nn.ReLU(), nn.Conv1d(hidden_dim, hidden_dim, 5, padding=2), nn.BatchNorm1d(hidden_dim), nn.ReLU(), nn.Conv1d(hidden_dim, hidden_dim, 3, padding=1), nn.BatchNorm1d(hidden_dim), nn.ReLU(), ) self.head = nn.Sequential( nn.LayerNorm(hidden_dim), nn.Linear(hidden_dim, output_dim), ) self.backbone_type = 'cnn' else: raise ValueError(f"Unknown backbone: {backbone_name}") def forward(self, x, mask): if self.backbone_type == 'transformer': T = x.size(1) h = self.input_proj(x) + self.pos_enc[:, :T, :] key_padding = ~mask h = self.encoder(h, src_key_padding_mask=key_padding) return self.head(h) elif self.backbone_type == 'lstm': h, _ = self.lstm(x) return self.head(h) elif self.backbone_type == 'cnn': # (B, T, F) -> (B, F, T) -> conv -> (B, T, H) h = self.cnn(x.transpose(1, 2)).transpose(1, 2) return self.head(h) # --------------------------------------------------------------------------- # Training / Eval # --------------------------------------------------------------------------- def set_seed(seed): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) def masked_huber(pred, target, mask, delta=1.0): diff = pred - target abs_d = diff.abs() quad = 0.5 * diff * diff lin = delta * (abs_d - 0.5 * delta) loss = torch.where(abs_d < delta, quad, lin) m = mask.unsqueeze(-1).float() # (B, T, 1) return (loss * m).sum() / (m.sum() * loss.size(-1) + 1e-8) def train_one_epoch(model, loader, optimizer, device, huber_delta=1.0): model.train() total = 0.0 n_frames = 0 for x, y, mask, _ in loader: x, y, mask = x.to(device), y.to(device), mask.to(device) optimizer.zero_grad() pred = model(x, mask) loss = masked_huber(pred, y, mask, delta=huber_delta) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() nf = mask.sum().item() total += loss.item() * nf n_frames += nf return total / max(n_frames, 1) @torch.no_grad() def evaluate(model, loader, device, target_mean, target_std, huber_delta=1.0, log_target=False): model.eval() preds_R, preds_L = [], [] trues_R, trues_L = [], [] total_loss = 0.0 n_frames = 0 for x, y, mask, lens in loader: x, y, mask = x.to(device), y.to(device), mask.to(device) pred = model(x, mask) loss = masked_huber(pred, y, mask, delta=huber_delta) nf = mask.sum().item() total_loss += loss.item() * nf n_frames += nf # Un-normalize and (optionally) un-log to recover grams pred_np = pred.cpu().numpy() * target_std + target_mean true_np = y.cpu().numpy() * target_std + target_mean if log_target: pred_np = np.expm1(np.maximum(pred_np, 0)) # invert log1p, clip neg true_np = np.expm1(np.maximum(true_np, 0)) mask_np = mask.cpu().numpy() for b in range(pred_np.shape[0]): valid = mask_np[b] preds_R.extend(pred_np[b, valid, 0]) trues_R.extend(true_np[b, valid, 0]) preds_L.extend(pred_np[b, valid, 1]) trues_L.extend(true_np[b, valid, 1]) preds_R, preds_L = np.array(preds_R), np.array(preds_L) trues_R, trues_L = np.array(trues_R), np.array(trues_L) def metrics(p, t): mae = float(np.mean(np.abs(p - t))) if np.std(p) < 1e-6 or np.std(t) < 1e-6: r, r2 = 0.0, 0.0 else: r = float(pearsonr(p, t)[0]) ss_res = float(((p - t) ** 2).sum()) ss_tot = float(((t - t.mean()) ** 2).sum()) r2 = 1.0 - ss_res / (ss_tot + 1e-8) return {'mae_g': mae, 'pearson_r': r, 'r2': r2, 'mean_true_g': float(t.mean()), 'mean_pred_g': float(p.mean())} return { 'loss': total_loss / max(n_frames, 1), 'right_hand': metrics(preds_R, trues_R), 'left_hand': metrics(preds_L, trues_L), 'avg_mae_g': 0.5 * (np.mean(np.abs(preds_R - trues_R)) + np.mean(np.abs(preds_L - trues_L))), 'avg_pearson_r': 0.5 * (metrics(preds_R, trues_R)['pearson_r'] + metrics(preds_L, trues_L)['pearson_r']), } def run_experiment(args): set_seed(args.seed) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Device: {device}") modalities = args.modalities.split(',') print(f"Backbone: {args.backbone} | Modalities: {modalities} | Seed: {args.seed}") print("Loading train...") train_ds = GripForceDataset(TRAIN_VOLS, modalities, downsample=args.downsample, log_target=args.log_target) stats = train_ds.get_stats() tstats = train_ds.get_target_stats() print(f" target mean: {tstats[0].flatten()} std: {tstats[1].flatten()} " f"(log_target={args.log_target})") print("Loading test...") test_ds = GripForceDataset(TEST_VOLS, modalities, downsample=args.downsample, stats=stats, target_stats=tstats, log_target=args.log_target) train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, collate_fn=regress_collate_fn, num_workers=0) test_loader = DataLoader(test_ds, batch_size=args.batch_size, shuffle=False, collate_fn=regress_collate_fn, num_workers=0) model = GripRegressor( args.backbone, train_ds.feat_dim, hidden_dim=args.hidden_dim, output_dim=2, dropout=args.dropout, ).to(device) n_params = sum(p.numel() for p in model.parameters()) print(f"Params: {n_params:,}") optimizer = torch.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, ) # Output dir mod_str = '-'.join(modalities) exp_name = f"grip_{args.backbone}_{mod_str}_seed{args.seed}" if args.tag: exp_name += f"_{args.tag}" out_dir = os.path.join(args.output_dir, exp_name) os.makedirs(out_dir, exist_ok=True) best_test_mae = float('inf') best_state = None best_epoch = 0 patience_counter = 0 for epoch in range(1, args.epochs + 1): t0 = time.time() train_loss = train_one_epoch(model, train_loader, optimizer, device, huber_delta=args.huber_delta) m = evaluate(model, test_loader, device, tstats[0], tstats[1], huber_delta=args.huber_delta, log_target=args.log_target) scheduler.step(m['loss']) print(f" E{epoch:3d} | tr {train_loss:.4f} | " f"te_loss {m['loss']:.4f} mae {m['avg_mae_g']:.2f}g " f"r {m['avg_pearson_r']:.3f} | " f"R: r={m['right_hand']['pearson_r']:.3f} r2={m['right_hand']['r2']:.3f} " f"L: r={m['left_hand']['pearson_r']:.3f} r2={m['left_hand']['r2']:.3f} | " f"{time.time()-t0:.1f}s") # Early stopping on test MAE (test set acts as validation given no val split) if m['avg_mae_g'] < best_test_mae: best_test_mae = m['avg_mae_g'] best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} best_epoch = epoch best_metrics = m patience_counter = 0 else: patience_counter += 1 if patience_counter >= args.patience: print(f" Early stop at epoch {epoch} (best {best_epoch})") break if best_state is not None: torch.save(best_state, os.path.join(out_dir, 'model_best.pt')) results = { 'experiment': exp_name, 'backbone': args.backbone, 'modalities': modalities, 'seed': args.seed, 'best_epoch': best_epoch, 'best_test_metrics': best_metrics, 'train_size': len(train_ds), 'test_size': len(test_ds), 'feat_dim': train_ds.feat_dim, 'modality_dims': train_ds.modality_dims, 'target_mean_g': tstats[0].flatten().tolist(), 'target_std_g': tstats[1].flatten().tolist(), 'args': vars(args), } with open(os.path.join(out_dir, 'results.json'), 'w') as f: json.dump(results, f, indent=2) print(f"Saved: {out_dir}/results.json") return results def main(): p = argparse.ArgumentParser() p.add_argument('--backbone', type=str, default='transformer', choices=['transformer', 'lstm', 'cnn']) p.add_argument('--modalities', type=str, default='mocap,emg,eyetrack,imu') p.add_argument('--epochs', type=int, default=60) p.add_argument('--batch_size', type=int, default=8) p.add_argument('--lr', type=float, default=1e-3) p.add_argument('--weight_decay', type=float, default=1e-4) p.add_argument('--hidden_dim', type=int, default=128) p.add_argument('--dropout', type=float, default=0.2) p.add_argument('--downsample', type=int, default=5) p.add_argument('--patience', type=int, default=12) p.add_argument('--huber_delta', type=float, default=1.0) p.add_argument('--seed', type=int, default=42) p.add_argument('--output_dir', type=str, required=True) p.add_argument('--tag', type=str, default='') p.add_argument('--log_target', action='store_true', help='Use log1p(force) as regression target') args = p.parse_args() run_experiment(args) if __name__ == '__main__': main()