Saracasm
Phase 4: Stage 2 fine-tuning — 0.69% dev EER (93% error reduction vs Stage 1)
4fd0c50
"""
Training loop for Wav2Vec-based deepfake audio detection.
Supports both Stage 1 (frozen backbone, simple) and Stage 2 (fine-tuning,
with mixed precision + warmup scheduler).
"""
import os
import time
from dataclasses import dataclass, field
from typing import Optional, Callable, List
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
from src.evaluation.metrics import (
compute_eer,
compute_auc,
aggregate_window_scores_to_utterance,
)
@dataclass
class TrainConfig:
"""Hyperparameters for one training stage."""
learning_rate: float = 1e-3
batch_size: int = 32
epochs: int = 5
weight_decay: float = 0.01
grad_clip: float = 1.0
early_stopping_patience: int = 3
checkpoint_dir: str = "/content/drive/MyDrive/deepfake_audio/checkpoints"
checkpoint_name: str = "stage1_best.pt"
log_every_n_steps: int = 20
use_wandb: bool = True
wandb_project: str = "deepfake-audio-detection"
wandb_run_name: Optional[str] = None
class_weights: Optional[List[float]] = None
# Stage 2 additions
use_mixed_precision: bool = False
warmup_ratio: float = 0.0 # fraction of total steps used for LR warmup
use_lr_scheduler: bool = False # set True with warmup_ratio > 0
def make_loss_fn(class_weights: Optional[List[float]], device: str) -> Callable:
if class_weights is not None:
weights = torch.tensor(class_weights, dtype=torch.float32, device=device)
return nn.CrossEntropyLoss(weight=weights)
return nn.CrossEntropyLoss()
def make_lr_scheduler(optimizer, total_steps: int, warmup_ratio: float):
"""Linear warmup followed by linear decay to zero."""
warmup_steps = int(total_steps * warmup_ratio)
def lr_lambda(step):
if step < warmup_steps:
return step / max(1, warmup_steps)
progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
return max(0.0, 1.0 - progress)
return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
@torch.no_grad()
def evaluate(
model: nn.Module,
dev_loader: DataLoader,
device: str,
desc: str = "Eval",
use_mixed_precision: bool = False,
) -> dict:
"""Run inference over the dev set and compute per-utterance metrics."""
model.eval()
all_window_scores, all_window_labels, all_window_utts = [], [], []
total_loss, n_batches = 0.0, 0
loss_fn = nn.CrossEntropyLoss()
autocast_ctx = torch.amp.autocast(device_type="cuda", enabled=use_mixed_precision)
for waveforms, labels, utt_ids in tqdm(dev_loader, desc=desc, leave=False):
waveforms = waveforms.to(device, non_blocking=True)
labels_gpu = labels.to(device, non_blocking=True)
with autocast_ctx:
logits = model(waveforms)
loss = loss_fn(logits, labels_gpu)
total_loss += loss.item()
n_batches += 1
probs = torch.softmax(logits.float(), dim=-1)
spoof_probs = probs[:, 1].detach().cpu().numpy()
all_window_scores.extend(spoof_probs.tolist())
all_window_labels.extend(labels.tolist())
all_window_utts.extend(list(utt_ids))
utt_scores, utt_ids_sorted = aggregate_window_scores_to_utterance(
np.array(all_window_scores), all_window_utts, method="mean",
)
label_map = {}
for s, l, u in zip(all_window_scores, all_window_labels, all_window_utts):
if u not in label_map:
label_map[u] = l
utt_labels = np.array([label_map[u] for u in utt_ids_sorted])
eer, threshold = compute_eer(utt_scores, utt_labels)
auc = compute_auc(utt_scores, utt_labels)
preds = (utt_scores > threshold).astype(int)
accuracy = float((preds == utt_labels).mean())
return {
"eer": eer, "auc": auc, "accuracy": accuracy,
"threshold": float(threshold),
"loss": total_loss / max(n_batches, 1),
"n_utterances": len(utt_ids_sorted),
}
def train(
model: nn.Module,
train_loader: DataLoader,
dev_loader: DataLoader,
config: TrainConfig,
device: str = "cuda",
) -> dict:
"""Train the model for `config.epochs` epochs, evaluating each epoch."""
wandb = None
if config.use_wandb:
import wandb as _wandb
run = _wandb.init(
project=config.wandb_project,
name=config.wandb_run_name,
config={
"learning_rate": config.learning_rate,
"batch_size": config.batch_size,
"epochs": config.epochs,
"weight_decay": config.weight_decay,
"use_mixed_precision": config.use_mixed_precision,
"warmup_ratio": config.warmup_ratio,
"trainable_params": sum(p.numel() for p in model.parameters() if p.requires_grad),
},
settings=_wandb.Settings(init_timeout=180),
)
wandb = _wandb
trainable = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(
trainable,
lr=config.learning_rate,
weight_decay=config.weight_decay,
)
# Optional LR scheduler with warmup
scheduler = None
if config.use_lr_scheduler and config.warmup_ratio > 0:
total_steps = len(train_loader) * config.epochs
scheduler = make_lr_scheduler(optimizer, total_steps, config.warmup_ratio)
# Mixed precision setup
scaler = torch.amp.GradScaler("cuda", enabled=config.use_mixed_precision)
autocast_ctx_factory = lambda: torch.amp.autocast(
device_type="cuda", enabled=config.use_mixed_precision
)
loss_fn = make_loss_fn(config.class_weights, device)
os.makedirs(config.checkpoint_dir, exist_ok=True)
checkpoint_path = os.path.join(config.checkpoint_dir, config.checkpoint_name)
history = {"train_loss": [], "dev_eer": [], "dev_auc": [], "dev_accuracy": []}
best_eer = float("inf")
epochs_without_improvement = 0
global_step = 0
for epoch in range(config.epochs):
model.train()
epoch_start = time.time()
epoch_losses = []
pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{config.epochs}")
for waveforms, labels, utt_ids in pbar:
waveforms = waveforms.to(device, non_blocking=True)
labels = labels.to(device, non_blocking=True)
optimizer.zero_grad()
with autocast_ctx_factory():
logits = model(waveforms)
loss = loss_fn(logits, labels)
if config.use_mixed_precision:
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(trainable, config.grad_clip)
scaler.step(optimizer)
scaler.update()
else:
loss.backward()
torch.nn.utils.clip_grad_norm_(trainable, config.grad_clip)
optimizer.step()
if scheduler is not None:
scheduler.step()
epoch_losses.append(loss.item())
global_step += 1
if wandb is not None and global_step % config.log_every_n_steps == 0:
log_data = {
"train/step_loss": loss.item(),
"train/global_step": global_step,
"train/lr": optimizer.param_groups[0]["lr"],
}
wandb.log(log_data)
pbar.set_postfix(loss=f"{loss.item():.4f}",
lr=f"{optimizer.param_groups[0]['lr']:.2e}")
train_loss = float(np.mean(epoch_losses))
dev_metrics = evaluate(
model, dev_loader, device,
desc=f"Epoch {epoch+1} dev",
use_mixed_precision=config.use_mixed_precision,
)
epoch_time = time.time() - epoch_start
history["train_loss"].append(train_loss)
history["dev_eer"].append(dev_metrics["eer"])
history["dev_auc"].append(dev_metrics["auc"])
history["dev_accuracy"].append(dev_metrics["accuracy"])
print(f"\nEpoch {epoch+1}/{config.epochs} ({epoch_time:.0f}s)")
print(f" train_loss: {train_loss:.4f}")
print(f" dev_eer: {dev_metrics['eer']*100:.2f}%")
print(f" dev_auc: {dev_metrics['auc']:.4f}")
print(f" dev_accuracy: {dev_metrics['accuracy']*100:.2f}%")
if wandb is not None:
wandb.log({
"epoch": epoch + 1,
"train/epoch_loss": train_loss,
"dev/eer": dev_metrics["eer"],
"dev/auc": dev_metrics["auc"],
"dev/accuracy": dev_metrics["accuracy"],
"dev/loss": dev_metrics["loss"],
})
if dev_metrics["eer"] < best_eer:
best_eer = dev_metrics["eer"]
epochs_without_improvement = 0
torch.save({
"epoch": epoch + 1,
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict() if scheduler else None,
"best_eer": best_eer,
"config": vars(config),
}, checkpoint_path)
print(f" → Saved best checkpoint (EER={best_eer*100:.2f}%)")
else:
epochs_without_improvement += 1
print(f" No improvement for {epochs_without_improvement} epoch(s)")
if epochs_without_improvement >= config.early_stopping_patience:
print(f"\nEarly stopping after {epoch+1} epochs.")
break
if wandb is not None:
wandb.summary["best_dev_eer"] = best_eer
wandb.finish()
return {"history": history, "best_eer": best_eer, "checkpoint_path": checkpoint_path}