# BaseChange / train.py
# Author: Vedant Jigarbhai Mehta
# Commit: Implement full training loop and visualization utilities (0cbf4d6)
"""Main training script for change detection models.
Supports mixed-precision training, gradient accumulation, gradient clipping,
early stopping on validation F1, checkpoint saving (best + last) to Google
Drive or local disk, and full resume from checkpoint after Colab disconnects.
Usage:
python train.py --config configs/config.yaml --model unet_pp
python train.py --config configs/config.yaml --model changeformer \
--resume /content/drive/MyDrive/change-detection/checkpoints/changeformer_last.pth
"""
import argparse
import logging
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import numpy as np
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler, autocast
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR, SequentialLR
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import yaml
from data.dataset import ChangeDetectionDataset
from models import get_model
from utils.losses import get_loss
from utils.metrics import MetricTracker
from utils.visualization import log_predictions_to_tensorboard
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Reproducibility
# ---------------------------------------------------------------------------
def set_seed(seed: int) -> None:
    """Seed every RNG used during training for reproducible runs.

    Covers the ``random`` module, NumPy, and PyTorch (CPU plus all CUDA
    devices), and switches cuDNN into deterministic mode.

    Args:
        seed: Random seed value.
    """
    # Seed each framework's generator with the same value.
    for seeder in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        seeder(seed)
    # Trade cuDNN autotuning speed for run-to-run determinism.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
# ---------------------------------------------------------------------------
# GPU / config helpers
# ---------------------------------------------------------------------------
def detect_gpu_type() -> str:
    """Identify the attached GPU so per-GPU settings can be selected.

    Returns:
        One of ``'T4'``, ``'V100'``, or ``'default'``.
    """
    if not torch.cuda.is_available():
        return "default"
    device_name = torch.cuda.get_device_name(0).upper()
    # Check the known accelerator tags in priority order.
    for tag in ("T4", "V100"):
        if tag in device_name:
            return tag
    return "default"
def get_batch_size(config: Dict[str, Any], model_name: str) -> int:
    """Resolve the batch size for the current GPU + model combination.

    Falls back to the model's ``default`` entry, then to 4, when the
    detected GPU has no dedicated entry.

    Args:
        config: Full project config dict.
        model_name: Model identifier string.

    Returns:
        Batch size as an integer.
    """
    per_model = config.get("batch_sizes", {}).get(model_name, {})
    fallback = per_model.get("default", 4)
    return per_model.get(detect_gpu_type(), fallback)
def get_learning_rate(config: Dict[str, Any], model_name: str) -> float:
    """Resolve the learning rate for *model_name*.

    Falls back to the global ``training.learning_rate`` when the model has
    no entry in the ``learning_rates`` table.

    Args:
        config: Full project config dict.
        model_name: Model identifier string.

    Returns:
        Learning rate as a float.
    """
    per_model = config.get("learning_rates", {})
    global_default = config["training"]["learning_rate"]
    return per_model.get(model_name, global_default)
def get_num_epochs(config: Dict[str, Any], model_name: str) -> int:
    """Resolve the epoch budget for *model_name*.

    Falls back to the global ``training.epochs`` when the model has no
    entry in the ``epoch_counts`` table.

    Args:
        config: Full project config dict.
        model_name: Model identifier string.

    Returns:
        Number of epochs as an integer.
    """
    per_model = config.get("epoch_counts", {})
    global_default = config["training"]["epochs"]
    return per_model.get(model_name, global_default)
def resolve_paths(config: Dict[str, Any]) -> Dict[str, Path]:
    """Build a path dict based on whether Colab mode is enabled.

    When ``config["colab"]["enabled"]`` is ``True`` all persistent artefacts
    point to Google Drive; otherwise they use the local ``paths`` section.

    Args:
        config: Full project config dict.

    Returns:
        Dict with keys ``'data'``, ``'checkpoints'``, ``'logs'``,
        ``'outputs'``.
    """
    colab_cfg = config.get("colab", {})
    if colab_cfg.get("enabled", False):
        # Colab mode: every artefact lives under the configured Drive dirs.
        drive_keys = {
            "data": "data_dir",
            "checkpoints": "checkpoint_dir",
            "logs": "log_dir",
            "outputs": "output_dir",
        }
        return {name: Path(colab_cfg[key]) for name, key in drive_keys.items()}
    # Local mode: read the ``paths`` section, with per-key defaults.
    local_cfg = config.get("paths", {})
    local_defaults = {
        "data": ("processed_data", "./processed_data"),
        "checkpoints": ("checkpoint_dir", "./checkpoints"),
        "logs": ("log_dir", "./logs"),
        "outputs": ("output_dir", "./outputs"),
    }
    return {
        name: Path(local_cfg.get(key, default))
        for name, (key, default) in local_defaults.items()
    }
# ---------------------------------------------------------------------------
# Data
# ---------------------------------------------------------------------------
def build_dataloaders(
    config: Dict[str, Any],
    data_dir: Path,
    batch_size: int,
) -> Tuple[DataLoader, DataLoader]:
    """Create training and validation ``DataLoader`` instances.

    Args:
        config: Full project config dict.
        data_dir: Root of the processed dataset (contains ``train/``, ``val/``).
        batch_size: Mini-batch size.

    Returns:
        Tuple of ``(train_loader, val_loader)``.
    """
    loader_cfg = config.get("dataset", {})
    # Settings shared by both loaders.
    common = {
        "batch_size": batch_size,
        "num_workers": loader_cfg.get("num_workers", 4),
        "pin_memory": loader_cfg.get("pin_memory", True),
    }
    train_loader = DataLoader(
        ChangeDetectionDataset(root=data_dir / "train", split="train", config=config),
        shuffle=True,
        drop_last=True,  # drop the ragged final batch during training
        **common,
    )
    val_loader = DataLoader(
        ChangeDetectionDataset(root=data_dir / "val", split="val", config=config),
        shuffle=False,
        **common,
    )
    return train_loader, val_loader
# ---------------------------------------------------------------------------
# Scheduler with linear warmup
# ---------------------------------------------------------------------------
def build_scheduler(
    optimizer: torch.optim.Optimizer,
    total_epochs: int,
    warmup_epochs: int,
) -> torch.optim.lr_scheduler._LRScheduler:
    """Create a CosineAnnealingLR scheduler preceded by linear warmup.

    During the first ``warmup_epochs`` the LR ramps linearly from
    ``start_factor`` to the base LR, then cosine-decays for the remainder.

    Args:
        optimizer: Optimizer whose LR groups will be scheduled.
        total_epochs: Total number of training epochs.
        warmup_epochs: Number of warmup epochs (0 to disable).

    Returns:
        A learning-rate scheduler instance.
    """
    # Warmup only makes sense when it is strictly inside the epoch budget.
    if not (0 < warmup_epochs < total_epochs):
        return CosineAnnealingLR(optimizer, T_max=total_epochs)
    warmup_phase = LinearLR(
        optimizer,
        start_factor=0.01,
        end_factor=1.0,
        total_iters=warmup_epochs,
    )
    decay_phase = CosineAnnealingLR(
        optimizer,
        T_max=total_epochs - warmup_epochs,
    )
    return SequentialLR(
        optimizer,
        schedulers=[warmup_phase, decay_phase],
        milestones=[warmup_epochs],
    )
# ---------------------------------------------------------------------------
# Checkpointing
# ---------------------------------------------------------------------------
def save_checkpoint(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    scheduler: torch.optim.lr_scheduler._LRScheduler,
    scaler: GradScaler,
    epoch: int,
    best_f1: float,
    best_epoch: int,
    save_path: Path,
) -> None:
    """Persist a full training checkpoint to disk.

    Args:
        model: Model whose weights to save.
        optimizer: Optimizer state to save.
        scheduler: LR scheduler state to save.
        scaler: ``GradScaler`` state to save.
        epoch: Epoch number just completed (1-indexed).
        best_f1: Best validation F1 achieved so far.
        best_epoch: Epoch that achieved ``best_f1``.
        save_path: Destination file path.
    """
    # Bundle everything needed for an exact training resume.
    state = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "scaler_state_dict": scaler.state_dict(),
        "best_f1": best_f1,
        "best_epoch": best_epoch,
    }
    save_path.parent.mkdir(parents=True, exist_ok=True)
    torch.save(state, save_path)
    logger.info("Checkpoint saved → %s", save_path)
def load_checkpoint(
    path: Path,
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    scheduler: torch.optim.lr_scheduler._LRScheduler,
    scaler: GradScaler,
    device: torch.device,
) -> Tuple[int, float, int]:
    """Restore training state from a checkpoint.

    Args:
        path: Checkpoint file to load.
        model: Model to receive saved weights.
        optimizer: Optimizer to receive saved state.
        scheduler: Scheduler to receive saved state.
        scaler: ``GradScaler`` to receive saved state.
        device: Target device for ``map_location``.

    Returns:
        Tuple of ``(start_epoch, best_f1, best_epoch)``.
    """
    ckpt = torch.load(path, map_location=device)
    # Restore each stateful component from its saved dict.
    for key, component in (
        ("model_state_dict", model),
        ("optimizer_state_dict", optimizer),
        ("scheduler_state_dict", scheduler),
        ("scaler_state_dict", scaler),
    ):
        component.load_state_dict(ckpt[key])
    best_f1 = ckpt["best_f1"]
    # Older checkpoints may lack "best_epoch"; fall back to the saved epoch.
    best_epoch = ckpt.get("best_epoch", ckpt["epoch"])
    logger.info(
        "Resumed from epoch %d (best F1: %.4f @ epoch %d)",
        ckpt["epoch"], best_f1, best_epoch,
    )
    return ckpt["epoch"], best_f1, best_epoch
# ---------------------------------------------------------------------------
# Train / validate one epoch
# ---------------------------------------------------------------------------
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    scaler: GradScaler,
    device: torch.device,
    tracker: MetricTracker,
    accum_steps: int,
    grad_clip: float,
) -> Tuple[float, Dict[str, float]]:
    """Execute one full training epoch.

    Uses mixed precision (``autocast`` + ``scaler``) with gradient
    accumulation: the loss is divided by ``accum_steps`` and the optimizer
    steps once every ``accum_steps`` micro-batches. Trailing micro-batches
    that do not complete a full accumulation window are flushed with one
    final optimizer step at the end of the epoch.

    Args:
        model: Change-detection model.
        loader: Training ``DataLoader``.
        criterion: Loss module (operates on raw logits).
        optimizer: Optimiser instance.
        scaler: ``GradScaler`` for mixed-precision training.
        device: Target CUDA / CPU device.
        tracker: ``MetricTracker`` (reset externally before this call).
        accum_steps: Number of gradient-accumulation micro-steps.
        grad_clip: Maximum gradient norm for clipping.

    Returns:
        Tuple of ``(average_loss, metrics_dict)``.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    optimizer.zero_grad(set_to_none=True)

    def _optimizer_step() -> None:
        # Unscale first so clipping operates on true (not loss-scaled) grads.
        scaler.unscale_(optimizer)
        nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad(set_to_none=True)

    pbar = tqdm(loader, desc=" Train", leave=False, dynamic_ncols=True)
    for step, batch in enumerate(pbar):
        img_a = batch["A"].to(device, non_blocking=True)
        img_b = batch["B"].to(device, non_blocking=True)
        mask = batch["mask"].to(device, non_blocking=True)
        with autocast():
            logits = model(img_a, img_b)
            loss = criterion(logits, mask) / accum_steps
        scaler.scale(loss).backward()
        if (step + 1) % accum_steps == 0:
            _optimizer_step()
        # Track loss (undo the accumulation scaling for logging)
        running_loss += loss.item() * accum_steps
        num_batches += 1
        # Track metrics (MetricTracker handles sigmoid + threshold internally)
        tracker.update(logits.detach(), mask)
        pbar.set_postfix(loss=f"{running_loss / num_batches:.4f}")
    # BUGFIX: when len(loader) is not a multiple of accum_steps the trailing
    # micro-batches' gradients were previously never applied — they were
    # silently discarded by the zero_grad at the start of the next epoch.
    # Flush them here so every processed batch contributes to an update.
    if num_batches % accum_steps != 0:
        _optimizer_step()
    avg_loss = running_loss / max(num_batches, 1)
    metrics = tracker.compute()
    return avg_loss, metrics
@torch.no_grad()
def validate(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    device: torch.device,
    tracker: MetricTracker,
) -> Tuple[float, Dict[str, float], Optional[Dict[str, torch.Tensor]]]:
    """Run one full validation pass.

    Args:
        model: Change-detection model (set to eval internally).
        loader: Validation ``DataLoader``.
        criterion: Loss module (operates on raw logits).
        device: Target device.
        tracker: ``MetricTracker`` (reset externally before this call).

    Returns:
        Tuple of ``(average_loss, metrics_dict, last_batch)`` where
        ``last_batch`` is the final mini-batch dict (for visualisation).
    """
    model.eval()
    total_loss = 0.0
    batch_count = 0
    final_batch: Optional[Dict[str, torch.Tensor]] = None
    progress = tqdm(loader, desc=" Val ", leave=False, dynamic_ncols=True)
    for batch in progress:
        pre_img = batch["A"].to(device, non_blocking=True)
        post_img = batch["B"].to(device, non_blocking=True)
        target = batch["mask"].to(device, non_blocking=True)
        logits = model(pre_img, post_img)
        total_loss += criterion(logits, target).item()
        batch_count += 1
        tracker.update(logits, target)
        # Retain the most recent batch so the caller can visualise it.
        final_batch = {
            "A": pre_img,
            "B": post_img,
            "mask": target,
            "logits": logits,
        }
        progress.set_postfix(loss=f"{total_loss / batch_count:.4f}")
    avg_loss = total_loss / max(batch_count, 1)
    return avg_loss, tracker.compute(), final_batch
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point — parse CLI args, build components, run training loop.

    Reads the YAML config, resolves per-model hyperparameters, then runs
    the epoch loop with validation, TensorBoard logging, checkpointing
    (best + last) and early stopping on validation F1.
    """
    # ---- CLI ----------------------------------------------------------
    parser = argparse.ArgumentParser(
        description="Train a change-detection model",
    )
    parser.add_argument(
        "--config", type=Path, default=Path("configs/config.yaml"),
        help="Path to the YAML configuration file.",
    )
    parser.add_argument(
        "--model", type=str, default=None,
        help="Override the model name from config (siamese_cnn | unet_pp | changeformer).",
    )
    parser.add_argument(
        "--resume", type=Path, default=None,
        help="Path to a checkpoint file to resume training from.",
    )
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # ---- Config -------------------------------------------------------
    with open(args.config, "r") as fh:
        config: Dict[str, Any] = yaml.safe_load(fh)
    model_name: str = args.model or config["model"]["name"]
    train_cfg = config["training"]
    seed: int = config.get("project", {}).get("seed", 42)
    set_seed(seed)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    gpu_type = detect_gpu_type()
    logger.info("Device: %s | GPU type: %s", device, gpu_type)
    # ---- Paths --------------------------------------------------------
    paths = resolve_paths(config)
    for p in paths.values():
        p.mkdir(parents=True, exist_ok=True)
    # ---- Hyperparams (auto from per-model tables) ---------------------
    batch_size = get_batch_size(config, model_name)
    lr = get_learning_rate(config, model_name)
    num_epochs = get_num_epochs(config, model_name)
    accum_steps: int = train_cfg.get("gradient_accumulation_steps", 1)
    grad_clip: float = train_cfg.get("grad_clip_max_norm", 1.0)
    warmup_epochs: int = train_cfg.get("warmup_epochs", 5)
    vis_interval: int = train_cfg.get("vis_interval", 5)
    threshold: float = config.get("evaluation", {}).get("threshold", 0.5)
    logger.info(
        "Hyperparams → model=%s bs=%d lr=%.1e epochs=%d accum=%d warmup=%d",
        model_name, batch_size, lr, num_epochs, accum_steps, warmup_epochs,
    )
    # ---- Model --------------------------------------------------------
    model = get_model(model_name, config).to(device)
    param_count = sum(p.numel() for p in model.parameters()) / 1e6
    logger.info("Model: %s (%.2fM parameters)", model_name, param_count)
    # ---- Data ---------------------------------------------------------
    train_loader, val_loader = build_dataloaders(config, paths["data"], batch_size)
    logger.info(
        "Data: %d train batches, %d val batches (batch_size=%d)",
        len(train_loader), len(val_loader), batch_size,
    )
    # ---- Loss / optimiser / scheduler ---------------------------------
    criterion = get_loss(config).to(device)
    optimizer = AdamW(
        model.parameters(),
        lr=lr,
        weight_decay=train_cfg["weight_decay"],
    )
    scheduler = build_scheduler(optimizer, num_epochs, warmup_epochs)
    scaler = GradScaler(enabled=train_cfg.get("amp", True))
    # ---- TensorBoard --------------------------------------------------
    writer = SummaryWriter(log_dir=str(paths["logs"] / model_name))
    # ---- MetricTrackers -----------------------------------------------
    train_tracker = MetricTracker(threshold=threshold)
    val_tracker = MetricTracker(threshold=threshold)
    # ---- Resume -------------------------------------------------------
    start_epoch: int = 0
    best_f1: float = 0.0
    best_epoch: int = 0
    if args.resume is not None and args.resume.exists():
        start_epoch, best_f1, best_epoch = load_checkpoint(
            args.resume, model, optimizer, scheduler, scaler, device,
        )
    elif args.resume is not None:
        logger.warning("Resume path does not exist: %s — training from scratch", args.resume)
    # ---- Early stopping state -----------------------------------------
    es_cfg = train_cfg.get("early_stopping", {})
    es_enabled: bool = es_cfg.get("enabled", True)
    patience: int = es_cfg.get("patience", 15)
    # NOTE(review): patience_counter is not stored in checkpoints, so it
    # restarts at 0 after a resume — confirm this is acceptable.
    patience_counter: int = 0
    # ---- Training loop ------------------------------------------------
    wall_start = time.monotonic()
    logger.info("=" * 60)
    logger.info("Starting training from epoch %d", start_epoch + 1)
    logger.info("=" * 60)
    for epoch in range(start_epoch, num_epochs):
        epoch_start = time.monotonic()
        epoch_num = epoch + 1  # 1-indexed for display / checkpoints
        current_lr = optimizer.param_groups[0]["lr"]
        logger.info("Epoch %d/%d (lr=%.2e)", epoch_num, num_epochs, current_lr)
        # -- Train ------------------------------------------------------
        train_tracker.reset()
        train_loss, train_metrics = train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device,
            train_tracker, accum_steps, grad_clip,
        )
        # -- Validate ---------------------------------------------------
        val_tracker.reset()
        val_loss, val_metrics, last_val_batch = validate(
            model, val_loader, criterion, device, val_tracker,
        )
        # -- Step scheduler (after both train + val) --------------------
        scheduler.step()
        # -- TensorBoard scalars ----------------------------------------
        writer.add_scalar("Loss/train", train_loss, epoch_num)
        writer.add_scalar("Loss/val", val_loss, epoch_num)
        writer.add_scalar("LR", current_lr, epoch_num)
        for key, value in train_metrics.items():
            writer.add_scalar(f"Train/{key}", value, epoch_num)
        for key, value in val_metrics.items():
            writer.add_scalar(f"Val/{key}", value, epoch_num)
        # -- TensorBoard prediction images ------------------------------
        if last_val_batch is not None and epoch_num % vis_interval == 0:
            log_predictions_to_tensorboard(
                writer,
                img_a=last_val_batch["A"],
                img_b=last_val_batch["B"],
                mask_true=last_val_batch["mask"],
                mask_pred=last_val_batch["logits"],
                step=epoch_num,
                num_samples=4,
            )
        # -- Console log ------------------------------------------------
        epoch_time = time.monotonic() - epoch_start
        logger.info(
            " Train — loss: %.4f | F1: %.4f | IoU: %.4f",
            train_loss, train_metrics["f1"], train_metrics["iou"],
        )
        logger.info(
            " Val — loss: %.4f | F1: %.4f | IoU: %.4f | Prec: %.4f | Rec: %.4f | OA: %.4f",
            val_loss,
            val_metrics["f1"],
            val_metrics["iou"],
            val_metrics["precision"],
            val_metrics["recall"],
            val_metrics["oa"],
        )
        logger.info(" Epoch time: %.1fs", epoch_time)
        # -- Update best/patience state BEFORE writing checkpoints ------
        # BUGFIX: the "last" checkpoint used to be saved with the stale,
        # pre-update best_f1/best_epoch. Resuming from it after an
        # improving epoch could then let a later, worse epoch overwrite
        # the best checkpoint. Decide improvement first so both files
        # carry the up-to-date best state.
        improved = val_metrics["f1"] > best_f1
        if improved:
            best_f1 = val_metrics["f1"]
            best_epoch = epoch_num
            patience_counter = 0
        else:
            patience_counter += 1
        # -- Save last checkpoint (every epoch) -------------------------
        save_checkpoint(
            model, optimizer, scheduler, scaler,
            epoch=epoch_num,
            best_f1=best_f1,
            best_epoch=best_epoch,
            save_path=paths["checkpoints"] / f"{model_name}_last.pth",
        )
        # -- Save best checkpoint (if improved) -------------------------
        if improved:
            save_checkpoint(
                model, optimizer, scheduler, scaler,
                epoch=epoch_num,
                best_f1=best_f1,
                best_epoch=best_epoch,
                save_path=paths["checkpoints"] / f"{model_name}_best.pth",
            )
            logger.info(" ★ New best F1: %.4f (epoch %d)", best_f1, best_epoch)
        else:
            logger.info(
                " No improvement (%d/%d patience)", patience_counter, patience,
            )
        # -- Early stopping ---------------------------------------------
        if es_enabled and patience_counter >= patience:
            logger.info(
                "Early stopping triggered after %d epochs without improvement.",
                patience,
            )
            break
    # ---- Summary ------------------------------------------------------
    writer.close()
    total_time = time.monotonic() - wall_start
    hours, remainder = divmod(total_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    logger.info("=" * 60)
    logger.info("Training complete.")
    logger.info(" Best val F1 : %.4f (epoch %d)", best_f1, best_epoch)
    logger.info(" Total time : %dh %dm %ds", int(hours), int(minutes), int(seconds))
    logger.info(" Checkpoints : %s", paths["checkpoints"])
    logger.info("=" * 60)
# Script entry point: run training only when executed directly, not on import.
if __name__ == "__main__":
    main()