"""Main training script for change detection models. Supports mixed-precision training, gradient accumulation, gradient clipping, early stopping on validation F1, checkpoint saving (best + last) to Google Drive or local disk, and full resume from checkpoint after Colab disconnects. Usage: python train.py --config configs/config.yaml --model unet_pp python train.py --config configs/config.yaml --model changeformer \ --resume /content/drive/MyDrive/change-detection/checkpoints/changeformer_last.pth """ import argparse import logging import random import time from pathlib import Path from typing import Any, Dict, Optional, Tuple import numpy as np import torch import torch.nn as nn from torch.cuda.amp import GradScaler, autocast from torch.optim import AdamW from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR, SequentialLR from torch.utils.data import DataLoader from torch.utils.tensorboard import SummaryWriter from tqdm import tqdm import yaml from data.dataset import ChangeDetectionDataset from models import get_model from utils.losses import get_loss from utils.metrics import MetricTracker from utils.visualization import log_predictions_to_tensorboard logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Reproducibility # --------------------------------------------------------------------------- def set_seed(seed: int) -> None: """Set all random seeds for reproducibility. Configures Python, NumPy, PyTorch (CPU + CUDA), and cuDNN for deterministic behaviour. Args: seed: Random seed value. """ random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # --------------------------------------------------------------------------- # GPU / config helpers # --------------------------------------------------------------------------- def detect_gpu_type() -> str: """Detect the current GPU type for automatic batch-size selection. Returns: One of ``'T4'``, ``'V100'``, or ``'default'``. """ if not torch.cuda.is_available(): return "default" name = torch.cuda.get_device_name(0).upper() if "T4" in name: return "T4" elif "V100" in name: return "V100" return "default" def get_batch_size(config: Dict[str, Any], model_name: str) -> int: """Look up the batch size for the current GPU + model combination. Args: config: Full project config dict. model_name: Model identifier string. Returns: Batch size as an integer. """ gpu_type = detect_gpu_type() model_sizes = config.get("batch_sizes", {}).get(model_name, {}) return model_sizes.get(gpu_type, model_sizes.get("default", 4)) def get_learning_rate(config: Dict[str, Any], model_name: str) -> float: """Look up the per-model learning rate, falling back to the global default. Args: config: Full project config dict. model_name: Model identifier string. Returns: Learning rate as a float. """ return config.get("learning_rates", {}).get( model_name, config["training"]["learning_rate"] ) def get_num_epochs(config: Dict[str, Any], model_name: str) -> int: """Look up the per-model epoch count, falling back to the global default. Args: config: Full project config dict. model_name: Model identifier string. Returns: Number of epochs as an integer. """ return config.get("epoch_counts", {}).get( model_name, config["training"]["epochs"] ) def resolve_paths(config: Dict[str, Any]) -> Dict[str, Path]: """Build a path dict based on whether Colab mode is enabled. When ``config["colab"]["enabled"]`` is ``True`` all persistent artefacts point to Google Drive; otherwise they use the local ``paths`` section. Args: config: Full project config dict. Returns: Dict with keys ``'data'``, ``'checkpoints'``, ``'logs'``, ``'outputs'``. """ if config.get("colab", {}).get("enabled", False): c = config["colab"] return { "data": Path(c["data_dir"]), "checkpoints": Path(c["checkpoint_dir"]), "logs": Path(c["log_dir"]), "outputs": Path(c["output_dir"]), } p = config.get("paths", {}) return { "data": Path(p.get("processed_data", "./processed_data")), "checkpoints": Path(p.get("checkpoint_dir", "./checkpoints")), "logs": Path(p.get("log_dir", "./logs")), "outputs": Path(p.get("output_dir", "./outputs")), } # --------------------------------------------------------------------------- # Data # --------------------------------------------------------------------------- def build_dataloaders( config: Dict[str, Any], data_dir: Path, batch_size: int, ) -> Tuple[DataLoader, DataLoader]: """Create training and validation ``DataLoader`` instances. Args: config: Full project config dict. data_dir: Root of the processed dataset (contains ``train/``, ``val/``). batch_size: Mini-batch size. Returns: Tuple of ``(train_loader, val_loader)``. """ ds_cfg = config.get("dataset", {}) num_workers = ds_cfg.get("num_workers", 4) pin_memory = ds_cfg.get("pin_memory", True) train_ds = ChangeDetectionDataset( root=data_dir / "train", split="train", config=config, ) val_ds = ChangeDetectionDataset( root=data_dir / "val", split="val", config=config, ) train_loader = DataLoader( train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory, drop_last=True, ) val_loader = DataLoader( val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory, ) return train_loader, val_loader # --------------------------------------------------------------------------- # Scheduler with linear warmup # --------------------------------------------------------------------------- def build_scheduler( optimizer: torch.optim.Optimizer, total_epochs: int, warmup_epochs: int, ) -> torch.optim.lr_scheduler._LRScheduler: """Create a CosineAnnealingLR scheduler preceded by linear warmup. During the first ``warmup_epochs`` the LR ramps linearly from ``start_factor`` to the base LR, then cosine-decays for the remainder. Args: optimizer: Optimizer whose LR groups will be scheduled. total_epochs: Total number of training epochs. warmup_epochs: Number of warmup epochs (0 to disable). Returns: A learning-rate scheduler instance. """ if warmup_epochs > 0 and warmup_epochs < total_epochs: warmup = LinearLR( optimizer, start_factor=0.01, end_factor=1.0, total_iters=warmup_epochs, ) cosine = CosineAnnealingLR( optimizer, T_max=total_epochs - warmup_epochs, ) return SequentialLR( optimizer, schedulers=[warmup, cosine], milestones=[warmup_epochs], ) return CosineAnnealingLR(optimizer, T_max=total_epochs) # --------------------------------------------------------------------------- # Checkpointing # --------------------------------------------------------------------------- def save_checkpoint( model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: torch.optim.lr_scheduler._LRScheduler, scaler: GradScaler, epoch: int, best_f1: float, best_epoch: int, save_path: Path, ) -> None: """Persist a full training checkpoint to disk. Args: model: Model whose weights to save. optimizer: Optimizer state to save. scheduler: LR scheduler state to save. scaler: ``GradScaler`` state to save. epoch: Epoch number just completed (1-indexed). best_f1: Best validation F1 achieved so far. best_epoch: Epoch that achieved ``best_f1``. save_path: Destination file path. """ save_path.parent.mkdir(parents=True, exist_ok=True) torch.save( { "epoch": epoch, "model_state_dict": model.state_dict(), "optimizer_state_dict": optimizer.state_dict(), "scheduler_state_dict": scheduler.state_dict(), "scaler_state_dict": scaler.state_dict(), "best_f1": best_f1, "best_epoch": best_epoch, }, save_path, ) logger.info("Checkpoint saved → %s", save_path) def load_checkpoint( path: Path, model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: torch.optim.lr_scheduler._LRScheduler, scaler: GradScaler, device: torch.device, ) -> Tuple[int, float, int]: """Restore training state from a checkpoint. Args: path: Checkpoint file to load. model: Model to receive saved weights. optimizer: Optimizer to receive saved state. scheduler: Scheduler to receive saved state. scaler: ``GradScaler`` to receive saved state. device: Target device for ``map_location``. Returns: Tuple of ``(start_epoch, best_f1, best_epoch)``. """ ckpt = torch.load(path, map_location=device) model.load_state_dict(ckpt["model_state_dict"]) optimizer.load_state_dict(ckpt["optimizer_state_dict"]) scheduler.load_state_dict(ckpt["scheduler_state_dict"]) scaler.load_state_dict(ckpt["scaler_state_dict"]) best_f1 = ckpt["best_f1"] best_epoch = ckpt.get("best_epoch", ckpt["epoch"]) logger.info( "Resumed from epoch %d (best F1: %.4f @ epoch %d)", ckpt["epoch"], best_f1, best_epoch, ) return ckpt["epoch"], best_f1, best_epoch # --------------------------------------------------------------------------- # Train / validate one epoch # --------------------------------------------------------------------------- def train_one_epoch( model: nn.Module, loader: DataLoader, criterion: nn.Module, optimizer: torch.optim.Optimizer, scaler: GradScaler, device: torch.device, tracker: MetricTracker, accum_steps: int, grad_clip: float, ) -> Tuple[float, Dict[str, float]]: """Execute one full training epoch. Args: model: Change-detection model. loader: Training ``DataLoader``. criterion: Loss module (operates on raw logits). optimizer: Optimiser instance. scaler: ``GradScaler`` for mixed-precision training. device: Target CUDA / CPU device. tracker: ``MetricTracker`` (reset externally before this call). accum_steps: Number of gradient-accumulation micro-steps. grad_clip: Maximum gradient norm for clipping. Returns: Tuple of ``(average_loss, metrics_dict)``. """ model.train() running_loss = 0.0 num_batches = 0 optimizer.zero_grad(set_to_none=True) pbar = tqdm(loader, desc=" Train", leave=False, dynamic_ncols=True) for step, batch in enumerate(pbar): img_a = batch["A"].to(device, non_blocking=True) img_b = batch["B"].to(device, non_blocking=True) mask = batch["mask"].to(device, non_blocking=True) with autocast(): logits = model(img_a, img_b) loss = criterion(logits, mask) / accum_steps scaler.scale(loss).backward() if (step + 1) % accum_steps == 0: scaler.unscale_(optimizer) nn.utils.clip_grad_norm_(model.parameters(), grad_clip) scaler.step(optimizer) scaler.update() optimizer.zero_grad(set_to_none=True) # Track loss (undo the accumulation scaling for logging) running_loss += loss.item() * accum_steps num_batches += 1 # Track metrics (MetricTracker handles sigmoid + threshold internally) tracker.update(logits.detach(), mask) pbar.set_postfix(loss=f"{running_loss / num_batches:.4f}") avg_loss = running_loss / max(num_batches, 1) metrics = tracker.compute() return avg_loss, metrics @torch.no_grad() def validate( model: nn.Module, loader: DataLoader, criterion: nn.Module, device: torch.device, tracker: MetricTracker, ) -> Tuple[float, Dict[str, float], Optional[Dict[str, torch.Tensor]]]: """Run one full validation pass. Args: model: Change-detection model (set to eval internally). loader: Validation ``DataLoader``. criterion: Loss module (operates on raw logits). device: Target device. tracker: ``MetricTracker`` (reset externally before this call). Returns: Tuple of ``(average_loss, metrics_dict, last_batch)`` where ``last_batch`` is the final mini-batch dict (for visualisation). """ model.eval() running_loss = 0.0 num_batches = 0 last_batch: Optional[Dict[str, torch.Tensor]] = None pbar = tqdm(loader, desc=" Val ", leave=False, dynamic_ncols=True) for batch in pbar: img_a = batch["A"].to(device, non_blocking=True) img_b = batch["B"].to(device, non_blocking=True) mask = batch["mask"].to(device, non_blocking=True) logits = model(img_a, img_b) loss = criterion(logits, mask) running_loss += loss.item() num_batches += 1 tracker.update(logits, mask) # Keep the last batch for TensorBoard visualisation last_batch = { "A": img_a, "B": img_b, "mask": mask, "logits": logits, } pbar.set_postfix(loss=f"{running_loss / num_batches:.4f}") avg_loss = running_loss / max(num_batches, 1) metrics = tracker.compute() return avg_loss, metrics, last_batch # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: """Entry point — parse CLI args, build components, run training loop.""" # ---- CLI ---------------------------------------------------------- parser = argparse.ArgumentParser( description="Train a change-detection model", ) parser.add_argument( "--config", type=Path, default=Path("configs/config.yaml"), help="Path to the YAML configuration file.", ) parser.add_argument( "--model", type=str, default=None, help="Override the model name from config (siamese_cnn | unet_pp | changeformer).", ) parser.add_argument( "--resume", type=Path, default=None, help="Path to a checkpoint file to resume training from.", ) args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) # ---- Config ------------------------------------------------------- with open(args.config, "r") as fh: config: Dict[str, Any] = yaml.safe_load(fh) model_name: str = args.model or config["model"]["name"] train_cfg = config["training"] seed: int = config.get("project", {}).get("seed", 42) set_seed(seed) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") gpu_type = detect_gpu_type() logger.info("Device: %s | GPU type: %s", device, gpu_type) # ---- Paths -------------------------------------------------------- paths = resolve_paths(config) for p in paths.values(): p.mkdir(parents=True, exist_ok=True) # ---- Hyperparams (auto from per-model tables) --------------------- batch_size = get_batch_size(config, model_name) lr = get_learning_rate(config, model_name) num_epochs = get_num_epochs(config, model_name) accum_steps: int = train_cfg.get("gradient_accumulation_steps", 1) grad_clip: float = train_cfg.get("grad_clip_max_norm", 1.0) warmup_epochs: int = train_cfg.get("warmup_epochs", 5) vis_interval: int = train_cfg.get("vis_interval", 5) threshold: float = config.get("evaluation", {}).get("threshold", 0.5) logger.info( "Hyperparams → model=%s bs=%d lr=%.1e epochs=%d accum=%d warmup=%d", model_name, batch_size, lr, num_epochs, accum_steps, warmup_epochs, ) # ---- Model -------------------------------------------------------- model = get_model(model_name, config).to(device) param_count = sum(p.numel() for p in model.parameters()) / 1e6 logger.info("Model: %s (%.2fM parameters)", model_name, param_count) # ---- Data --------------------------------------------------------- train_loader, val_loader = build_dataloaders(config, paths["data"], batch_size) logger.info( "Data: %d train batches, %d val batches (batch_size=%d)", len(train_loader), len(val_loader), batch_size, ) # ---- Loss / optimiser / scheduler --------------------------------- criterion = get_loss(config).to(device) optimizer = AdamW( model.parameters(), lr=lr, weight_decay=train_cfg["weight_decay"], ) scheduler = build_scheduler(optimizer, num_epochs, warmup_epochs) scaler = GradScaler(enabled=train_cfg.get("amp", True)) # ---- TensorBoard -------------------------------------------------- writer = SummaryWriter(log_dir=str(paths["logs"] / model_name)) # ---- MetricTrackers ----------------------------------------------- train_tracker = MetricTracker(threshold=threshold) val_tracker = MetricTracker(threshold=threshold) # ---- Resume ------------------------------------------------------- start_epoch: int = 0 best_f1: float = 0.0 best_epoch: int = 0 if args.resume is not None and args.resume.exists(): start_epoch, best_f1, best_epoch = load_checkpoint( args.resume, model, optimizer, scheduler, scaler, device, ) elif args.resume is not None: logger.warning("Resume path does not exist: %s — training from scratch", args.resume) # ---- Early stopping state ----------------------------------------- es_cfg = train_cfg.get("early_stopping", {}) es_enabled: bool = es_cfg.get("enabled", True) patience: int = es_cfg.get("patience", 15) patience_counter: int = 0 # ---- Training loop ------------------------------------------------ wall_start = time.monotonic() logger.info("=" * 60) logger.info("Starting training from epoch %d", start_epoch + 1) logger.info("=" * 60) for epoch in range(start_epoch, num_epochs): epoch_start = time.monotonic() epoch_num = epoch + 1 # 1-indexed for display / checkpoints current_lr = optimizer.param_groups[0]["lr"] logger.info("Epoch %d/%d (lr=%.2e)", epoch_num, num_epochs, current_lr) # -- Train ------------------------------------------------------ train_tracker.reset() train_loss, train_metrics = train_one_epoch( model, train_loader, criterion, optimizer, scaler, device, train_tracker, accum_steps, grad_clip, ) # -- Validate --------------------------------------------------- val_tracker.reset() val_loss, val_metrics, last_val_batch = validate( model, val_loader, criterion, device, val_tracker, ) # -- Step scheduler (after both train + val) -------------------- scheduler.step() # -- TensorBoard scalars ---------------------------------------- writer.add_scalar("Loss/train", train_loss, epoch_num) writer.add_scalar("Loss/val", val_loss, epoch_num) writer.add_scalar("LR", current_lr, epoch_num) for key, value in train_metrics.items(): writer.add_scalar(f"Train/{key}", value, epoch_num) for key, value in val_metrics.items(): writer.add_scalar(f"Val/{key}", value, epoch_num) # -- TensorBoard prediction images ------------------------------ if last_val_batch is not None and epoch_num % vis_interval == 0: log_predictions_to_tensorboard( writer, img_a=last_val_batch["A"], img_b=last_val_batch["B"], mask_true=last_val_batch["mask"], mask_pred=last_val_batch["logits"], step=epoch_num, num_samples=4, ) # -- Console log ------------------------------------------------ epoch_time = time.monotonic() - epoch_start logger.info( " Train — loss: %.4f | F1: %.4f | IoU: %.4f", train_loss, train_metrics["f1"], train_metrics["iou"], ) logger.info( " Val — loss: %.4f | F1: %.4f | IoU: %.4f | Prec: %.4f | Rec: %.4f | OA: %.4f", val_loss, val_metrics["f1"], val_metrics["iou"], val_metrics["precision"], val_metrics["recall"], val_metrics["oa"], ) logger.info(" Epoch time: %.1fs", epoch_time) # -- Save last checkpoint (every epoch) ------------------------- save_checkpoint( model, optimizer, scheduler, scaler, epoch=epoch_num, best_f1=best_f1, best_epoch=best_epoch, save_path=paths["checkpoints"] / f"{model_name}_last.pth", ) # -- Save best checkpoint (if improved) ------------------------- if val_metrics["f1"] > best_f1: best_f1 = val_metrics["f1"] best_epoch = epoch_num patience_counter = 0 save_checkpoint( model, optimizer, scheduler, scaler, epoch=epoch_num, best_f1=best_f1, best_epoch=best_epoch, save_path=paths["checkpoints"] / f"{model_name}_best.pth", ) logger.info(" ★ New best F1: %.4f (epoch %d)", best_f1, best_epoch) else: patience_counter += 1 logger.info( " No improvement (%d/%d patience)", patience_counter, patience, ) # -- Early stopping --------------------------------------------- if es_enabled and patience_counter >= patience: logger.info( "Early stopping triggered after %d epochs without improvement.", patience, ) break # ---- Summary ------------------------------------------------------ writer.close() total_time = time.monotonic() - wall_start hours, remainder = divmod(total_time, 3600) minutes, seconds = divmod(remainder, 60) logger.info("=" * 60) logger.info("Training complete.") logger.info(" Best val F1 : %.4f (epoch %d)", best_f1, best_epoch) logger.info(" Total time : %dh %dm %ds", int(hours), int(minutes), int(seconds)) logger.info(" Checkpoints : %s", paths["checkpoints"]) logger.info("=" * 60) if __name__ == "__main__": main()