| """ |
| Population graph construction for subject-level GCN (Parisot et al. 2017/2018). |
| |
| Nodes = subjects |
| Edges = phenotypic similarity (sex match × age Gaussian kernel) |
| Features = PCA-reduced FC upper triangle, fitted on training subjects only |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.decomposition import PCA |
| from sklearn.preprocessing import StandardScaler |
|
|
|
|
| |
| |
| |
|
|
| def load_phenotypic(pheno_csv: str | Path, processed_dir: str | Path) -> pd.DataFrame: |
| """Load ABIDE phenotypic CSV and filter to subjects with processed .npz files.""" |
| pheno = pd.read_csv(pheno_csv) |
| processed_dir = Path(processed_dir) |
|
|
| available = {int(p.stem) for p in processed_dir.glob("*.npz")} |
| pheno = pheno[pheno["SUB_ID"].isin(available)].copy().reset_index(drop=True) |
|
|
| |
| pheno["label"] = (pheno["DX_GROUP"] == 1).astype(int) |
| |
| pheno["sex_enc"] = (pheno["SEX"] == 2).astype(int) |
|
|
| return pheno |
|
|
|
|
| |
| |
| |
|
|
| def extract_fc_features(processed_dir: str | Path, subject_ids: list[int]) -> np.ndarray: |
| """Load upper-triangle FC for each subject. Returns (N, 19900) float32.""" |
| processed_dir = Path(processed_dir) |
| out = [] |
| for sid in subject_ids: |
| data = np.load(processed_dir / f"{sid}.npz", allow_pickle=True) |
| fc = data["mean_fc"].astype(np.float32) |
| r, c = np.triu_indices(fc.shape[0], k=1) |
| out.append(fc[r, c]) |
| return np.stack(out) |
|
|
|
|
| def harmonize_combat( |
| features: np.ndarray, |
| sites: list[str], |
| labels: np.ndarray, |
| ages: np.ndarray, |
| sexes: np.ndarray, |
| ) -> np.ndarray: |
| """ComBat site harmonization on FC upper triangle. |
| |
| Preserves biological signal (age, sex, diagnosis) while removing |
| scanner-specific batch effects — the dominant noise source in multi-site |
| fMRI (ABIDE has 17+ sites with different scanners and protocols). |
| """ |
| from neuroCombat import neuroCombat |
|
|
| |
| data_T = features.T |
| covars = pd.DataFrame({ |
| "site": sites, |
| "age": ages, |
| "sex": sexes, |
| "dx": labels, |
| }) |
| result = neuroCombat( |
| dat=data_T, |
| covars=covars, |
| batch_col="site", |
| continuous_cols=["age"], |
| categorical_cols=["sex", "dx"], |
| ) |
| return result["data"].T.astype(np.float32) |
|
|
|
|
| def fit_pca(train_feats: np.ndarray, n_components: int = 256) -> tuple[StandardScaler, PCA]: |
| """Fit StandardScaler + PCA on training features. Returns fitted objects.""" |
| scaler = StandardScaler() |
| train_scaled = scaler.fit_transform(train_feats) |
| n_comp = min(n_components, train_scaled.shape[0] - 1, train_scaled.shape[1]) |
| pca = PCA(n_components=n_comp, random_state=42) |
| pca.fit(train_scaled) |
| var = pca.explained_variance_ratio_.sum() |
| print(f"PCA {n_comp} components → {var:.1%} variance explained") |
| return scaler, pca |
|
|
|
|
| def apply_pca(feats: np.ndarray, scaler: StandardScaler, pca: PCA) -> np.ndarray: |
| return pca.transform(scaler.transform(feats)).astype(np.float32) |
|
|
|
|
| |
| |
| |
|
|
| def build_population_adj( |
| subject_df: pd.DataFrame, |
| threshold: float = 0.5, |
| age_sigma: float | None = None, |
| use_site: bool = False, |
| ) -> np.ndarray: |
| """Build N×N weighted adjacency from phenotypic similarity. |
| |
| Edge weight = sex_match * age_gaussian_sim (* site_match if use_site). |
| Edge exists only if weight > threshold. |
| |
| Parameters |
| ---------- |
| subject_df : DataFrame with columns sex_enc, AGE_AT_SCAN, SITE_ID |
| threshold : minimum similarity to keep an edge |
| age_sigma : std dev for Gaussian age kernel (default: std of ages) |
| use_site : include site-match as a multiplier (Parisot original) |
| Disable after ComBat since site effects are removed. |
| """ |
| N = len(subject_df) |
| ages = subject_df["AGE_AT_SCAN"].values.astype(np.float32) |
| sexes = subject_df["sex_enc"].values |
|
|
| if age_sigma is None: |
| age_sigma = float(np.std(ages)) |
|
|
| |
| diff = ages[:, None] - ages[None, :] |
| age_sim = np.exp(-diff**2 / (2 * age_sigma**2)) |
|
|
| |
| sex_sim = (sexes[:, None] == sexes[None, :]).astype(np.float32) |
|
|
| W = sex_sim * age_sim |
|
|
| if use_site: |
| sites = np.array(subject_df["SITE_ID"].tolist()) |
| site_sim = (sites[:, None] == sites[None, :]).astype(np.float32) |
| W = W * site_sim |
|
|
| adj = np.where(W > threshold, W, 0.0).astype(np.float32) |
| np.fill_diagonal(adj, 0.0) |
|
|
| n_edges = int((adj > 0).sum()) // 2 |
| density = n_edges / (N * (N - 1) / 2) |
| print(f"Population graph: {N} nodes, {n_edges} edges, {density:.1%} density") |
| return adj |
|
|
|
|
| def normalize_adj(adj: np.ndarray) -> np.ndarray: |
| """Symmetric normalization with self-loops: D^{-1/2}(A+I)D^{-1/2}.""" |
| A = adj + np.eye(adj.shape[0], dtype=np.float32) |
| d = A.sum(axis=1) |
| d_inv_sqrt = np.where(d > 0, 1.0 / np.sqrt(d), 0.0) |
| return (d_inv_sqrt[:, None] * A * d_inv_sqrt[None, :]).astype(np.float32) |
|
|