SID2000's picture
Upload folder using huggingface_hub
f2e4921 verified
"""Shared utilities for the CortexLab dashboard.
Provides synthetic data generation and analysis functions that mirror
CortexLab's API without requiring the full library or GPU.
"""
import numpy as np
from scipy.stats import spearmanr
from scipy.cluster.hierarchy import linkage, fcluster
# --- ROI Definitions ---
ROI_GROUPS = {
"Executive": ["46", "9-46d", "8Av", "8Ad", "FEF", "p32pr", "a32pr"],
"Visual": ["V1", "V2", "V3", "V4", "MT", "MST", "FFC", "VVC"],
"Auditory": ["A1", "LBelt", "MBelt", "PBelt", "A4", "A5"],
"Language": ["44", "45", "IFJa", "IFJp", "TPOJ1", "TPOJ2", "STV", "PSL"],
}
ALL_ROIS = [roi for group in ROI_GROUPS.values() for roi in group]
def make_roi_indices(n_vertices_per_roi=20):
"""Create ROI -> vertex index mapping."""
indices = {}
offset = 0
for roi in ALL_ROIS:
indices[roi] = np.arange(offset, offset + n_vertices_per_roi)
offset += n_vertices_per_roi
return indices, offset
# --- Brain Alignment ---
def compute_rdm(features):
norms = np.linalg.norm(features, axis=1, keepdims=True)
norms = np.where(norms > 0, norms, 1.0)
normalised = features / norms
return 1.0 - normalised @ normalised.T
def rsa_score(model_features, brain_features):
rdm_m = compute_rdm(model_features)
rdm_b = compute_rdm(brain_features)
idx = np.triu_indices(rdm_m.shape[0], k=1)
corr, _ = spearmanr(rdm_m[idx], rdm_b[idx])
return float(corr) if not np.isnan(corr) else 0.0
def cka_score(X, Y):
n = X.shape[0]
X = X - X.mean(axis=0)
Y = Y - Y.mean(axis=0)
XX = X @ X.T
YY = Y @ Y.T
hsic_xy = np.trace(XX @ YY) / (n - 1) ** 2
hsic_xx = np.trace(XX @ XX) / (n - 1) ** 2
hsic_yy = np.trace(YY @ YY) / (n - 1) ** 2
denom = np.sqrt(hsic_xx * hsic_yy)
return float(hsic_xy / denom) if denom > 1e-12 else 0.0
def procrustes_score(X, Y):
min_dim = min(X.shape[1], Y.shape[1])
X, Y = X[:, :min_dim], Y[:, :min_dim]
X = X - X.mean(axis=0)
Y = Y - Y.mean(axis=0)
nx, ny = np.linalg.norm(X), np.linalg.norm(Y)
if nx < 1e-12 or ny < 1e-12:
return 0.0
X, Y = X / nx, Y / ny
U, _, Vt = np.linalg.svd(Y.T @ X, full_matrices=False)
rotated = Y @ (U @ Vt)
return float(max(0.0, 1.0 - np.linalg.norm(X - rotated)))
ALIGNMENT_METHODS = {"RSA": rsa_score, "CKA": cka_score, "Procrustes": procrustes_score}
def permutation_test(model_feat, brain_pred, method_fn, n_perm=500, seed=42):
"""Returns (observed_score, p_value, null_distribution)."""
rng = np.random.default_rng(seed)
observed = method_fn(model_feat, brain_pred)
null_dist = []
for _ in range(n_perm):
perm_score = method_fn(model_feat[rng.permutation(len(model_feat))], brain_pred)
null_dist.append(perm_score)
null_dist = np.array(null_dist)
count = np.sum(null_dist >= observed)
p_value = (count + 1) / (n_perm + 1)
return observed, p_value, null_dist
def bootstrap_ci(model_feat, brain_pred, method_fn, n_boot=500, confidence=0.95, seed=42):
"""Returns (point_estimate, ci_lower, ci_upper)."""
rng = np.random.default_rng(seed)
n = model_feat.shape[0]
point = method_fn(model_feat, brain_pred)
scores = []
for _ in range(n_boot):
idx = rng.choice(n, size=n, replace=True)
scores.append(method_fn(model_feat[idx], brain_pred[idx]))
scores = np.array(scores)
alpha = 1 - confidence
return point, float(np.percentile(scores, 100 * alpha / 2)), float(np.percentile(scores, 100 * (1 - alpha / 2)))
def fdr_correction(p_values, alpha=0.05):
"""Benjamini-Hochberg FDR correction. Returns corrected p-values and significance mask."""
p = np.array(p_values)
n = len(p)
sorted_idx = np.argsort(p)
sorted_p = p[sorted_idx]
corrected = np.empty(n)
corrected[sorted_idx[-1]] = sorted_p[-1]
for i in range(n - 2, -1, -1):
corrected[sorted_idx[i]] = min(corrected[sorted_idx[i + 1]], sorted_p[i] * n / (i + 1))
return corrected, corrected < alpha
def noise_ceiling(brain_pred, method_fn, n_splits=20, seed=42):
"""Estimate noise ceiling via split-half reliability."""
rng = np.random.default_rng(seed)
n = brain_pred.shape[0]
scores = []
for _ in range(n_splits):
idx = rng.permutation(n)
half = n // 2
s = method_fn(brain_pred[idx[:half]], brain_pred[idx[half:half * 2]])
scores.append(s)
return float(np.mean(scores)), float(np.std(scores))
def partial_correlation(predictions, roi_indices):
"""Compute partial correlation matrix (correlation controlling for mean signal)."""
names = list(roi_indices.keys())
n = len(names)
T = predictions.shape[0]
timecourses = np.zeros((n, T))
for i, name in enumerate(names):
verts = roi_indices[name]
valid = verts[verts < predictions.shape[1]]
if len(valid) > 0:
timecourses[i] = predictions[:, valid].mean(axis=1)
# Partial correlation via precision matrix
cov = np.cov(timecourses)
try:
prec = np.linalg.inv(cov + 1e-6 * np.eye(n))
d = np.sqrt(np.diag(prec))
d[d == 0] = 1
partial = -prec / np.outer(d, d)
np.fill_diagonal(partial, 1.0)
except np.linalg.LinAlgError:
partial = np.eye(n)
return np.nan_to_num(partial, nan=0.0), names
def betweenness_centrality(corr_matrix, roi_names, threshold=0.3):
"""Compute betweenness centrality from thresholded connectivity."""
import networkx as nx
n = corr_matrix.shape[0]
G = nx.Graph()
for i, name in enumerate(roi_names):
G.add_node(name)
for i in range(n):
for j in range(i + 1, n):
if abs(corr_matrix[i, j]) > threshold:
G.add_edge(roi_names[i], roi_names[j], weight=abs(corr_matrix[i, j]))
bc = nx.betweenness_centrality(G)
return {name: bc.get(name, 0.0) for name in roi_names}
def modularity_score(corr_matrix, labels):
"""Compute Newman's modularity Q for a given partition."""
n = corr_matrix.shape[0]
adj = np.abs(corr_matrix).copy()
np.fill_diagonal(adj, 0)
m = adj.sum() / 2
if m == 0:
return 0.0
Q = 0.0
k = adj.sum(axis=1)
for i in range(n):
for j in range(n):
if labels[i] == labels[j]:
Q += adj[i, j] - k[i] * k[j] / (2 * m)
return float(Q / (2 * m))
# --- Cognitive Load ---
COGNITIVE_DIMENSIONS = {
"Executive Load": ["46", "9-46d", "8Av", "8Ad", "FEF", "p32pr", "a32pr"],
"Visual Complexity": ["V1", "V2", "V3", "V4", "MT", "MST", "FFC", "VVC"],
"Auditory Demand": ["A1", "LBelt", "MBelt", "PBelt", "A4", "A5"],
"Language Processing": ["44", "45", "IFJa", "IFJp", "TPOJ1", "TPOJ2", "STV", "PSL"],
}
def score_cognitive_load(predictions, roi_indices, tr_seconds=1.0):
baseline = max(float(np.median(np.abs(predictions))), 1e-8)
timeline = []
dim_scores = {d: [] for d in COGNITIVE_DIMENSIONS}
for t in range(predictions.shape[0]):
row = {}
for dim, rois in COGNITIVE_DIMENSIONS.items():
vals = []
for roi in rois:
if roi in roi_indices:
verts = roi_indices[roi]
valid = verts[verts < predictions.shape[1]]
if len(valid) > 0:
vals.append(np.abs(predictions[t, valid]).mean())
score = min(float(np.mean(vals)) / baseline, 1.0) if vals else 0.0
dim_scores[dim].append(score)
row[dim] = score
row["time"] = t * tr_seconds
timeline.append(row)
averages = {d: float(np.mean(v)) for d, v in dim_scores.items()}
averages["Overall"] = float(np.mean(list(averages.values())))
return averages, timeline
# --- Temporal Dynamics ---
def peak_latency(predictions, roi_indices, roi_name, tr_seconds=1.0):
verts = roi_indices.get(roi_name, np.array([]))
valid = verts[verts < predictions.shape[1]]
if len(valid) == 0:
return 0.0
tc = np.abs(predictions[:, valid]).mean(axis=1)
return float(np.argmax(tc) * tr_seconds)
def temporal_correlation(predictions, features, roi_indices, roi_name, max_lag=10):
verts = roi_indices.get(roi_name, np.array([]))
valid = verts[verts < predictions.shape[1]]
if len(valid) == 0:
return np.zeros(2 * max_lag + 1)
brain_tc = np.abs(predictions[:, valid]).mean(axis=1)
model_tc = features.mean(axis=1) if features.ndim > 1 else features
n = min(len(brain_tc), len(model_tc))
brain_tc, model_tc = brain_tc[:n], model_tc[:n]
corrs = []
for lag in range(-max_lag, max_lag + 1):
if lag >= 0:
b, m = brain_tc[lag:], model_tc[:n - lag]
else:
b, m = brain_tc[:n + lag], model_tc[-lag:]
if len(b) < 2:
corrs.append(0.0)
continue
bz, mz = b - b.mean(), m - m.mean()
denom = np.sqrt((bz ** 2).sum() * (mz ** 2).sum())
corrs.append(float((bz * mz).sum() / denom) if denom > 1e-12 else 0.0)
return np.array(corrs)
def decompose_response(predictions, roi_indices, roi_name, cutoff_seconds=4.0, tr_seconds=1.0):
verts = roi_indices.get(roi_name, np.array([]))
valid = verts[verts < predictions.shape[1]]
if len(valid) == 0:
return np.zeros(predictions.shape[0]), np.zeros(predictions.shape[0])
tc = np.abs(predictions[:, valid]).mean(axis=1)
window = max(1, int(cutoff_seconds / tr_seconds))
sustained = np.convolve(tc, np.ones(window) / window, mode="same")
return sustained, tc - sustained
# --- Connectivity ---
def compute_connectivity(predictions, roi_indices):
names = list(roi_indices.keys())
n = len(names)
T = predictions.shape[0]
timecourses = np.zeros((n, T))
for i, name in enumerate(names):
verts = roi_indices[name]
valid = verts[verts < predictions.shape[1]]
if len(valid) > 0:
timecourses[i] = predictions[:, valid].mean(axis=1)
corr = np.corrcoef(timecourses) if T >= 2 else np.eye(n)
return np.nan_to_num(corr, nan=0.0), names
def cluster_rois(corr_matrix, roi_names, n_clusters=4):
n = corr_matrix.shape[0]
n_clusters = min(n_clusters, n)
dist = 1.0 - np.abs(corr_matrix)
np.fill_diagonal(dist, 0.0)
condensed = [dist[i, j] for i in range(n) for j in range(i + 1, n)]
Z = linkage(condensed, method="average")
labels = fcluster(Z, t=n_clusters, criterion="maxclust")
clusters = {}
for name, cid in zip(roi_names, labels):
clusters.setdefault(int(cid), []).append(name)
return clusters, labels
def graph_metrics(corr_matrix, roi_names, threshold=0.3):
n = corr_matrix.shape[0]
adj = (np.abs(corr_matrix) > threshold).astype(float)
np.fill_diagonal(adj, 0.0)
degree = adj.sum(axis=1)
max_d = max(n - 1, 1)
return {name: float(degree[i] / max_d) for i, name in enumerate(roi_names)}
# --- Synthetic Data Generators ---
def generate_brain_predictions(n_timepoints=60, n_vertices=580, seed=42):
rng = np.random.default_rng(seed)
return rng.standard_normal((n_timepoints, n_vertices))
def generate_model_features(n_stimuli=60, feature_dim=512, seed=42):
rng = np.random.default_rng(seed)
return rng.standard_normal((n_stimuli, feature_dim))