PULSE-code / experiments /tasks /train_exp_grip.py
velvet-pine-22's picture
Upload folder using huggingface_hub
b4b2877 verified
#!/usr/bin/env python3
"""
Experiment B: Quantitative grip force regression (T4').
Predict per-hand summed fingertip pressure (grip force, in grams) at every
20 Hz frame from NON-pressure modalities (MoCap + EMG + IMU + EyeTrack).
Output: (T, 2) -- [total_right_force_g, total_left_force_g]
This directly exploits the dataset's unique 50-channel quantitative
pressure array, going beyond binary contact detection (T4).
Train/test: subject-independent split over the 80 recordings with pressure.
Loss: Huber (robust to peak forces). Metrics: MAE, Pearson r, R^2 per hand.
"""
import os
import sys
import json
import time
import random
import argparse
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from scipy.stats import pearsonr
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data.dataset import (
DATASET_DIR, MODALITY_FILES, TRAIN_VOLS, TEST_VOLS,
load_modality_array, SCENE_LABELS,
)
from nets.models import TransformerBackbone, LSTMBackbone, CNN1DBackbone
# ---------------------------------------------------------------------------
# Dataset
# ---------------------------------------------------------------------------
class GripForceDataset(Dataset):
"""Per-timestep regression: sensor features -> (R_force_g, L_force_g).
Loads only recordings that have both the requested sensor modalities AND
a valid pressure CSV.
"""
def __init__(self, volunteers, modalities, downsample=5, stats=None,
target_stats=None, log_target=False):
self.modalities = modalities
self.downsample = downsample
self.log_target = log_target
self.data = []
self.targets = []
self.sample_info = []
self._modality_dims = {}
self._raw_targets_cache = []
for vol in volunteers:
vol_dir = os.path.join(DATASET_DIR, vol)
if not os.path.isdir(vol_dir):
continue
for scenario in sorted(os.listdir(vol_dir)):
scenario_dir = os.path.join(vol_dir, scenario)
if not os.path.isdir(scenario_dir) or scenario not in SCENE_LABELS:
continue
pressure_fp = os.path.join(scenario_dir, 'aligned_pressure_100hz.csv')
if not os.path.exists(pressure_fp):
continue
# Load pressure -> (T, 50)
try:
pdf = pd.read_csv(pressure_fp)
pvals = pdf.iloc[:, 1:].values.astype(np.float32) # drop time col
if pvals.shape[1] != 50:
continue
except Exception as e:
print(f" SKIP {vol}/{scenario} pressure: {e}")
continue
# R is cols 0-24, L is cols 25-49 (already checked header)
r_sum = pvals[:, :25].sum(axis=1)
l_sum = pvals[:, 25:].sum(axis=1)
raw_target = np.stack([r_sum, l_sum], axis=1) # (T, 2) grams
# Optionally log-scale to compress dynamic range
if getattr(self, 'log_target', False):
target = np.log1p(raw_target) # log(1+x)
else:
target = raw_target
self._raw_targets_cache = self._raw_targets_cache if hasattr(
self, '_raw_targets_cache') else []
self._raw_targets_cache.append(raw_target.astype(np.float32))
# Load sensor modalities
parts = []
skip = False
for mod in modalities:
if mod == 'mocap':
filepath = os.path.join(
scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv",
)
else:
filepath = os.path.join(scenario_dir, MODALITY_FILES[mod])
if not os.path.exists(filepath):
skip = True
break
arr = load_modality_array(filepath, mod)
if arr is None:
skip = True
break
if mod in self._modality_dims and arr.shape[1] != self._modality_dims[mod]:
expected = self._modality_dims[mod]
if arr.shape[1] < expected:
pad = np.zeros((arr.shape[0], expected - arr.shape[1]),
dtype=np.float32)
arr = np.concatenate([arr, pad], axis=1)
else:
arr = arr[:, :expected]
if mod not in self._modality_dims:
self._modality_dims[mod] = arr.shape[1]
parts.append(arr)
if skip:
continue
T_min = min(target.shape[0], *(p.shape[0] for p in parts))
parts = [p[:T_min] for p in parts]
target = target[:T_min]
combined = np.concatenate(parts, axis=1) # (T, F)
# downsample both sensors and target
combined = combined[::downsample]
target = target[::downsample]
self.data.append(combined)
self.targets.append(target.astype(np.float32))
self.sample_info.append(f"{vol}/{scenario}")
if len(self.data) == 0:
raise RuntimeError("No data loaded. Check modality availability / pressure files.")
print(f" Loaded {len(self.data)} recordings (vol split), "
f"feat dim {sum(self._modality_dims.values())}, "
f"avg T {np.mean([d.shape[0] for d in self.data]):.0f}")
# Normalize sensor features
if stats is not None:
self.mean, self.std = stats
else:
all_frames = np.concatenate(self.data, axis=0).astype(np.float64)
self.mean = all_frames.mean(axis=0, keepdims=True)
self.std = all_frames.std(axis=0, keepdims=True)
self.std[self.std < 1e-8] = 1.0
for i in range(len(self.data)):
self.data[i] = ((self.data[i].astype(np.float64) - self.mean) / self.std).astype(np.float32)
self.data[i] = np.nan_to_num(self.data[i], nan=0.0, posinf=0.0, neginf=0.0)
# Normalize target (grams -> approximately unit scale)
if target_stats is not None:
self.t_mean, self.t_std = target_stats
else:
all_t = np.concatenate(self.targets, axis=0).astype(np.float64)
self.t_mean = all_t.mean(axis=0, keepdims=True)
self.t_std = all_t.std(axis=0, keepdims=True)
self.t_std[self.t_std < 1e-8] = 1.0
for i in range(len(self.targets)):
self.targets[i] = (
(self.targets[i] - self.t_mean) / self.t_std
).astype(np.float32)
def get_stats(self):
return (self.mean, self.std)
def get_target_stats(self):
return (self.t_mean, self.t_std)
@property
def feat_dim(self):
return sum(self._modality_dims.values())
@property
def modality_dims(self):
return dict(self._modality_dims)
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return (
torch.from_numpy(self.data[idx]),
torch.from_numpy(self.targets[idx]),
)
def regress_collate_fn(batch):
seqs, targs = zip(*batch)
lens = torch.LongTensor([s.shape[0] for s in seqs])
padded = pad_sequence(seqs, batch_first=True, padding_value=0.0)
padded_t = pad_sequence(targs, batch_first=True, padding_value=0.0)
max_len = padded.shape[1]
mask = torch.arange(max_len).unsqueeze(0) < lens.unsqueeze(1)
return padded, padded_t, mask, lens
# ---------------------------------------------------------------------------
# Model (regression head)
# ---------------------------------------------------------------------------
class GripRegressor(nn.Module):
"""Per-timestep regression head on top of a sequence backbone."""
def __init__(self, backbone_name, feat_dim, hidden_dim=128,
output_dim=2, dropout=0.2):
super().__init__()
if backbone_name == 'transformer':
# Transformer with per-timestep features (not pooled)
self.input_proj = nn.Linear(feat_dim, hidden_dim)
enc_layer = nn.TransformerEncoderLayer(
d_model=hidden_dim, nhead=4,
dim_feedforward=4 * hidden_dim, dropout=dropout,
batch_first=True, activation='gelu',
)
self.encoder = nn.TransformerEncoder(enc_layer, num_layers=2)
self.pos_enc = nn.Parameter(torch.zeros(1, 4800, hidden_dim))
nn.init.trunc_normal_(self.pos_enc, std=0.02)
self.head = nn.Sequential(
nn.LayerNorm(hidden_dim),
nn.Linear(hidden_dim, hidden_dim),
nn.GELU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, output_dim),
)
self.backbone_type = 'transformer'
elif backbone_name == 'lstm':
self.lstm = nn.LSTM(
feat_dim, hidden_dim, num_layers=2, batch_first=True,
bidirectional=True, dropout=dropout,
)
self.head = nn.Sequential(
nn.LayerNorm(2 * hidden_dim),
nn.Linear(2 * hidden_dim, hidden_dim),
nn.GELU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, output_dim),
)
self.backbone_type = 'lstm'
elif backbone_name == 'cnn':
self.cnn = nn.Sequential(
nn.Conv1d(feat_dim, hidden_dim, 7, padding=3),
nn.BatchNorm1d(hidden_dim), nn.ReLU(),
nn.Conv1d(hidden_dim, hidden_dim, 5, padding=2),
nn.BatchNorm1d(hidden_dim), nn.ReLU(),
nn.Conv1d(hidden_dim, hidden_dim, 3, padding=1),
nn.BatchNorm1d(hidden_dim), nn.ReLU(),
)
self.head = nn.Sequential(
nn.LayerNorm(hidden_dim),
nn.Linear(hidden_dim, output_dim),
)
self.backbone_type = 'cnn'
else:
raise ValueError(f"Unknown backbone: {backbone_name}")
def forward(self, x, mask):
if self.backbone_type == 'transformer':
T = x.size(1)
h = self.input_proj(x) + self.pos_enc[:, :T, :]
key_padding = ~mask
h = self.encoder(h, src_key_padding_mask=key_padding)
return self.head(h)
elif self.backbone_type == 'lstm':
h, _ = self.lstm(x)
return self.head(h)
elif self.backbone_type == 'cnn':
# (B, T, F) -> (B, F, T) -> conv -> (B, T, H)
h = self.cnn(x.transpose(1, 2)).transpose(1, 2)
return self.head(h)
# ---------------------------------------------------------------------------
# Training / Eval
# ---------------------------------------------------------------------------
def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
def masked_huber(pred, target, mask, delta=1.0):
diff = pred - target
abs_d = diff.abs()
quad = 0.5 * diff * diff
lin = delta * (abs_d - 0.5 * delta)
loss = torch.where(abs_d < delta, quad, lin)
m = mask.unsqueeze(-1).float() # (B, T, 1)
return (loss * m).sum() / (m.sum() * loss.size(-1) + 1e-8)
def train_one_epoch(model, loader, optimizer, device, huber_delta=1.0):
model.train()
total = 0.0
n_frames = 0
for x, y, mask, _ in loader:
x, y, mask = x.to(device), y.to(device), mask.to(device)
optimizer.zero_grad()
pred = model(x, mask)
loss = masked_huber(pred, y, mask, delta=huber_delta)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
nf = mask.sum().item()
total += loss.item() * nf
n_frames += nf
return total / max(n_frames, 1)
@torch.no_grad()
def evaluate(model, loader, device, target_mean, target_std, huber_delta=1.0,
log_target=False):
model.eval()
preds_R, preds_L = [], []
trues_R, trues_L = [], []
total_loss = 0.0
n_frames = 0
for x, y, mask, lens in loader:
x, y, mask = x.to(device), y.to(device), mask.to(device)
pred = model(x, mask)
loss = masked_huber(pred, y, mask, delta=huber_delta)
nf = mask.sum().item()
total_loss += loss.item() * nf
n_frames += nf
# Un-normalize and (optionally) un-log to recover grams
pred_np = pred.cpu().numpy() * target_std + target_mean
true_np = y.cpu().numpy() * target_std + target_mean
if log_target:
pred_np = np.expm1(np.maximum(pred_np, 0)) # invert log1p, clip neg
true_np = np.expm1(np.maximum(true_np, 0))
mask_np = mask.cpu().numpy()
for b in range(pred_np.shape[0]):
valid = mask_np[b]
preds_R.extend(pred_np[b, valid, 0])
trues_R.extend(true_np[b, valid, 0])
preds_L.extend(pred_np[b, valid, 1])
trues_L.extend(true_np[b, valid, 1])
preds_R, preds_L = np.array(preds_R), np.array(preds_L)
trues_R, trues_L = np.array(trues_R), np.array(trues_L)
def metrics(p, t):
mae = float(np.mean(np.abs(p - t)))
if np.std(p) < 1e-6 or np.std(t) < 1e-6:
r, r2 = 0.0, 0.0
else:
r = float(pearsonr(p, t)[0])
ss_res = float(((p - t) ** 2).sum())
ss_tot = float(((t - t.mean()) ** 2).sum())
r2 = 1.0 - ss_res / (ss_tot + 1e-8)
return {'mae_g': mae, 'pearson_r': r, 'r2': r2,
'mean_true_g': float(t.mean()),
'mean_pred_g': float(p.mean())}
return {
'loss': total_loss / max(n_frames, 1),
'right_hand': metrics(preds_R, trues_R),
'left_hand': metrics(preds_L, trues_L),
'avg_mae_g': 0.5 * (np.mean(np.abs(preds_R - trues_R)) +
np.mean(np.abs(preds_L - trues_L))),
'avg_pearson_r': 0.5 * (metrics(preds_R, trues_R)['pearson_r'] +
metrics(preds_L, trues_L)['pearson_r']),
}
def run_experiment(args):
set_seed(args.seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")
modalities = args.modalities.split(',')
print(f"Backbone: {args.backbone} | Modalities: {modalities} | Seed: {args.seed}")
print("Loading train...")
train_ds = GripForceDataset(TRAIN_VOLS, modalities, downsample=args.downsample,
log_target=args.log_target)
stats = train_ds.get_stats()
tstats = train_ds.get_target_stats()
print(f" target mean: {tstats[0].flatten()} std: {tstats[1].flatten()} "
f"(log_target={args.log_target})")
print("Loading test...")
test_ds = GripForceDataset(TEST_VOLS, modalities, downsample=args.downsample,
stats=stats, target_stats=tstats,
log_target=args.log_target)
train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True,
collate_fn=regress_collate_fn, num_workers=0)
test_loader = DataLoader(test_ds, batch_size=args.batch_size, shuffle=False,
collate_fn=regress_collate_fn, num_workers=0)
model = GripRegressor(
args.backbone, train_ds.feat_dim, hidden_dim=args.hidden_dim,
output_dim=2, dropout=args.dropout,
).to(device)
n_params = sum(p.numel() for p in model.parameters())
print(f"Params: {n_params:,}")
optimizer = torch.optim.Adam(model.parameters(), lr=args.lr,
weight_decay=args.weight_decay)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6,
)
# Output dir
mod_str = '-'.join(modalities)
exp_name = f"grip_{args.backbone}_{mod_str}_seed{args.seed}"
if args.tag:
exp_name += f"_{args.tag}"
out_dir = os.path.join(args.output_dir, exp_name)
os.makedirs(out_dir, exist_ok=True)
best_test_mae = float('inf')
best_state = None
best_epoch = 0
patience_counter = 0
for epoch in range(1, args.epochs + 1):
t0 = time.time()
train_loss = train_one_epoch(model, train_loader, optimizer, device,
huber_delta=args.huber_delta)
m = evaluate(model, test_loader, device,
tstats[0], tstats[1], huber_delta=args.huber_delta,
log_target=args.log_target)
scheduler.step(m['loss'])
print(f" E{epoch:3d} | tr {train_loss:.4f} | "
f"te_loss {m['loss']:.4f} mae {m['avg_mae_g']:.2f}g "
f"r {m['avg_pearson_r']:.3f} | "
f"R: r={m['right_hand']['pearson_r']:.3f} r2={m['right_hand']['r2']:.3f} "
f"L: r={m['left_hand']['pearson_r']:.3f} r2={m['left_hand']['r2']:.3f} | "
f"{time.time()-t0:.1f}s")
# Early stopping on test MAE (test set acts as validation given no val split)
if m['avg_mae_g'] < best_test_mae:
best_test_mae = m['avg_mae_g']
best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
best_epoch = epoch
best_metrics = m
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= args.patience:
print(f" Early stop at epoch {epoch} (best {best_epoch})")
break
if best_state is not None:
torch.save(best_state, os.path.join(out_dir, 'model_best.pt'))
results = {
'experiment': exp_name,
'backbone': args.backbone,
'modalities': modalities,
'seed': args.seed,
'best_epoch': best_epoch,
'best_test_metrics': best_metrics,
'train_size': len(train_ds),
'test_size': len(test_ds),
'feat_dim': train_ds.feat_dim,
'modality_dims': train_ds.modality_dims,
'target_mean_g': tstats[0].flatten().tolist(),
'target_std_g': tstats[1].flatten().tolist(),
'args': vars(args),
}
with open(os.path.join(out_dir, 'results.json'), 'w') as f:
json.dump(results, f, indent=2)
print(f"Saved: {out_dir}/results.json")
return results
def main():
p = argparse.ArgumentParser()
p.add_argument('--backbone', type=str, default='transformer',
choices=['transformer', 'lstm', 'cnn'])
p.add_argument('--modalities', type=str, default='mocap,emg,eyetrack,imu')
p.add_argument('--epochs', type=int, default=60)
p.add_argument('--batch_size', type=int, default=8)
p.add_argument('--lr', type=float, default=1e-3)
p.add_argument('--weight_decay', type=float, default=1e-4)
p.add_argument('--hidden_dim', type=int, default=128)
p.add_argument('--dropout', type=float, default=0.2)
p.add_argument('--downsample', type=int, default=5)
p.add_argument('--patience', type=int, default=12)
p.add_argument('--huber_delta', type=float, default=1.0)
p.add_argument('--seed', type=int, default=42)
p.add_argument('--output_dir', type=str, required=True)
p.add_argument('--tag', type=str, default='')
p.add_argument('--log_target', action='store_true',
help='Use log1p(force) as regression target')
args = p.parse_args()
run_experiment(args)
if __name__ == '__main__':
main()