| |
| """ |
| UFUSC: Unified Federated Unlearning via Sensitivity-Guided Contrastive Forgetting |
| |
| A complete self-contained implementation for the research paper: |
| "Sensitivity-Guided Contrastive Forgetting: Unified Label and Feature Unlearning |
| in Vertical Federated Learning" |
| |
| This script includes: |
| - VFL architecture (PassiveModel, ActiveModel, VFLFramework) |
| - 5 baselines (GradientAscent, Finetune, FisherForgetting, ManifoldMixup, Ferrari) |
| - UFUSC with 3 variants (Label Only, Feature Only, Joint) |
| - MIA attack evaluation |
| - Dataset loaders for MNIST, Fashion-MNIST, CIFAR-10 |
| - Ablation study runner |
| - Scalability analysis across K=2,3,4,6 passive parties |
| - Visualization code (bar charts, radar plots, ablation plots, scalability plots) |
| |
| Usage: |
| pip install torch torchvision numpy matplotlib seaborn pandas scikit-learn |
| python research_paper.py |
| |
| Author: UFUSC Research Team |
| """ |
|
|
| import os |
| import json |
| import time |
| import copy |
| import random |
| import warnings |
| from collections import defaultdict |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import torch.optim as optim |
| from torch.utils.data import DataLoader, TensorDataset, Subset |
| import torchvision |
| import torchvision.transforms as transforms |
| from sklearn.metrics import accuracy_score, roc_auc_score |
|
|
| warnings.filterwarnings("ignore") |
|
|
| |
| |
| |
|
|
| SEED = 42 |
| DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| NUM_PASSIVE_PARTIES = 2 |
| BATCH_SIZE = 256 |
| TRAIN_EPOCHS = 20 |
| UNLEARN_EPOCHS = 10 |
| LR = 0.001 |
| FORGET_RATIO = 0.1 |
|
|
| |
| ALPHA = 1.0 |
| BETA = 0.5 |
| GAMMA = 0.3 |
| OMEGA = 0.1 |
| TAU = 2.0 |
| SENSITIVITY_SIGMA = 0.01 |
| SENSITIVITY_SAMPLES = 5 |
|
|
| |
| os.makedirs("results", exist_ok=True) |
| os.makedirs("figures", exist_ok=True) |
|
|
|
|
| def set_seed(seed=SEED): |
| """Set all random seeds for reproducibility.""" |
| random.seed(seed) |
| np.random.seed(seed) |
| torch.manual_seed(seed) |
| if torch.cuda.is_available(): |
| torch.cuda.manual_seed_all(seed) |
| torch.backends.cudnn.deterministic = True |
| torch.backends.cudnn.benchmark = False |
|
|
|
|
| |
| |
| |
|
|
| def load_dataset(name="MNIST"): |
| """ |
| Load and preprocess a dataset. Returns flattened feature vectors for VFL. |
| |
| In VFL, each passive party holds a vertical partition of the features. |
| We flatten images and split feature columns across K parties. |
| |
| Args: |
| name: One of "MNIST", "Fashion-MNIST", "CIFAR-10" |
| |
| Returns: |
| (X_train, y_train, X_test, y_test, num_classes, feature_dim) |
| """ |
| data_dir = "./data" |
|
|
| if name == "MNIST": |
| transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) |
| train_ds = torchvision.datasets.MNIST(data_dir, train=True, download=True, transform=transform) |
| test_ds = torchvision.datasets.MNIST(data_dir, train=False, download=True, transform=transform) |
| num_classes = 10 |
| elif name == "Fashion-MNIST": |
| transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.2860,), (0.3530,))]) |
| train_ds = torchvision.datasets.FashionMNIST(data_dir, train=True, download=True, transform=transform) |
| test_ds = torchvision.datasets.FashionMNIST(data_dir, train=False, download=True, transform=transform) |
| num_classes = 10 |
| elif name == "CIFAR-10": |
| transform = transforms.Compose([ |
| transforms.ToTensor(), |
| transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616)) |
| ]) |
| train_ds = torchvision.datasets.CIFAR10(data_dir, train=True, download=True, transform=transform) |
| test_ds = torchvision.datasets.CIFAR10(data_dir, train=False, download=True, transform=transform) |
| num_classes = 10 |
| else: |
| raise ValueError(f"Unknown dataset: {name}") |
|
|
| |
| X_train = torch.stack([train_ds[i][0] for i in range(len(train_ds))]).view(len(train_ds), -1) |
| y_train = torch.tensor([train_ds[i][1] for i in range(len(train_ds))]) |
| X_test = torch.stack([test_ds[i][0] for i in range(len(test_ds))]).view(len(test_ds), -1) |
| y_test = torch.tensor([test_ds[i][1] for i in range(len(test_ds))]) |
|
|
| feature_dim = X_train.shape[1] |
| print(f" [{name}] Train: {X_train.shape}, Test: {X_test.shape}, Classes: {num_classes}, Features: {feature_dim}") |
|
|
| return X_train, y_train, X_test, y_test, num_classes, feature_dim |
|
|
|
|
| def split_features_vfl(X, num_parties=NUM_PASSIVE_PARTIES): |
| """ |
| Split feature columns across K passive parties for VFL. |
| |
| Each party gets a disjoint subset of columns (vertical partition). |
| |
| Args: |
| X: (N, D) tensor of flattened features |
| num_parties: number of passive parties K |
| |
| Returns: |
| List of K tensors, each (N, D/K) approximately |
| """ |
| D = X.shape[1] |
| split_sizes = [D // num_parties] * num_parties |
| |
| for i in range(D % num_parties): |
| split_sizes[i] += 1 |
| return torch.split(X, split_sizes, dim=1) |
|
|
|
|
| def create_forget_retain_split(y, forget_class=0, forget_ratio=FORGET_RATIO): |
| """ |
| Create forget/retain index split. |
| |
| Selects a fraction of samples from the target class as the forget set. |
| All other samples form the retain set. |
| |
| Args: |
| y: label tensor |
| forget_class: which class to partially forget |
| forget_ratio: fraction of that class to forget |
| |
| Returns: |
| (forget_indices, retain_indices) |
| """ |
| class_indices = (y == forget_class).nonzero(as_tuple=True)[0] |
| num_forget = max(1, int(len(class_indices) * forget_ratio)) |
|
|
| perm = torch.randperm(len(class_indices)) |
| forget_indices = class_indices[perm[:num_forget]] |
|
|
| all_indices = torch.arange(len(y)) |
| mask = torch.ones(len(y), dtype=torch.bool) |
| mask[forget_indices] = False |
| retain_indices = all_indices[mask] |
|
|
| return forget_indices, retain_indices |
|
|
|
|
| |
| |
| |
|
|
| class PassiveModel(nn.Module): |
| """ |
| Passive party model in VFL. |
| |
| Each passive party holds a vertical partition of features and computes |
| a local embedding (forward representation) that is sent to the active party. |
| |
| Architecture: 2-layer MLP with ReLU and BatchNorm. |
| """ |
|
|
| def __init__(self, input_dim, embed_dim=64): |
| super().__init__() |
| hidden_dim = max(128, input_dim // 2) |
| self.net = nn.Sequential( |
| nn.Linear(input_dim, hidden_dim), |
| nn.BatchNorm1d(hidden_dim), |
| nn.ReLU(), |
| nn.Dropout(0.2), |
| nn.Linear(hidden_dim, embed_dim), |
| nn.BatchNorm1d(embed_dim), |
| nn.ReLU() |
| ) |
|
|
| def forward(self, x): |
| return self.net(x) |
|
|
|
|
| class ActiveModel(nn.Module): |
| """ |
| Active party model in VFL. |
| |
| The active party holds the labels and receives concatenated embeddings |
| from all passive parties. It performs final classification. |
| |
| Architecture: 2-layer MLP with ReLU, Dropout, and softmax output. |
| """ |
|
|
| def __init__(self, total_embed_dim, num_classes=10): |
| super().__init__() |
| self.net = nn.Sequential( |
| nn.Linear(total_embed_dim, 128), |
| nn.BatchNorm1d(128), |
| nn.ReLU(), |
| nn.Dropout(0.3), |
| nn.Linear(128, 64), |
| nn.ReLU(), |
| nn.Linear(64, num_classes) |
| ) |
|
|
| def forward(self, x): |
| return self.net(x) |
|
|
|
|
| class VFLFramework: |
| """ |
| Vertical Federated Learning framework. |
| |
| Manages K passive parties and 1 active party. Each passive party |
| computes embeddings from their feature partition, which are concatenated |
| and fed to the active party for classification. |
| |
| The active party holds labels and orchestrates training. |
| """ |
|
|
| def __init__(self, feature_dims, num_classes=10, embed_dim=64, |
| num_parties=NUM_PASSIVE_PARTIES, lr=LR): |
| """ |
| Args: |
| feature_dims: list of input dimensions for each passive party |
| num_classes: number of output classes |
| embed_dim: embedding dimension per passive party |
| num_parties: number of passive parties K |
| lr: learning rate |
| """ |
| self.num_parties = num_parties |
| self.embed_dim = embed_dim |
| self.num_classes = num_classes |
|
|
| |
| self.passive_models = [] |
| for i in range(num_parties): |
| model = PassiveModel(feature_dims[i], embed_dim).to(DEVICE) |
| self.passive_models.append(model) |
|
|
| |
| total_embed = embed_dim * num_parties |
| self.active_model = ActiveModel(total_embed, num_classes).to(DEVICE) |
|
|
| |
| all_params = [] |
| for pm in self.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(self.active_model.parameters()) |
| self.optimizer = optim.Adam(all_params, lr=lr) |
| self.criterion = nn.CrossEntropyLoss() |
|
|
| def get_embeddings(self, X_splits): |
| """Compute embeddings from all passive parties and concatenate.""" |
| embeddings = [] |
| for i, pm in enumerate(self.passive_models): |
| emb = pm(X_splits[i].to(DEVICE)) |
| embeddings.append(emb) |
| return torch.cat(embeddings, dim=1) |
|
|
| def forward(self, X_splits): |
| """Full forward pass through VFL.""" |
| combined = self.get_embeddings(X_splits) |
| logits = self.active_model(combined) |
| return logits, combined |
|
|
| def train_model(self, X_train_splits, y_train, X_test_splits, y_test, |
| epochs=TRAIN_EPOCHS, verbose=True): |
| """ |
| Train the VFL model end-to-end. |
| |
| Args: |
| X_train_splits: list of K tensors (one per passive party) |
| y_train: training labels |
| X_test_splits: list of K test tensors |
| y_test: test labels |
| epochs: number of training epochs |
| verbose: print progress |
| """ |
| dataset = TensorDataset(*X_train_splits, y_train) |
| loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=False) |
|
|
| self.set_train() |
|
|
| for epoch in range(epochs): |
| total_loss = 0 |
| correct = 0 |
| total = 0 |
|
|
| for batch in loader: |
| *batch_splits, batch_y = batch |
| batch_y = batch_y.to(DEVICE) |
|
|
| logits, _ = self.forward(batch_splits) |
| loss = self.criterion(logits, batch_y) |
|
|
| self.optimizer.zero_grad() |
| loss.backward() |
| self.optimizer.step() |
|
|
| total_loss += loss.item() * batch_y.size(0) |
| preds = logits.argmax(dim=1) |
| correct += (preds == batch_y).sum().item() |
| total += batch_y.size(0) |
|
|
| if verbose and (epoch + 1) % 5 == 0: |
| train_acc = correct / total * 100 |
| test_acc = self.evaluate(X_test_splits, y_test) |
| print(f" Epoch {epoch+1}/{epochs} — Loss: {total_loss/total:.4f}, " |
| f"Train Acc: {train_acc:.2f}%, Test Acc: {test_acc:.2f}%") |
|
|
| def evaluate(self, X_splits, y, batch_size=512): |
| """Evaluate accuracy on given data.""" |
| self.set_eval() |
| dataset = TensorDataset(*X_splits, y) |
| loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) |
|
|
| correct = 0 |
| total = 0 |
|
|
| with torch.no_grad(): |
| for batch in loader: |
| *batch_splits, batch_y = batch |
| batch_y = batch_y.to(DEVICE) |
| logits, _ = self.forward(batch_splits) |
| preds = logits.argmax(dim=1) |
| correct += (preds == batch_y).sum().item() |
| total += batch_y.size(0) |
|
|
| self.set_train() |
| return correct / total * 100 |
|
|
| def predict_proba(self, X_splits, batch_size=512): |
| """Get prediction probabilities.""" |
| self.set_eval() |
| dataset = TensorDataset(*X_splits) |
| loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) |
|
|
| all_probs = [] |
| with torch.no_grad(): |
| for batch in loader: |
| logits, _ = self.forward(list(batch)) |
| probs = F.softmax(logits, dim=1) |
| all_probs.append(probs.cpu()) |
|
|
| self.set_train() |
| return torch.cat(all_probs, dim=0) |
|
|
| def set_train(self): |
| for pm in self.passive_models: |
| pm.train() |
| self.active_model.train() |
|
|
| def set_eval(self): |
| for pm in self.passive_models: |
| pm.eval() |
| self.active_model.eval() |
|
|
| def clone(self): |
| """Deep copy the entire VFL framework.""" |
| cloned = VFLFramework.__new__(VFLFramework) |
| cloned.num_parties = self.num_parties |
| cloned.embed_dim = self.embed_dim |
| cloned.num_classes = self.num_classes |
| cloned.passive_models = [copy.deepcopy(pm) for pm in self.passive_models] |
| cloned.active_model = copy.deepcopy(self.active_model) |
| cloned.criterion = nn.CrossEntropyLoss() |
|
|
| all_params = [] |
| for pm in cloned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(cloned.active_model.parameters()) |
| cloned.optimizer = optim.Adam(all_params, lr=LR) |
|
|
| return cloned |
|
|
|
|
| |
| |
| |
|
|
| def membership_inference_attack(model, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices): |
| """ |
| Simple Membership Inference Attack (MIA). |
| |
| Uses prediction confidence as a signal: members tend to have higher |
| confidence on the correct class. We compute the attack success rate (ASR) |
| on forget set members vs non-members. |
| |
| Lower ASR after unlearning → better privacy (model doesn't distinguish |
| members from non-members). |
| |
| Args: |
| model: VFLFramework |
| X_train_splits: training feature splits |
| y_train: training labels |
| X_test_splits: test feature splits |
| y_test: test labels |
| forget_indices: indices of forget set in training data |
| retain_indices: indices of retain set in training data |
| |
| Returns: |
| mia_asr: attack success rate (%) |
| """ |
| model.set_eval() |
|
|
| |
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
| member_probs = model.predict_proba(forget_splits) |
| member_conf = member_probs[torch.arange(len(forget_labels)), forget_labels].numpy() |
|
|
| |
| forget_class = forget_labels[0].item() |
| test_class_mask = y_test == forget_class |
| if test_class_mask.sum() == 0: |
| return 50.0 |
|
|
| test_class_splits = [xs[test_class_mask] for xs in X_test_splits] |
| test_class_labels = y_test[test_class_mask] |
| nonmember_probs = model.predict_proba(test_class_splits) |
| nonmember_conf = nonmember_probs[torch.arange(len(test_class_labels)), test_class_labels].numpy() |
|
|
| |
| |
| all_conf = np.concatenate([member_conf, nonmember_conf]) |
| threshold = np.median(all_conf) |
|
|
| member_pred = (member_conf > threshold).astype(float) |
| nonmember_pred = (nonmember_conf <= threshold).astype(float) |
|
|
| |
| tpr = member_pred.mean() |
| tnr = nonmember_pred.mean() |
| mia_asr = (tpr + tnr) / 2 * 100 |
|
|
| model.set_train() |
| return mia_asr |
|
|
|
|
| def compute_feature_sensitivity(model, X_splits, sigma=SENSITIVITY_SIGMA, |
| n_samples=SENSITIVITY_SAMPLES): |
| """ |
| Compute Lipschitz-based feature sensitivity via Monte Carlo perturbation. |
| |
| Measures how much the model's output changes when input features are |
| perturbed by Gaussian noise. Lower sensitivity after unlearning means |
| the model is less responsive to the target features. |
| |
| Based on Ferrari (arxiv:2405.17462) Section 4. |
| |
| Args: |
| model: VFLFramework |
| X_splits: feature splits to perturb |
| sigma: std of Gaussian perturbation |
| n_samples: number of MC samples |
| |
| Returns: |
| mean_sensitivity: average sensitivity across samples and parties |
| """ |
| model.set_eval() |
| sensitivities = [] |
|
|
| |
| n = min(500, X_splits[0].shape[0]) |
| subset_splits = [xs[:n] for xs in X_splits] |
|
|
| with torch.no_grad(): |
| |
| logits_orig, _ = model.forward(subset_splits) |
| probs_orig = F.softmax(logits_orig, dim=1) |
|
|
| for _ in range(n_samples): |
| for party_idx in range(len(subset_splits)): |
| perturbed_splits = [xs.clone() for xs in subset_splits] |
| noise = torch.randn_like(perturbed_splits[party_idx]) * sigma |
| perturbed_splits[party_idx] = perturbed_splits[party_idx] + noise |
|
|
| logits_pert, _ = model.forward(perturbed_splits) |
| probs_pert = F.softmax(logits_pert, dim=1) |
|
|
| |
| diff = (probs_orig - probs_pert).norm(dim=1).mean().item() |
| sensitivities.append(diff) |
|
|
| model.set_train() |
| return np.mean(sensitivities) if sensitivities else 0.0 |
|
|
|
|
| def full_evaluation(model, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices, forget_class=0): |
| """ |
| Run full evaluation suite: test accuracy, forget accuracy, retain accuracy, |
| MIA ASR, and feature sensitivity. |
| """ |
| |
| test_acc = model.evaluate(X_test_splits, y_test) |
|
|
| |
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
| forget_acc = model.evaluate(forget_splits, forget_labels) |
|
|
| |
| retain_splits = [xs[retain_indices] for xs in X_train_splits] |
| retain_labels = y_train[retain_indices] |
| retain_acc = model.evaluate(retain_splits, retain_labels) |
|
|
| |
| mia_asr = membership_inference_attack( |
| model, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices |
| ) |
|
|
| |
| feat_sens = compute_feature_sensitivity(model, forget_splits) |
|
|
| return { |
| "test_acc": round(test_acc, 2), |
| "forget_acc": round(forget_acc, 2), |
| "retain_acc": round(retain_acc, 2), |
| "mia_asr": round(mia_asr, 1), |
| "feature_sensitivity": round(feat_sens, 3) |
| } |
|
|
|
|
| |
| |
| |
|
|
| class GradientAscentUnlearning: |
| """ |
| Baseline 1: Gradient Ascent |
| |
| Maximizes the loss on the forget set to push the model away from |
| correctly classifying forgotten samples. Simple but can cause |
| catastrophic degradation of retain set performance. |
| |
| Reference: Graves et al. (2020), Thudi et al. (2022) |
| """ |
|
|
| def __init__(self, epochs=5, lr=0.01): |
| self.epochs = epochs |
| self.lr = lr |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices): |
| unlearned = model.clone() |
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
|
|
| dataset = TensorDataset(*forget_splits, forget_labels) |
| loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True) |
|
|
| |
| all_params = [] |
| for pm in unlearned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(unlearned.active_model.parameters()) |
| optimizer = optim.SGD(all_params, lr=self.lr) |
|
|
| unlearned.set_train() |
| for epoch in range(self.epochs): |
| for batch in loader: |
| *batch_splits, batch_y = batch |
| batch_y = batch_y.to(DEVICE) |
|
|
| logits, _ = unlearned.forward(batch_splits) |
| loss = unlearned.criterion(logits, batch_y) |
|
|
| optimizer.zero_grad() |
| |
| (-loss).backward() |
| optimizer.step() |
|
|
| return unlearned |
|
|
|
|
| class FineTuneUnlearning: |
| """ |
| Baseline 2: Fine-tuning on Retain Set |
| |
| Simply fine-tunes the model on only the retain set, hoping the model |
| will "forget" the unlearned data. Often insufficient as the model |
| retains significant information about the forget set. |
| |
| Reference: Standard baseline in unlearning literature |
| """ |
|
|
| def __init__(self, epochs=10, lr=0.001): |
| self.epochs = epochs |
| self.lr = lr |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices): |
| unlearned = model.clone() |
| retain_splits = [xs[retain_indices] for xs in X_train_splits] |
| retain_labels = y_train[retain_indices] |
|
|
| dataset = TensorDataset(*retain_splits, retain_labels) |
| loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True) |
|
|
| all_params = [] |
| for pm in unlearned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(unlearned.active_model.parameters()) |
| optimizer = optim.Adam(all_params, lr=self.lr) |
|
|
| unlearned.set_train() |
| for epoch in range(self.epochs): |
| for batch in loader: |
| *batch_splits, batch_y = batch |
| batch_y = batch_y.to(DEVICE) |
|
|
| logits, _ = unlearned.forward(batch_splits) |
| loss = unlearned.criterion(logits, batch_y) |
|
|
| optimizer.zero_grad() |
| loss.backward() |
| optimizer.step() |
|
|
| return unlearned |
|
|
|
|
| class FisherForgetting: |
| """ |
| Baseline 3: Fisher Forgetting |
| |
| Uses the Fisher Information Matrix to identify which parameters are |
| most important for the forget set, then adds noise proportional to |
| the inverse Fisher to those parameters. This selectively "erases" |
| information about the forget set. |
| |
| Reference: Golatkar et al. (2020) "Eternal Sunshine of the Spotless Net" |
| """ |
|
|
| def __init__(self, noise_scale=0.01): |
| self.noise_scale = noise_scale |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices): |
| unlearned = model.clone() |
|
|
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
|
|
| |
| unlearned.set_train() |
| fisher_diag = {} |
| for name, param in self._get_all_params(unlearned): |
| fisher_diag[name] = torch.zeros_like(param.data) |
|
|
| dataset = TensorDataset(*forget_splits, forget_labels) |
| loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False) |
|
|
| for batch in loader: |
| *batch_splits, batch_y = batch |
| batch_y = batch_y.to(DEVICE) |
|
|
| logits, _ = unlearned.forward(batch_splits) |
| loss = unlearned.criterion(logits, batch_y) |
|
|
| unlearned.optimizer.zero_grad() |
| loss.backward() |
|
|
| for name, param in self._get_all_params(unlearned): |
| if param.grad is not None: |
| fisher_diag[name] += param.grad.data ** 2 |
|
|
| |
| n_batches = len(loader) |
| for name in fisher_diag: |
| fisher_diag[name] /= max(n_batches, 1) |
|
|
| |
| with torch.no_grad(): |
| for name, param in self._get_all_params(unlearned): |
| noise_std = self.noise_scale * (fisher_diag[name] + 1e-8).sqrt() |
| param.data += torch.randn_like(param.data) * noise_std |
|
|
| return unlearned |
|
|
| def _get_all_params(self, model): |
| """Get all named parameters from VFL framework.""" |
| params = [] |
| for i, pm in enumerate(model.passive_models): |
| for name, param in pm.named_parameters(): |
| params.append((f"passive_{i}.{name}", param)) |
| for name, param in model.active_model.named_parameters(): |
| params.append((f"active.{name}", param)) |
| return params |
|
|
|
|
| class ManifoldMixupUnlearning: |
| """ |
| Baseline 4: Manifold Mixup (Paper 1 - arxiv:2410.10922) |
| |
| Performs manifold mixup in the embedding space between forget set samples |
| and random noise/other class samples, combined with gradient ascent. |
| This disrupts the learned representations for the forget set. |
| |
| Adapted from: Bryan et al. (2024) "Towards Privacy-Guaranteed Label |
| Unlearning in Vertical Federated Learning" |
| """ |
|
|
| def __init__(self, epochs=10, lr=0.005, mixup_alpha=0.3): |
| self.epochs = epochs |
| self.lr = lr |
| self.mixup_alpha = mixup_alpha |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices): |
| unlearned = model.clone() |
|
|
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
| retain_splits = [xs[retain_indices] for xs in X_train_splits] |
| retain_labels = y_train[retain_indices] |
|
|
| all_params = [] |
| for pm in unlearned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(unlearned.active_model.parameters()) |
| optimizer = optim.Adam(all_params, lr=self.lr) |
|
|
| unlearned.set_train() |
| for epoch in range(self.epochs): |
| |
| forget_emb = unlearned.get_embeddings(forget_splits) |
| |
| noise = torch.randn_like(forget_emb) |
| lam = np.random.beta(self.mixup_alpha, self.mixup_alpha) |
| mixed_emb = lam * forget_emb + (1 - lam) * noise |
|
|
| |
| logits_mixed = unlearned.active_model(mixed_emb) |
| loss_forget = unlearned.criterion(logits_mixed, forget_labels.to(DEVICE)) |
|
|
| |
| n_retain_batch = min(BATCH_SIZE, len(retain_labels)) |
| idx = torch.randperm(len(retain_labels))[:n_retain_batch] |
| retain_batch = [xs[idx] for xs in retain_splits] |
| retain_batch_y = retain_labels[idx].to(DEVICE) |
|
|
| logits_retain, _ = unlearned.forward(retain_batch) |
| loss_retain = unlearned.criterion(logits_retain, retain_batch_y) |
|
|
| |
| loss = loss_retain - 0.5 * loss_forget |
|
|
| optimizer.zero_grad() |
| loss.backward() |
| optimizer.step() |
|
|
| return unlearned |
|
|
|
|
| class FerrariUnlearning: |
| """ |
| Baseline 5: Ferrari (Paper 2 - arxiv:2405.17462) |
| |
| Minimizes feature sensitivity to target features via Lipschitz-based |
| optimization. Uses Monte Carlo perturbation to estimate sensitivity |
| and optimizes to reduce it. |
| |
| Adapted from: Ong et al. (2024) "Ferrari: Federated Feature Unlearning |
| via Optimizing Feature Sensitivity" |
| |
| Note: Original Ferrari is for HFL. We adapt it to VFL by applying |
| sensitivity minimization to the passive party that holds the target features. |
| """ |
|
|
| def __init__(self, epochs=15, lr=0.005, sigma=0.01, n_samples=5): |
| self.epochs = epochs |
| self.lr = lr |
| self.sigma = sigma |
| self.n_samples = n_samples |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices): |
| unlearned = model.clone() |
|
|
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
| retain_splits = [xs[retain_indices] for xs in X_train_splits] |
| retain_labels = y_train[retain_indices] |
|
|
| all_params = [] |
| for pm in unlearned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(unlearned.active_model.parameters()) |
| optimizer = optim.Adam(all_params, lr=self.lr) |
|
|
| unlearned.set_train() |
| for epoch in range(self.epochs): |
| |
| sensitivity_loss = torch.tensor(0.0, device=DEVICE) |
|
|
| logits_orig, _ = unlearned.forward(forget_splits) |
| probs_orig = F.softmax(logits_orig, dim=1) |
|
|
| for _ in range(self.n_samples): |
| for party_idx in range(len(forget_splits)): |
| perturbed = [xs.clone() for xs in forget_splits] |
| noise = torch.randn_like(perturbed[party_idx]) * self.sigma |
| perturbed[party_idx] = perturbed[party_idx] + noise |
|
|
| logits_pert, _ = unlearned.forward(perturbed) |
| probs_pert = F.softmax(logits_pert, dim=1) |
|
|
| |
| diff = (probs_orig - probs_pert).norm(dim=1).mean() |
| sensitivity_loss = sensitivity_loss + diff |
|
|
| sensitivity_loss = sensitivity_loss / (self.n_samples * len(forget_splits)) |
|
|
| |
| n_retain_batch = min(BATCH_SIZE, len(retain_labels)) |
| idx = torch.randperm(len(retain_labels))[:n_retain_batch] |
| retain_batch = [xs[idx] for xs in retain_splits] |
| retain_batch_y = retain_labels[idx].to(DEVICE) |
|
|
| logits_retain, _ = unlearned.forward(retain_batch) |
| loss_retain = unlearned.criterion(logits_retain, retain_batch_y) |
|
|
| |
| loss = loss_retain + 2.0 * sensitivity_loss |
|
|
| optimizer.zero_grad() |
| loss.backward() |
| optimizer.step() |
|
|
| return unlearned |
|
|
|
|
| |
| |
| |
|
|
| class UFUSC: |
| """ |
| UFUSC: Unified Federated Unlearning via Sensitivity-Guided Contrastive Forgetting |
| |
| The FIRST framework to simultaneously handle BOTH label AND feature unlearning |
| in Vertical Federated Learning. |
| |
| Three components: |
| 1. Contrastive Forgetting Loss (CFL) — Pushes forget-set embeddings toward |
| random noise while anchoring retain-set embeddings to class centroids. |
| Operates in the joint embedding space for "deep forgetting" (not just |
| output-level like gradient ascent). |
| |
| 2. Lipschitz Feature Sensitivity Minimization — Monte Carlo perturbation-based |
| sensitivity estimation, extended to VFL. Minimizes the model's responsiveness |
| to features associated with the forget set. |
| |
| 3. Dual-Variable Certification — Primal-dual formulation that provides a |
| convergence-based forgetting guarantee. The dual variable λ adaptively |
| adjusts the forgetting pressure based on how well the current model |
| has forgotten. |
| |
| Loss function: |
| L = L_retain + α·L_CFL + β·L_sensitivity + γ·L_anchor + Ω·(τ - L_forget_CE) |
| |
| Variants: |
| - Label Only: Uses CFL + anchor (no sensitivity) |
| - Feature Only: Uses sensitivity + CFL (no anchor) |
| - Joint: All three components (full UFUSC) |
| """ |
|
|
| def __init__(self, mode="joint", alpha=ALPHA, beta=BETA, gamma=GAMMA, |
| omega=OMEGA, tau=TAU, epochs=UNLEARN_EPOCHS, lr=0.005, |
| sigma=SENSITIVITY_SIGMA, n_mc_samples=SENSITIVITY_SAMPLES): |
| """ |
| Args: |
| mode: "label_only", "feature_only", or "joint" |
| alpha: weight for Contrastive Forgetting Loss |
| beta: weight for Feature Sensitivity Loss |
| gamma: weight for Anchor Loss (retain embedding stability) |
| omega: weight for dual-variable certification constraint |
| tau: forgetting threshold for certification |
| epochs: number of unlearning epochs |
| lr: learning rate for unlearning |
| sigma: std for MC perturbation (feature sensitivity) |
| n_mc_samples: number of MC samples for sensitivity |
| """ |
| assert mode in ["label_only", "feature_only", "joint"] |
| self.mode = mode |
| self.alpha = alpha |
| self.beta = beta |
| self.gamma = gamma |
| self.omega = omega |
| self.tau = tau |
| self.epochs = epochs |
| self.lr = lr |
| self.sigma = sigma |
| self.n_mc_samples = n_mc_samples |
|
|
| def compute_class_centroids(self, model, X_splits, y, num_classes): |
| """ |
| Compute class centroids in the joint embedding space. |
| |
| These serve as "anchor points" — retain-set embeddings should |
| stay close to their class centroid during unlearning. |
| """ |
| model.set_eval() |
| with torch.no_grad(): |
| embeddings = model.get_embeddings(X_splits) |
|
|
| centroids = {} |
| for c in range(num_classes): |
| mask = (y == c) |
| if mask.sum() > 0: |
| centroids[c] = embeddings[mask].mean(dim=0).detach() |
| else: |
| centroids[c] = torch.zeros(embeddings.shape[1], device=DEVICE) |
|
|
| model.set_train() |
| return centroids |
|
|
| def contrastive_forgetting_loss(self, model, forget_splits, forget_labels, |
| centroids, num_classes): |
| """ |
| Contrastive Forgetting Loss (CFL). |
| |
| Pushes forget-set embeddings AWAY from their true class centroids |
| and TOWARD random noise. This disrupts the learned representations |
| at the embedding level, achieving "deep forgetting." |
| |
| L_CFL = -||e_forget - c_true||^2 + ||e_forget - noise||^2 |
| |
| The first term pushes embeddings away from the correct centroid. |
| The second term pulls embeddings toward meaningless random noise. |
| """ |
| forget_emb = model.get_embeddings(forget_splits) |
|
|
| |
| repulsion_loss = torch.tensor(0.0, device=DEVICE) |
| for i in range(len(forget_labels)): |
| c = forget_labels[i].item() |
| if c in centroids: |
| dist = (forget_emb[i] - centroids[c]).norm() |
| repulsion_loss = repulsion_loss - dist |
|
|
| repulsion_loss = repulsion_loss / max(len(forget_labels), 1) |
|
|
| |
| noise_target = torch.randn_like(forget_emb) |
| attraction_loss = (forget_emb - noise_target).norm(dim=1).mean() |
|
|
| return repulsion_loss + 0.5 * attraction_loss |
|
|
| def feature_sensitivity_loss(self, model, forget_splits): |
| """ |
| Lipschitz Feature Sensitivity Loss. |
| |
| Measures and minimizes the model's sensitivity to features in the |
| forget set via Monte Carlo perturbation. Extended from Ferrari to VFL. |
| |
| For each passive party's features: |
| S = E[||f(x) - f(x + δ)|| / ||δ||] where δ ~ N(0, σ²I) |
| |
| We minimize S to make the model "insensitive" to forget-set features. |
| """ |
| sensitivity = torch.tensor(0.0, device=DEVICE) |
|
|
| logits_orig, _ = model.forward(forget_splits) |
| probs_orig = F.softmax(logits_orig, dim=1) |
|
|
| for _ in range(self.n_mc_samples): |
| for party_idx in range(len(forget_splits)): |
| perturbed = [xs.clone() for xs in forget_splits] |
| noise = torch.randn_like(perturbed[party_idx]) * self.sigma |
| perturbed[party_idx] = perturbed[party_idx] + noise |
|
|
| logits_pert, _ = model.forward(perturbed) |
| probs_pert = F.softmax(logits_pert, dim=1) |
|
|
| diff = (probs_orig - probs_pert).norm(dim=1).mean() |
| sensitivity = sensitivity + diff |
|
|
| sensitivity = sensitivity / (self.n_mc_samples * len(forget_splits)) |
| return sensitivity |
|
|
| def anchor_loss(self, model, retain_splits, retain_labels, centroids): |
| """ |
| Anchor Loss. |
| |
| Ensures retain-set embeddings stay close to their class centroids |
| during unlearning. This prevents "catastrophic forgetting" of |
| the retain set while aggressively unlearning the forget set. |
| |
| L_anchor = E[||e_retain - c_class||^2] |
| """ |
| retain_emb = model.get_embeddings(retain_splits) |
|
|
| loss = torch.tensor(0.0, device=DEVICE) |
| for i in range(len(retain_labels)): |
| c = retain_labels[i].item() |
| if c in centroids: |
| loss = loss + (retain_emb[i] - centroids[c]).norm() ** 2 |
|
|
| return loss / max(len(retain_labels), 1) |
|
|
| def dual_variable_certification(self, model, forget_splits, forget_labels): |
| """ |
| Dual-Variable Certification. |
| |
| Primal-dual formulation that provides a convergence-based forgetting |
| guarantee. The constraint is: |
| |
| L_forget_CE ≥ τ (cross-entropy on forget set should be HIGH) |
| |
| We enforce this via: |
| Ω · max(0, τ - L_forget_CE) |
| |
| When the forget CE is below τ, this adds pressure to increase it. |
| When it's above τ, this term vanishes (constraint satisfied). |
| |
| Inspired by FedORA (arxiv:2512.23171). |
| """ |
| logits, _ = model.forward(forget_splits) |
| forget_ce = model.criterion(logits, forget_labels.to(DEVICE)) |
|
|
| |
| violation = F.relu(self.tau - forget_ce) |
| return self.omega * violation |
|
|
| def unlearn(self, model, X_train_splits, y_train, forget_indices, retain_indices, |
| num_classes=10): |
| """ |
| Execute UFUSC unlearning. |
| |
| Args: |
| model: trained VFLFramework |
| X_train_splits: list of K feature tensors |
| y_train: training labels |
| forget_indices: indices of forget set |
| retain_indices: indices of retain set |
| num_classes: number of classes |
| |
| Returns: |
| unlearned VFLFramework |
| """ |
| unlearned = model.clone() |
|
|
| forget_splits = [xs[forget_indices] for xs in X_train_splits] |
| forget_labels = y_train[forget_indices] |
| retain_splits = [xs[retain_indices] for xs in X_train_splits] |
| retain_labels = y_train[retain_indices] |
|
|
| |
| centroids = self.compute_class_centroids( |
| unlearned, [xs[retain_indices] for xs in X_train_splits], |
| retain_labels, num_classes |
| ) |
|
|
| all_params = [] |
| for pm in unlearned.passive_models: |
| all_params += list(pm.parameters()) |
| all_params += list(unlearned.active_model.parameters()) |
| optimizer = optim.Adam(all_params, lr=self.lr) |
|
|
| unlearned.set_train() |
| for epoch in range(self.epochs): |
| total_loss = torch.tensor(0.0, device=DEVICE) |
|
|
| |
| n_retain_batch = min(BATCH_SIZE, len(retain_labels)) |
| idx = torch.randperm(len(retain_labels))[:n_retain_batch] |
| retain_batch = [xs[idx] for xs in retain_splits] |
| retain_batch_y = retain_labels[idx].to(DEVICE) |
|
|
| logits_retain, _ = unlearned.forward(retain_batch) |
| loss_retain = unlearned.criterion(logits_retain, retain_batch_y) |
| total_loss = total_loss + loss_retain |
|
|
| |
| if self.mode in ["label_only", "joint"]: |
| cfl = self.contrastive_forgetting_loss( |
| unlearned, forget_splits, forget_labels, centroids, num_classes |
| ) |
| total_loss = total_loss + self.alpha * cfl |
|
|
| if self.mode in ["feature_only", "joint"]: |
| cfl_feat = self.contrastive_forgetting_loss( |
| unlearned, forget_splits, forget_labels, centroids, num_classes |
| ) |
| total_loss = total_loss + self.alpha * 0.5 * cfl_feat |
|
|
| |
| if self.mode in ["feature_only", "joint"]: |
| sens = self.feature_sensitivity_loss(unlearned, forget_splits) |
| total_loss = total_loss + self.beta * sens |
|
|
| |
| if self.mode in ["label_only", "joint"]: |
| anc = self.anchor_loss( |
| unlearned, retain_batch, retain_batch_y, centroids |
| ) |
| total_loss = total_loss + self.gamma * anc |
|
|
| |
| cert = self.dual_variable_certification( |
| unlearned, forget_splits, forget_labels |
| ) |
| total_loss = total_loss + cert |
|
|
| optimizer.zero_grad() |
| total_loss.backward() |
| |
| torch.nn.utils.clip_grad_norm_(all_params, max_norm=5.0) |
| optimizer.step() |
|
|
| return unlearned |
|
|
|
|
| |
| |
| |
|
|
| def run_single_experiment(dataset_name, num_parties=NUM_PASSIVE_PARTIES, verbose=True): |
| """ |
| Run complete experiment for one dataset. |
| |
| Steps: |
| 1. Load dataset |
| 2. Split features across K passive parties (VFL) |
| 3. Train VFL model |
| 4. Create forget/retain split |
| 5. Evaluate original model |
| 6. Run all 5 baselines |
| 7. Run 3 UFUSC variants |
| 8. Return all results |
| |
| Args: |
| dataset_name: "MNIST", "Fashion-MNIST", or "CIFAR-10" |
| num_parties: number of passive parties |
| verbose: print progress |
| |
| Returns: |
| list of result dicts |
| """ |
| set_seed() |
| print(f"\n{'='*70}") |
| print(f" EXPERIMENT: {dataset_name} (K={num_parties} parties)") |
| print(f"{'='*70}") |
|
|
| |
| print("\n[1/8] Loading dataset...") |
| X_train, y_train, X_test, y_test, num_classes, feature_dim = load_dataset(dataset_name) |
|
|
| |
| print("[2/8] Splitting features for VFL...") |
| X_train_splits = list(split_features_vfl(X_train, num_parties)) |
| X_test_splits = list(split_features_vfl(X_test, num_parties)) |
| feature_dims = [xs.shape[1] for xs in X_train_splits] |
| print(f" Party feature dims: {feature_dims}") |
|
|
| |
| print("[3/8] Training VFL model...") |
| model = VFLFramework(feature_dims, num_classes, num_parties=num_parties) |
| model.train_model(X_train_splits, y_train, X_test_splits, y_test, epochs=TRAIN_EPOCHS) |
|
|
| |
| print("[4/8] Creating forget/retain split...") |
| forget_class = 0 |
| forget_indices, retain_indices = create_forget_retain_split( |
| y_train, forget_class=forget_class, forget_ratio=FORGET_RATIO |
| ) |
| print(f" Forget set: {len(forget_indices)} samples (class {forget_class})") |
| print(f" Retain set: {len(retain_indices)} samples") |
|
|
| |
| print("[5/8] Evaluating original model...") |
| original_metrics = full_evaluation( |
| model, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices, forget_class |
| ) |
| original_metrics["method"] = "Original (No Unlearn)" |
| original_metrics["time_seconds"] = 0 |
| print(f" Original: {original_metrics}") |
|
|
| results = [original_metrics] |
|
|
| |
| baselines = [ |
| ("Gradient Ascent", GradientAscentUnlearning(epochs=5, lr=0.01)), |
| ("Fine-tuning", FineTuneUnlearning(epochs=10, lr=0.001)), |
| ("Fisher Forgetting", FisherForgetting(noise_scale=0.01)), |
| ("Manifold Mixup (P1)", ManifoldMixupUnlearning(epochs=10, lr=0.005)), |
| ("Ferrari (P2)", FerrariUnlearning(epochs=15, lr=0.005)), |
| ] |
|
|
| print("[6/8] Running baselines...") |
| for name, method in baselines: |
| print(f" Running {name}...") |
| t0 = time.time() |
| unlearned = method.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices) |
| elapsed = time.time() - t0 |
|
|
| metrics = full_evaluation( |
| unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices, forget_class |
| ) |
| metrics["method"] = name |
| metrics["time_seconds"] = round(elapsed, 2) |
| results.append(metrics) |
| print(f" {name}: Forget={metrics['forget_acc']:.1f}%, " |
| f"Retain={metrics['retain_acc']:.1f}%, MIA={metrics['mia_asr']:.1f}%") |
|
|
| |
| print("[7/8] Running UFUSC variants...") |
| ufusc_variants = [ |
| ("UFUSC (Label Only)", UFUSC(mode="label_only", epochs=UNLEARN_EPOCHS)), |
| ("UFUSC (Feature Only)", UFUSC(mode="feature_only", epochs=UNLEARN_EPOCHS)), |
| ("UFUSC (Joint)", UFUSC(mode="joint", epochs=UNLEARN_EPOCHS)), |
| ] |
|
|
| for name, method in ufusc_variants: |
| print(f" Running {name}...") |
| t0 = time.time() |
| unlearned = method.unlearn( |
| model, X_train_splits, y_train, forget_indices, retain_indices, |
| num_classes=num_classes |
| ) |
| elapsed = time.time() - t0 |
|
|
| metrics = full_evaluation( |
| unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices, forget_class |
| ) |
| metrics["method"] = name |
| metrics["time_seconds"] = round(elapsed, 2) |
| results.append(metrics) |
| print(f" {name}: Forget={metrics['forget_acc']:.1f}%, " |
| f"Retain={metrics['retain_acc']:.1f}%, MIA={metrics['mia_asr']:.1f}%") |
|
|
| |
| print(f"\n[8/8] {dataset_name} Summary:") |
| print(f" {'Method':<25} {'Test':>8} {'Forget':>8} {'Retain':>8} {'MIA':>8} {'Sens':>8}") |
| print(f" {'-'*73}") |
| for r in results: |
| print(f" {r['method']:<25} {r['test_acc']:>7.2f}% {r['forget_acc']:>7.2f}% " |
| f"{r['retain_acc']:>7.2f}% {r['mia_asr']:>7.1f}% {r['feature_sensitivity']:>7.3f}") |
|
|
| return results |
|
|
|
|
| |
| |
| |
|
|
| def run_ablation_study(dataset_name="MNIST"): |
| """ |
| Ablation study on UFUSC hyperparameters: α, β, γ, and unlearning epochs. |
| |
| Tests the impact of each component by varying one hyperparameter |
| while keeping others at their default values. |
| |
| Returns: |
| list of ablation result dicts |
| """ |
| set_seed() |
| print(f"\n{'='*70}") |
| print(f" ABLATION STUDY: {dataset_name}") |
| print(f"{'='*70}") |
|
|
| |
| X_train, y_train, X_test, y_test, num_classes, feature_dim = load_dataset(dataset_name) |
| X_train_splits = list(split_features_vfl(X_train)) |
| X_test_splits = list(split_features_vfl(X_test)) |
| feature_dims = [xs.shape[1] for xs in X_train_splits] |
|
|
| model = VFLFramework(feature_dims, num_classes) |
| model.train_model(X_train_splits, y_train, X_test_splits, y_test, epochs=TRAIN_EPOCHS, verbose=False) |
|
|
| forget_indices, retain_indices = create_forget_retain_split(y_train) |
|
|
| ablation_results = [] |
|
|
| |
| print("\n Ablation: α (CFL weight)") |
| for alpha_val in [0.0, 0.5, 1.0, 2.0, 5.0]: |
| method = UFUSC(mode="joint", alpha=alpha_val, beta=BETA, gamma=GAMMA, epochs=UNLEARN_EPOCHS) |
| unlearned = method.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices, num_classes) |
| metrics = full_evaluation(unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
| metrics["ablation_param"] = "alpha" |
| metrics["ablation_value"] = alpha_val |
| ablation_results.append(metrics) |
| print(f" α={alpha_val}: Forget={metrics['forget_acc']:.1f}%, Retain={metrics['retain_acc']:.1f}%") |
|
|
| |
| print("\n Ablation: β (Sensitivity weight)") |
| for beta_val in [0.0, 0.25, 0.5, 1.0, 2.0]: |
| method = UFUSC(mode="joint", alpha=ALPHA, beta=beta_val, gamma=GAMMA, epochs=UNLEARN_EPOCHS) |
| unlearned = method.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices, num_classes) |
| metrics = full_evaluation(unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
| metrics["ablation_param"] = "beta" |
| metrics["ablation_value"] = beta_val |
| ablation_results.append(metrics) |
| print(f" β={beta_val}: Forget={metrics['forget_acc']:.1f}%, Retain={metrics['retain_acc']:.1f}%") |
|
|
| |
| print("\n Ablation: γ (Anchor weight)") |
| for gamma_val in [0.0, 0.1, 0.3, 0.5, 1.0]: |
| method = UFUSC(mode="joint", alpha=ALPHA, beta=BETA, gamma=gamma_val, epochs=UNLEARN_EPOCHS) |
| unlearned = method.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices, num_classes) |
| metrics = full_evaluation(unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
| metrics["ablation_param"] = "gamma" |
| metrics["ablation_value"] = gamma_val |
| ablation_results.append(metrics) |
| print(f" γ={gamma_val}: Forget={metrics['forget_acc']:.1f}%, Retain={metrics['retain_acc']:.1f}%") |
|
|
| |
| print("\n Ablation: Unlearning epochs") |
| for ep in [1, 5, 10, 15, 20]: |
| method = UFUSC(mode="joint", alpha=ALPHA, beta=BETA, gamma=GAMMA, epochs=ep) |
| unlearned = method.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices, num_classes) |
| metrics = full_evaluation(unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
| metrics["ablation_param"] = "epochs" |
| metrics["ablation_value"] = ep |
| ablation_results.append(metrics) |
| print(f" epochs={ep}: Forget={metrics['forget_acc']:.1f}%, Retain={metrics['retain_acc']:.1f}%") |
|
|
| return ablation_results |
|
|
|
|
| |
| |
| |
|
|
| def run_scalability_analysis(dataset_name="MNIST"): |
| """ |
| Scalability analysis: test UFUSC with varying number of passive parties K. |
| |
| Tests K = 2, 3, 4, 6 to see how the method scales in VFL settings |
| with different numbers of data holders. |
| |
| Returns: |
| list of scalability result dicts |
| """ |
| set_seed() |
| print(f"\n{'='*70}") |
| print(f" SCALABILITY ANALYSIS: {dataset_name}") |
| print(f"{'='*70}") |
|
|
| X_train, y_train, X_test, y_test, num_classes, feature_dim = load_dataset(dataset_name) |
|
|
| scalability_results = [] |
|
|
| for K in [2, 3, 4, 6]: |
| print(f"\n K={K} parties...") |
| X_train_splits = list(split_features_vfl(X_train, K)) |
| X_test_splits = list(split_features_vfl(X_test, K)) |
| feature_dims = [xs.shape[1] for xs in X_train_splits] |
|
|
| model = VFLFramework(feature_dims, num_classes, num_parties=K) |
| model.train_model(X_train_splits, y_train, X_test_splits, y_test, |
| epochs=TRAIN_EPOCHS, verbose=False) |
|
|
| forget_indices, retain_indices = create_forget_retain_split(y_train) |
|
|
| |
| orig_metrics = full_evaluation(model, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
|
|
| |
| ufusc = UFUSC(mode="joint", epochs=UNLEARN_EPOCHS) |
| t0 = time.time() |
| unlearned = ufusc.unlearn(model, X_train_splits, y_train, forget_indices, retain_indices, num_classes) |
| elapsed = time.time() - t0 |
|
|
| ufusc_metrics = full_evaluation(unlearned, X_train_splits, y_train, X_test_splits, y_test, |
| forget_indices, retain_indices) |
|
|
| result = { |
| "K": K, |
| "original_test_acc": orig_metrics["test_acc"], |
| "original_forget_acc": orig_metrics["forget_acc"], |
| "ufusc_test_acc": ufusc_metrics["test_acc"], |
| "ufusc_forget_acc": ufusc_metrics["forget_acc"], |
| "ufusc_retain_acc": ufusc_metrics["retain_acc"], |
| "ufusc_mia_asr": ufusc_metrics["mia_asr"], |
| "time_seconds": round(elapsed, 2) |
| } |
| scalability_results.append(result) |
| print(f" K={K}: Original Test={orig_metrics['test_acc']:.1f}%, " |
| f"UFUSC Forget={ufusc_metrics['forget_acc']:.1f}%, " |
| f"Retain={ufusc_metrics['retain_acc']:.1f}%, Time={elapsed:.1f}s") |
|
|
| return scalability_results |
|
|
|
|
| |
| |
| |
|
|
| def create_visualizations(all_results, ablation_results=None, scalability_results=None): |
| """ |
| Create all publication-quality figures. |
| |
| Generates: |
| - Comparison bar charts (1 per dataset) |
| - Radar plots (1 per dataset) |
| - Ablation study plot |
| - Scalability analysis plot |
| - Privacy-utility tradeoff plots (1 per dataset) |
| """ |
| try: |
| import matplotlib |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| sns.set_theme(style="whitegrid") |
| except ImportError: |
| print("WARNING: matplotlib/seaborn not available. Skipping visualization.") |
| return |
|
|
| colors = { |
| "Original (No Unlearn)": "#95a5a6", |
| "Gradient Ascent": "#e74c3c", |
| "Fine-tuning": "#e67e22", |
| "Fisher Forgetting": "#f39c12", |
| "Manifold Mixup (P1)": "#27ae60", |
| "Ferrari (P2)": "#2980b9", |
| "UFUSC (Label Only)": "#8e44ad", |
| "UFUSC (Feature Only)": "#1abc9c", |
| "UFUSC (Joint)": "#c0392b", |
| } |
|
|
| |
| for dataset_name, results in all_results.items(): |
| fig, axes = plt.subplots(1, 3, figsize=(18, 6)) |
| fig.suptitle(f"{dataset_name} — Unlearning Method Comparison", fontsize=16, fontweight='bold') |
|
|
| methods = [r["method"] for r in results] |
| method_colors = [colors.get(m, "#333333") for m in methods] |
|
|
| |
| vals = [r["forget_acc"] for r in results] |
| axes[0].barh(methods, vals, color=method_colors) |
| axes[0].set_xlabel("Forget Accuracy (%) ↓") |
| axes[0].set_title("Forgetting Quality") |
| axes[0].invert_yaxis() |
|
|
| |
| vals = [r["retain_acc"] for r in results] |
| axes[1].barh(methods, vals, color=method_colors) |
| axes[1].set_xlabel("Retain Accuracy (%) ↑") |
| axes[1].set_title("Utility Preservation") |
| axes[1].invert_yaxis() |
|
|
| |
| vals = [r["mia_asr"] for r in results] |
| axes[2].barh(methods, vals, color=method_colors) |
| axes[2].set_xlabel("MIA ASR (%) ↓") |
| axes[2].set_title("Privacy Protection") |
| axes[2].axvline(x=50, color='red', linestyle='--', alpha=0.5, label='Random (50%)') |
| axes[2].invert_yaxis() |
| axes[2].legend() |
|
|
| plt.tight_layout() |
| plt.savefig(f"figures/{dataset_name.replace('-', '_')}_comparison.png", dpi=150, bbox_inches='tight') |
| plt.close() |
| print(f" Saved: figures/{dataset_name.replace('-', '_')}_comparison.png") |
|
|
| |
| for dataset_name, results in all_results.items(): |
| |
| key_methods = ["Gradient Ascent", "Manifold Mixup (P1)", "Ferrari (P2)", "UFUSC (Joint)"] |
| key_results = [r for r in results if r["method"] in key_methods] |
|
|
| if len(key_results) < 2: |
| continue |
|
|
| categories = ["Retain Acc", "1 - Forget Acc", "1 - MIA ASR", "Low Sensitivity"] |
| N = len(categories) |
| angles = [n / float(N) * 2 * np.pi for n in range(N)] |
| angles += angles[:1] |
|
|
| fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True)) |
| ax.set_title(f"{dataset_name} — Method Radar Comparison", fontsize=14, fontweight='bold', pad=20) |
|
|
| for r in key_results: |
| values = [ |
| r["retain_acc"] / 100, |
| (100 - r["forget_acc"]) / 100, |
| (100 - r["mia_asr"]) / 100, |
| max(0, 1 - r["feature_sensitivity"]), |
| ] |
| values += values[:1] |
| color = colors.get(r["method"], "#333333") |
| ax.plot(angles, values, 'o-', linewidth=2, label=r["method"], color=color) |
| ax.fill(angles, values, alpha=0.1, color=color) |
|
|
| ax.set_xticks(angles[:-1]) |
| ax.set_xticklabels(categories) |
| ax.set_ylim(0, 1) |
| ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1)) |
|
|
| plt.tight_layout() |
| plt.savefig(f"figures/{dataset_name.replace('-', '_')}_radar.png", dpi=150, bbox_inches='tight') |
| plt.close() |
| print(f" Saved: figures/{dataset_name.replace('-', '_')}_radar.png") |
|
|
| |
| if ablation_results: |
| fig, axes = plt.subplots(2, 2, figsize=(14, 10)) |
| fig.suptitle("UFUSC Ablation Study (MNIST)", fontsize=16, fontweight='bold') |
|
|
| params = {"alpha": "α (CFL weight)", "beta": "β (Sensitivity weight)", |
| "gamma": "γ (Anchor weight)", "epochs": "Unlearning Epochs"} |
|
|
| for idx, (param_key, param_label) in enumerate(params.items()): |
| ax = axes[idx // 2][idx % 2] |
| param_results = [r for r in ablation_results if r["ablation_param"] == param_key] |
|
|
| if not param_results: |
| continue |
|
|
| x_vals = [r["ablation_value"] for r in param_results] |
| forget_vals = [r["forget_acc"] for r in param_results] |
| retain_vals = [r["retain_acc"] for r in param_results] |
|
|
| ax.plot(x_vals, forget_vals, 's-', color='#e74c3c', label='Forget Acc ↓', linewidth=2, markersize=8) |
| ax.plot(x_vals, retain_vals, 'o-', color='#2980b9', label='Retain Acc ↑', linewidth=2, markersize=8) |
| ax.set_xlabel(param_label) |
| ax.set_ylabel("Accuracy (%)") |
| ax.set_title(f"Effect of {param_label}") |
| ax.legend() |
| ax.grid(True, alpha=0.3) |
|
|
| plt.tight_layout() |
| plt.savefig("figures/ablation_study.png", dpi=150, bbox_inches='tight') |
| plt.close() |
| print(" Saved: figures/ablation_study.png") |
|
|
| |
| if scalability_results: |
| fig, axes = plt.subplots(1, 2, figsize=(14, 5)) |
| fig.suptitle("UFUSC Scalability Analysis (Varying K)", fontsize=14, fontweight='bold') |
|
|
| ks = [r["K"] for r in scalability_results] |
|
|
| |
| axes[0].plot(ks, [r["ufusc_forget_acc"] for r in scalability_results], |
| 's-', color='#e74c3c', label='Forget Acc ↓', linewidth=2, markersize=8) |
| axes[0].plot(ks, [r["ufusc_retain_acc"] for r in scalability_results], |
| 'o-', color='#2980b9', label='Retain Acc ↑', linewidth=2, markersize=8) |
| axes[0].plot(ks, [r["ufusc_mia_asr"] for r in scalability_results], |
| '^-', color='#27ae60', label='MIA ASR ↓', linewidth=2, markersize=8) |
| axes[0].set_xlabel("Number of Passive Parties (K)") |
| axes[0].set_ylabel("Metric (%)") |
| axes[0].set_title("Metrics vs K") |
| axes[0].legend() |
| axes[0].set_xticks(ks) |
|
|
| |
| axes[1].bar(ks, [r["time_seconds"] for r in scalability_results], |
| color='#8e44ad', alpha=0.7) |
| axes[1].set_xlabel("Number of Passive Parties (K)") |
| axes[1].set_ylabel("Time (seconds)") |
| axes[1].set_title("Unlearning Time vs K") |
| axes[1].set_xticks(ks) |
|
|
| plt.tight_layout() |
| plt.savefig("figures/scalability_analysis.png", dpi=150, bbox_inches='tight') |
| plt.close() |
| print(" Saved: figures/scalability_analysis.png") |
|
|
| |
| for dataset_name, results in all_results.items(): |
| fig, ax = plt.subplots(figsize=(10, 7)) |
| ax.set_title(f"{dataset_name} — Privacy-Utility Tradeoff", fontsize=14, fontweight='bold') |
|
|
| for r in results: |
| if r["method"] == "Original (No Unlearn)": |
| continue |
| color = colors.get(r["method"], "#333333") |
| marker = 'D' if 'UFUSC' in r["method"] else 'o' |
| size = 200 if 'UFUSC' in r["method"] else 100 |
| ax.scatter(r["retain_acc"], 100 - r["mia_asr"], |
| c=color, s=size, marker=marker, |
| label=r["method"], edgecolors='black', linewidth=0.5, zorder=5) |
|
|
| ax.set_xlabel("Retain Accuracy (%) ↑ — Utility", fontsize=12) |
| ax.set_ylabel("Privacy Protection (100 - MIA ASR) ↑", fontsize=12) |
| ax.legend(fontsize=9, loc='best') |
| ax.grid(True, alpha=0.3) |
|
|
| |
| ax.annotate("← Better Privacy & Utility →", |
| xy=(0.5, 0.02), xycoords='axes fraction', |
| fontsize=10, ha='center', alpha=0.5, style='italic') |
|
|
| plt.tight_layout() |
| plt.savefig(f"figures/{dataset_name.replace('-', '_')}_tradeoff.png", dpi=150, bbox_inches='tight') |
| plt.close() |
| print(f" Saved: figures/{dataset_name.replace('-', '_')}_tradeoff.png") |
|
|
|
|
| |
| |
| |
|
|
| def main(): |
| """ |
| Full experimental pipeline: |
| 1. Run experiments on MNIST, Fashion-MNIST, CIFAR-10 |
| 2. Run ablation study on MNIST |
| 3. Run scalability analysis on MNIST |
| 4. Generate all visualizations |
| 5. Save results to JSON |
| """ |
| print("=" * 70) |
| print(" UFUSC: Unified Federated Unlearning via") |
| print(" Sensitivity-Guided Contrastive Forgetting") |
| print("=" * 70) |
| print(f" Device: {DEVICE}") |
| print(f" Seed: {SEED}") |
| print(f" VFL Parties: {NUM_PASSIVE_PARTIES}") |
| print(f" Batch Size: {BATCH_SIZE}") |
| print(f" Train Epochs: {TRAIN_EPOCHS}") |
| print(f" Unlearn Epochs: {UNLEARN_EPOCHS}") |
| print(f" Forget Ratio: {FORGET_RATIO}") |
| print(f" UFUSC params: α={ALPHA}, β={BETA}, γ={GAMMA}, Ω={OMEGA}, τ={TAU}") |
| print() |
|
|
| |
| all_results = {} |
| for dataset_name in ["MNIST", "Fashion-MNIST", "CIFAR-10"]: |
| results = run_single_experiment(dataset_name) |
| all_results[dataset_name] = results |
|
|
| |
| with open("results/all_results.json", "w") as f: |
| json.dump(all_results, f, indent=2) |
| print("\n✓ Saved: results/all_results.json") |
|
|
| |
| ablation_results = run_ablation_study("MNIST") |
| with open("results/ablation_results.json", "w") as f: |
| json.dump(ablation_results, f, indent=2) |
| print("✓ Saved: results/ablation_results.json") |
|
|
| |
| scalability_results = run_scalability_analysis("MNIST") |
| with open("results/scalability_results.json", "w") as f: |
| json.dump(scalability_results, f, indent=2) |
| print("✓ Saved: results/scalability_results.json") |
|
|
| |
| print("\n" + "=" * 70) |
| print(" GENERATING VISUALIZATIONS") |
| print("=" * 70) |
| create_visualizations(all_results, ablation_results, scalability_results) |
|
|
| |
| print("\n" + "=" * 70) |
| print(" FINAL SUMMARY") |
| print("=" * 70) |
|
|
| for dataset_name, results in all_results.items(): |
| joint = next((r for r in results if r["method"] == "UFUSC (Joint)"), None) |
| if joint: |
| print(f"\n {dataset_name}:") |
| print(f" UFUSC-Joint → Retain: {joint['retain_acc']:.1f}%, " |
| f"Forget: {joint['forget_acc']:.1f}%, MIA: {joint['mia_asr']:.1f}%") |
|
|
| print("\n All experiments complete!") |
| print(f" Results: results/all_results.json") |
| print(f" Ablation: results/ablation_results.json") |
| print(f" Scalability: results/scalability_results.json") |
| print(f" Figures: figures/*.png") |
| print("=" * 70) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|