| |
| """ |
| Experiment B: Quantitative grip force regression (T4'). |
| |
| Predict per-hand summed fingertip pressure (grip force, in grams) at every |
| 20 Hz frame from NON-pressure modalities (MoCap + EMG + IMU + EyeTrack). |
| |
| Output: (T, 2) -- [total_right_force_g, total_left_force_g] |
| This directly exploits the dataset's unique 50-channel quantitative |
| pressure array, going beyond binary contact detection (T4). |
| |
| Train/test: subject-independent split over the 80 recordings with pressure. |
| Loss: Huber (robust to peak forces). Metrics: MAE, Pearson r, R^2 per hand. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import random |
| import argparse |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| from torch.nn.utils.rnn import pad_sequence |
| from scipy.stats import pearsonr |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| from data.dataset import ( |
| DATASET_DIR, MODALITY_FILES, TRAIN_VOLS, TEST_VOLS, |
| load_modality_array, SCENE_LABELS, |
| ) |
| from nets.models import TransformerBackbone, LSTMBackbone, CNN1DBackbone |
|
|
|
|
| |
| |
| |
|
|
| class GripForceDataset(Dataset): |
| """Per-timestep regression: sensor features -> (R_force_g, L_force_g). |
| |
| Loads only recordings that have both the requested sensor modalities AND |
| a valid pressure CSV. |
| """ |
|
|
| def __init__(self, volunteers, modalities, downsample=5, stats=None, |
| target_stats=None, log_target=False): |
| self.modalities = modalities |
| self.downsample = downsample |
| self.log_target = log_target |
| self.data = [] |
| self.targets = [] |
| self.sample_info = [] |
| self._modality_dims = {} |
| self._raw_targets_cache = [] |
|
|
| for vol in volunteers: |
| vol_dir = os.path.join(DATASET_DIR, vol) |
| if not os.path.isdir(vol_dir): |
| continue |
| for scenario in sorted(os.listdir(vol_dir)): |
| scenario_dir = os.path.join(vol_dir, scenario) |
| if not os.path.isdir(scenario_dir) or scenario not in SCENE_LABELS: |
| continue |
| pressure_fp = os.path.join(scenario_dir, 'aligned_pressure_100hz.csv') |
| if not os.path.exists(pressure_fp): |
| continue |
| |
| try: |
| pdf = pd.read_csv(pressure_fp) |
| pvals = pdf.iloc[:, 1:].values.astype(np.float32) |
| if pvals.shape[1] != 50: |
| continue |
| except Exception as e: |
| print(f" SKIP {vol}/{scenario} pressure: {e}") |
| continue |
| |
| r_sum = pvals[:, :25].sum(axis=1) |
| l_sum = pvals[:, 25:].sum(axis=1) |
| raw_target = np.stack([r_sum, l_sum], axis=1) |
| |
| if getattr(self, 'log_target', False): |
| target = np.log1p(raw_target) |
| else: |
| target = raw_target |
| self._raw_targets_cache = self._raw_targets_cache if hasattr( |
| self, '_raw_targets_cache') else [] |
| self._raw_targets_cache.append(raw_target.astype(np.float32)) |
|
|
| |
| parts = [] |
| skip = False |
| for mod in modalities: |
| if mod == 'mocap': |
| filepath = os.path.join( |
| scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv", |
| ) |
| else: |
| filepath = os.path.join(scenario_dir, MODALITY_FILES[mod]) |
| if not os.path.exists(filepath): |
| skip = True |
| break |
| arr = load_modality_array(filepath, mod) |
| if arr is None: |
| skip = True |
| break |
| if mod in self._modality_dims and arr.shape[1] != self._modality_dims[mod]: |
| expected = self._modality_dims[mod] |
| if arr.shape[1] < expected: |
| pad = np.zeros((arr.shape[0], expected - arr.shape[1]), |
| dtype=np.float32) |
| arr = np.concatenate([arr, pad], axis=1) |
| else: |
| arr = arr[:, :expected] |
| if mod not in self._modality_dims: |
| self._modality_dims[mod] = arr.shape[1] |
| parts.append(arr) |
| if skip: |
| continue |
|
|
| T_min = min(target.shape[0], *(p.shape[0] for p in parts)) |
| parts = [p[:T_min] for p in parts] |
| target = target[:T_min] |
|
|
| combined = np.concatenate(parts, axis=1) |
| |
| combined = combined[::downsample] |
| target = target[::downsample] |
|
|
| self.data.append(combined) |
| self.targets.append(target.astype(np.float32)) |
| self.sample_info.append(f"{vol}/{scenario}") |
|
|
| if len(self.data) == 0: |
| raise RuntimeError("No data loaded. Check modality availability / pressure files.") |
| print(f" Loaded {len(self.data)} recordings (vol split), " |
| f"feat dim {sum(self._modality_dims.values())}, " |
| f"avg T {np.mean([d.shape[0] for d in self.data]):.0f}") |
|
|
| |
| if stats is not None: |
| self.mean, self.std = stats |
| else: |
| all_frames = np.concatenate(self.data, axis=0).astype(np.float64) |
| self.mean = all_frames.mean(axis=0, keepdims=True) |
| self.std = all_frames.std(axis=0, keepdims=True) |
| self.std[self.std < 1e-8] = 1.0 |
| for i in range(len(self.data)): |
| self.data[i] = ((self.data[i].astype(np.float64) - self.mean) / self.std).astype(np.float32) |
| self.data[i] = np.nan_to_num(self.data[i], nan=0.0, posinf=0.0, neginf=0.0) |
|
|
| |
| if target_stats is not None: |
| self.t_mean, self.t_std = target_stats |
| else: |
| all_t = np.concatenate(self.targets, axis=0).astype(np.float64) |
| self.t_mean = all_t.mean(axis=0, keepdims=True) |
| self.t_std = all_t.std(axis=0, keepdims=True) |
| self.t_std[self.t_std < 1e-8] = 1.0 |
| for i in range(len(self.targets)): |
| self.targets[i] = ( |
| (self.targets[i] - self.t_mean) / self.t_std |
| ).astype(np.float32) |
|
|
| def get_stats(self): |
| return (self.mean, self.std) |
|
|
| def get_target_stats(self): |
| return (self.t_mean, self.t_std) |
|
|
| @property |
| def feat_dim(self): |
| return sum(self._modality_dims.values()) |
|
|
| @property |
| def modality_dims(self): |
| return dict(self._modality_dims) |
|
|
| def __len__(self): |
| return len(self.data) |
|
|
| def __getitem__(self, idx): |
| return ( |
| torch.from_numpy(self.data[idx]), |
| torch.from_numpy(self.targets[idx]), |
| ) |
|
|
|
|
| def regress_collate_fn(batch): |
| seqs, targs = zip(*batch) |
| lens = torch.LongTensor([s.shape[0] for s in seqs]) |
| padded = pad_sequence(seqs, batch_first=True, padding_value=0.0) |
| padded_t = pad_sequence(targs, batch_first=True, padding_value=0.0) |
| max_len = padded.shape[1] |
| mask = torch.arange(max_len).unsqueeze(0) < lens.unsqueeze(1) |
| return padded, padded_t, mask, lens |
|
|
|
|
| |
| |
| |
|
|
| class GripRegressor(nn.Module): |
| """Per-timestep regression head on top of a sequence backbone.""" |
|
|
| def __init__(self, backbone_name, feat_dim, hidden_dim=128, |
| output_dim=2, dropout=0.2): |
| super().__init__() |
| if backbone_name == 'transformer': |
| |
| self.input_proj = nn.Linear(feat_dim, hidden_dim) |
| enc_layer = nn.TransformerEncoderLayer( |
| d_model=hidden_dim, nhead=4, |
| dim_feedforward=4 * hidden_dim, dropout=dropout, |
| batch_first=True, activation='gelu', |
| ) |
| self.encoder = nn.TransformerEncoder(enc_layer, num_layers=2) |
| self.pos_enc = nn.Parameter(torch.zeros(1, 4800, hidden_dim)) |
| nn.init.trunc_normal_(self.pos_enc, std=0.02) |
| self.head = nn.Sequential( |
| nn.LayerNorm(hidden_dim), |
| nn.Linear(hidden_dim, hidden_dim), |
| nn.GELU(), |
| nn.Dropout(dropout), |
| nn.Linear(hidden_dim, output_dim), |
| ) |
| self.backbone_type = 'transformer' |
| elif backbone_name == 'lstm': |
| self.lstm = nn.LSTM( |
| feat_dim, hidden_dim, num_layers=2, batch_first=True, |
| bidirectional=True, dropout=dropout, |
| ) |
| self.head = nn.Sequential( |
| nn.LayerNorm(2 * hidden_dim), |
| nn.Linear(2 * hidden_dim, hidden_dim), |
| nn.GELU(), |
| nn.Dropout(dropout), |
| nn.Linear(hidden_dim, output_dim), |
| ) |
| self.backbone_type = 'lstm' |
| elif backbone_name == 'cnn': |
| self.cnn = nn.Sequential( |
| nn.Conv1d(feat_dim, hidden_dim, 7, padding=3), |
| nn.BatchNorm1d(hidden_dim), nn.ReLU(), |
| nn.Conv1d(hidden_dim, hidden_dim, 5, padding=2), |
| nn.BatchNorm1d(hidden_dim), nn.ReLU(), |
| nn.Conv1d(hidden_dim, hidden_dim, 3, padding=1), |
| nn.BatchNorm1d(hidden_dim), nn.ReLU(), |
| ) |
| self.head = nn.Sequential( |
| nn.LayerNorm(hidden_dim), |
| nn.Linear(hidden_dim, output_dim), |
| ) |
| self.backbone_type = 'cnn' |
| else: |
| raise ValueError(f"Unknown backbone: {backbone_name}") |
|
|
| def forward(self, x, mask): |
| if self.backbone_type == 'transformer': |
| T = x.size(1) |
| h = self.input_proj(x) + self.pos_enc[:, :T, :] |
| key_padding = ~mask |
| h = self.encoder(h, src_key_padding_mask=key_padding) |
| return self.head(h) |
| elif self.backbone_type == 'lstm': |
| h, _ = self.lstm(x) |
| return self.head(h) |
| elif self.backbone_type == 'cnn': |
| |
| h = self.cnn(x.transpose(1, 2)).transpose(1, 2) |
| return self.head(h) |
|
|
|
|
| |
| |
| |
|
|
| def set_seed(seed): |
| random.seed(seed) |
| np.random.seed(seed) |
| torch.manual_seed(seed) |
| torch.cuda.manual_seed_all(seed) |
|
|
|
|
| def masked_huber(pred, target, mask, delta=1.0): |
| diff = pred - target |
| abs_d = diff.abs() |
| quad = 0.5 * diff * diff |
| lin = delta * (abs_d - 0.5 * delta) |
| loss = torch.where(abs_d < delta, quad, lin) |
| m = mask.unsqueeze(-1).float() |
| return (loss * m).sum() / (m.sum() * loss.size(-1) + 1e-8) |
|
|
|
|
| def train_one_epoch(model, loader, optimizer, device, huber_delta=1.0): |
| model.train() |
| total = 0.0 |
| n_frames = 0 |
| for x, y, mask, _ in loader: |
| x, y, mask = x.to(device), y.to(device), mask.to(device) |
| optimizer.zero_grad() |
| pred = model(x, mask) |
| loss = masked_huber(pred, y, mask, delta=huber_delta) |
| loss.backward() |
| torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) |
| optimizer.step() |
| nf = mask.sum().item() |
| total += loss.item() * nf |
| n_frames += nf |
| return total / max(n_frames, 1) |
|
|
|
|
| @torch.no_grad() |
| def evaluate(model, loader, device, target_mean, target_std, huber_delta=1.0, |
| log_target=False): |
| model.eval() |
| preds_R, preds_L = [], [] |
| trues_R, trues_L = [], [] |
| total_loss = 0.0 |
| n_frames = 0 |
| for x, y, mask, lens in loader: |
| x, y, mask = x.to(device), y.to(device), mask.to(device) |
| pred = model(x, mask) |
| loss = masked_huber(pred, y, mask, delta=huber_delta) |
| nf = mask.sum().item() |
| total_loss += loss.item() * nf |
| n_frames += nf |
| |
| pred_np = pred.cpu().numpy() * target_std + target_mean |
| true_np = y.cpu().numpy() * target_std + target_mean |
| if log_target: |
| pred_np = np.expm1(np.maximum(pred_np, 0)) |
| true_np = np.expm1(np.maximum(true_np, 0)) |
| mask_np = mask.cpu().numpy() |
| for b in range(pred_np.shape[0]): |
| valid = mask_np[b] |
| preds_R.extend(pred_np[b, valid, 0]) |
| trues_R.extend(true_np[b, valid, 0]) |
| preds_L.extend(pred_np[b, valid, 1]) |
| trues_L.extend(true_np[b, valid, 1]) |
| preds_R, preds_L = np.array(preds_R), np.array(preds_L) |
| trues_R, trues_L = np.array(trues_R), np.array(trues_L) |
|
|
| def metrics(p, t): |
| mae = float(np.mean(np.abs(p - t))) |
| if np.std(p) < 1e-6 or np.std(t) < 1e-6: |
| r, r2 = 0.0, 0.0 |
| else: |
| r = float(pearsonr(p, t)[0]) |
| ss_res = float(((p - t) ** 2).sum()) |
| ss_tot = float(((t - t.mean()) ** 2).sum()) |
| r2 = 1.0 - ss_res / (ss_tot + 1e-8) |
| return {'mae_g': mae, 'pearson_r': r, 'r2': r2, |
| 'mean_true_g': float(t.mean()), |
| 'mean_pred_g': float(p.mean())} |
|
|
| return { |
| 'loss': total_loss / max(n_frames, 1), |
| 'right_hand': metrics(preds_R, trues_R), |
| 'left_hand': metrics(preds_L, trues_L), |
| 'avg_mae_g': 0.5 * (np.mean(np.abs(preds_R - trues_R)) + |
| np.mean(np.abs(preds_L - trues_L))), |
| 'avg_pearson_r': 0.5 * (metrics(preds_R, trues_R)['pearson_r'] + |
| metrics(preds_L, trues_L)['pearson_r']), |
| } |
|
|
|
|
| def run_experiment(args): |
| set_seed(args.seed) |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
| print(f"Device: {device}") |
|
|
| modalities = args.modalities.split(',') |
| print(f"Backbone: {args.backbone} | Modalities: {modalities} | Seed: {args.seed}") |
|
|
| print("Loading train...") |
| train_ds = GripForceDataset(TRAIN_VOLS, modalities, downsample=args.downsample, |
| log_target=args.log_target) |
| stats = train_ds.get_stats() |
| tstats = train_ds.get_target_stats() |
| print(f" target mean: {tstats[0].flatten()} std: {tstats[1].flatten()} " |
| f"(log_target={args.log_target})") |
|
|
| print("Loading test...") |
| test_ds = GripForceDataset(TEST_VOLS, modalities, downsample=args.downsample, |
| stats=stats, target_stats=tstats, |
| log_target=args.log_target) |
|
|
| train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, |
| collate_fn=regress_collate_fn, num_workers=0) |
| test_loader = DataLoader(test_ds, batch_size=args.batch_size, shuffle=False, |
| collate_fn=regress_collate_fn, num_workers=0) |
|
|
| model = GripRegressor( |
| args.backbone, train_ds.feat_dim, hidden_dim=args.hidden_dim, |
| output_dim=2, dropout=args.dropout, |
| ).to(device) |
| n_params = sum(p.numel() for p in model.parameters()) |
| print(f"Params: {n_params:,}") |
|
|
| optimizer = torch.optim.Adam(model.parameters(), lr=args.lr, |
| weight_decay=args.weight_decay) |
| scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( |
| optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, |
| ) |
|
|
| |
| mod_str = '-'.join(modalities) |
| exp_name = f"grip_{args.backbone}_{mod_str}_seed{args.seed}" |
| if args.tag: |
| exp_name += f"_{args.tag}" |
| out_dir = os.path.join(args.output_dir, exp_name) |
| os.makedirs(out_dir, exist_ok=True) |
|
|
| best_test_mae = float('inf') |
| best_state = None |
| best_epoch = 0 |
| patience_counter = 0 |
|
|
| for epoch in range(1, args.epochs + 1): |
| t0 = time.time() |
| train_loss = train_one_epoch(model, train_loader, optimizer, device, |
| huber_delta=args.huber_delta) |
| m = evaluate(model, test_loader, device, |
| tstats[0], tstats[1], huber_delta=args.huber_delta, |
| log_target=args.log_target) |
| scheduler.step(m['loss']) |
| print(f" E{epoch:3d} | tr {train_loss:.4f} | " |
| f"te_loss {m['loss']:.4f} mae {m['avg_mae_g']:.2f}g " |
| f"r {m['avg_pearson_r']:.3f} | " |
| f"R: r={m['right_hand']['pearson_r']:.3f} r2={m['right_hand']['r2']:.3f} " |
| f"L: r={m['left_hand']['pearson_r']:.3f} r2={m['left_hand']['r2']:.3f} | " |
| f"{time.time()-t0:.1f}s") |
| |
| if m['avg_mae_g'] < best_test_mae: |
| best_test_mae = m['avg_mae_g'] |
| best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} |
| best_epoch = epoch |
| best_metrics = m |
| patience_counter = 0 |
| else: |
| patience_counter += 1 |
| if patience_counter >= args.patience: |
| print(f" Early stop at epoch {epoch} (best {best_epoch})") |
| break |
|
|
| if best_state is not None: |
| torch.save(best_state, os.path.join(out_dir, 'model_best.pt')) |
|
|
| results = { |
| 'experiment': exp_name, |
| 'backbone': args.backbone, |
| 'modalities': modalities, |
| 'seed': args.seed, |
| 'best_epoch': best_epoch, |
| 'best_test_metrics': best_metrics, |
| 'train_size': len(train_ds), |
| 'test_size': len(test_ds), |
| 'feat_dim': train_ds.feat_dim, |
| 'modality_dims': train_ds.modality_dims, |
| 'target_mean_g': tstats[0].flatten().tolist(), |
| 'target_std_g': tstats[1].flatten().tolist(), |
| 'args': vars(args), |
| } |
| with open(os.path.join(out_dir, 'results.json'), 'w') as f: |
| json.dump(results, f, indent=2) |
| print(f"Saved: {out_dir}/results.json") |
| return results |
|
|
|
|
| def main(): |
| p = argparse.ArgumentParser() |
| p.add_argument('--backbone', type=str, default='transformer', |
| choices=['transformer', 'lstm', 'cnn']) |
| p.add_argument('--modalities', type=str, default='mocap,emg,eyetrack,imu') |
| p.add_argument('--epochs', type=int, default=60) |
| p.add_argument('--batch_size', type=int, default=8) |
| p.add_argument('--lr', type=float, default=1e-3) |
| p.add_argument('--weight_decay', type=float, default=1e-4) |
| p.add_argument('--hidden_dim', type=int, default=128) |
| p.add_argument('--dropout', type=float, default=0.2) |
| p.add_argument('--downsample', type=int, default=5) |
| p.add_argument('--patience', type=int, default=12) |
| p.add_argument('--huber_delta', type=float, default=1.0) |
| p.add_argument('--seed', type=int, default=42) |
| p.add_argument('--output_dir', type=str, required=True) |
| p.add_argument('--tag', type=str, default='') |
| p.add_argument('--log_target', action='store_true', |
| help='Use log1p(force) as regression target') |
| args = p.parse_args() |
| run_experiment(args) |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|