| |
|
|
| """ |
| Cantor Fusion Classifier with AdamW + Cosine Warm Restarts |
| ----------------------------------------------------------- |
| Features: |
| - AdamW optimizer (best for ViTs) |
| - CosineAnnealingWarmRestarts (automatic drop + restart cycles) |
| - HuggingFace Hub uploads (ONE shared repo, organized by run) |
| - TensorBoard logging (loss, accuracy, fusion metrics, LR tracking) |
| - Easy CIFAR-10/100 switching |
| - Automatic checkpoint management |
| - SafeTensors format (ClamAV safe) |
| |
| Author: AbstractPhil |
| License: MIT |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.utils.data import DataLoader |
| from torch.utils.tensorboard import SummaryWriter |
| from torchvision import datasets, transforms |
| from torch.cuda.amp import autocast, GradScaler |
| from safetensors.torch import save_file, load_file |
|
|
| import math |
| import os |
| import json |
| from typing import Optional, Dict, List, Tuple, Union |
| from dataclasses import dataclass, asdict |
| import time |
| from pathlib import Path |
| from tqdm import tqdm |
|
|
| |
| from huggingface_hub import HfApi, create_repo, upload_folder, upload_file |
| import yaml |
|
|
| |
| from geovocab2.train.model.layers.attention.cantor_multiheaded_fusion import ( |
| CantorMultiheadFusion, |
| CantorFusionConfig |
| ) |
| from geovocab2.shapes.factory.cantor_route_factory import ( |
| CantorRouteFactory, |
| RouteMode, |
| SimplexConfig |
| ) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class CantorTrainingConfig:
    """Complete configuration for Cantor fusion training with AdamW + Warm Restarts.

    Besides the declared fields, ``__post_init__`` derives several plain
    attributes (``num_patches``, ``patch_dim``, ``output_dir``,
    ``checkpoint_dir``, ``tensorboard_dir``) and creates the output
    directories on disk.  Those derived attributes are not dataclass fields,
    so they are not written by :meth:`save`.
    """

    # Dataset
    dataset: str = "cifar10"
    num_classes: int = 10  # overwritten in __post_init__ from ``dataset``

    # Model architecture
    image_size: int = 32
    patch_size: int = 4
    embed_dim: int = 384
    num_fusion_blocks: int = 6
    num_heads: int = 8
    fusion_window: int = 32
    fusion_mode: str = "weighted"
    k_simplex: int = 4
    use_beatrix: bool = False
    beatrix_tau: float = 0.25

    # Performance switches
    precompute_geometric: bool = True
    use_torch_compile: bool = True
    use_mixed_precision: bool = False

    # Regularization
    dropout: float = 0.1
    drop_path_rate: float = 0.1
    label_smoothing: float = 0.1

    # Optimization
    optimizer_type: str = "adamw"
    batch_size: int = 128
    num_epochs: int = 300
    learning_rate: float = 3e-4
    weight_decay: float = 0.05
    grad_clip: float = 1.0

    # SGD-specific
    sgd_momentum: float = 0.9
    sgd_nesterov: bool = True

    # AdamW-specific
    adamw_betas: Tuple[float, float] = (0.9, 0.999)
    adamw_eps: float = 1e-8

    # Scheduler selection
    scheduler_type: str = "cosine_restarts"

    # Cosine warm restarts
    restart_period: int = 50
    restart_mult: int = 2
    min_lr: float = 1e-7

    # MultiStepLR (milestones are derived from num_epochs when left as None)
    lr_milestones: Optional[List[int]] = None
    lr_gamma: float = 0.2

    # Cosine-with-warmup
    warmup_epochs: int = 10

    # Data augmentation
    use_augmentation: bool = True
    use_autoaugment: bool = True
    use_cutout: bool = False
    cutout_length: int = 16

    # System
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    num_workers: int = 4
    seed: int = 42

    # Output paths
    weights_dir: str = "weights"
    model_name: str = "vit-beans-v3"
    run_name: Optional[str] = None

    # HuggingFace Hub
    hf_username: str = "AbstractPhil"
    hf_repo_name: Optional[str] = None
    upload_to_hf: bool = True
    hf_token: Optional[str] = None

    # Logging / checkpoint cadence
    log_interval: int = 50
    save_interval: int = 10
    checkpoint_upload_interval: int = 20

    def __post_init__(self):
        """Validate the config, fill in derived values and create output dirs.

        Raises:
            ValueError: If ``dataset`` is not ``"cifar10"``/``"cifar100"``, or
                if ``image_size`` is not a multiple of a positive
                ``patch_size``.
        """
        classes_by_dataset = {"cifar10": 10, "cifar100": 100}
        if self.dataset not in classes_by_dataset:
            raise ValueError(f"Unknown dataset: {self.dataset}")
        self.num_classes = classes_by_dataset[self.dataset]

        # Default MultiStepLR milestones scale with the training length.
        if self.lr_milestones is None:
            if self.num_epochs >= 200:
                self.lr_milestones = [60, 120, 160]
            elif self.num_epochs >= 100:
                self.lr_milestones = [30, 60, 80]
            else:
                self.lr_milestones = [
                    int(self.num_epochs * 0.5),
                    int(self.num_epochs * 0.75),
                ]

        # Auto-generate a timestamped run name.
        if self.run_name is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            opt_name = self.optimizer_type.upper()
            sched_name = "WarmRestart" if self.scheduler_type == "cosine_restarts" else self.scheduler_type
            self.run_name = f"{self.dataset}_{self.fusion_mode}_{opt_name}_{sched_name}_{timestamp}"

        if self.hf_repo_name is None:
            self.hf_repo_name = self.model_name

        # Fall back to the environment so the token never has to be hard-coded.
        if self.hf_token is None:
            self.hf_token = os.environ.get("HF_TOKEN")

        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if self.patch_size <= 0 or self.image_size % self.patch_size != 0:
            raise ValueError(
                f"image_size ({self.image_size}) must be a multiple of a "
                f"positive patch_size ({self.patch_size})"
            )
        self.num_patches = (self.image_size // self.patch_size) ** 2
        self.patch_dim = self.patch_size * self.patch_size * 3

        self.output_dir = Path(self.weights_dir) / self.model_name / self.run_name
        self.checkpoint_dir = self.output_dir / "checkpoints"
        self.tensorboard_dir = self.output_dir / "tensorboard"

        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.tensorboard_dir.mkdir(parents=True, exist_ok=True)

    def _serializable_dict(self) -> Dict:
        """Return the dataclass fields as a YAML-safe dict without secrets."""
        config_dict = asdict(self)
        # Plain YAML has no tuple type; store the betas as a list.
        config_dict['adamw_betas'] = list(config_dict['adamw_betas'])
        # The saved config is uploaded next to the checkpoints, so the access
        # token must never be written into it.
        config_dict['hf_token'] = None
        return config_dict

    def save(self, path: Union[str, Path]):
        """Save config to YAML file (the HuggingFace token is omitted)."""
        path = Path(path)
        with open(path, 'w', encoding='utf-8') as f:
            yaml.dump(self._serializable_dict(), f, default_flow_style=False)

    @classmethod
    def load(cls, path: Union[str, Path]):
        """Load config from YAML file.

        An empty file yields a config with all defaults.  A missing or null
        ``hf_token`` is re-read from the ``HF_TOKEN`` environment variable by
        ``__post_init__``.
        """
        path = Path(path)
        with open(path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f) or {}

        if 'adamw_betas' in config_dict:
            config_dict['adamw_betas'] = tuple(config_dict['adamw_betas'])
        return cls(**config_dict)
|
|
|
|
| |
| |
| |
|
|
class PatchEmbedding(nn.Module):
    """Turn an image into a sequence of position-aware patch embeddings.

    A convolution whose kernel size and stride both equal ``patch_size``
    projects each patch of a 3-channel input to ``embed_dim`` features; a
    learned positional embedding of shape ``(1, num_patches, embed_dim)`` is
    then added.
    """

    def __init__(self, config: CantorTrainingConfig):
        super().__init__()
        self.config = config
        dim, patch = config.embed_dim, config.patch_size
        self.proj = nn.Conv2d(3, dim, kernel_size=patch, stride=patch)
        # Small-variance random init for the learned positions.
        positions = torch.randn(1, config.num_patches, dim) * 0.02
        self.pos_embed = nn.Parameter(positions)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project ``x`` to patch tokens and add the positional embedding."""
        # Conv output is (batch, dim, h, w): flatten the spatial grid, then
        # move the token axis in front of the feature axis.
        tokens = self.proj(x).flatten(2).transpose(1, 2)
        return tokens + self.pos_embed
|
|
|
|
class DropPath(nn.Module):
    """Stochastic depth: randomly zero the whole input per sample in training.

    Surviving samples are scaled by ``1 / (1 - drop_prob)``.  In eval mode, or
    with ``drop_prob == 0``, the input is returned unchanged.

    Args:
        drop_prob: Probability of dropping a sample, in ``[0, 1]``.

    Raises:
        ValueError: If ``drop_prob`` is outside ``[0, 1]``.
    """

    def __init__(self, drop_prob: float = 0.0):
        super().__init__()
        if not 0.0 <= drop_prob <= 1.0:
            raise ValueError(f"drop_prob must be in [0, 1], got {drop_prob}")
        self.drop_prob = drop_prob

    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x
        keep_prob = 1 - self.drop_prob
        if keep_prob <= 0.:
            # Everything is dropped; dividing by keep_prob would yield NaN.
            return torch.zeros_like(x)
        # One Bernoulli(keep_prob) draw per sample, broadcast over the
        # remaining dimensions: floor(keep_prob + U[0, 1)) is 1 w.p. keep_prob.
        shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
        random_tensor.floor_()
        return x.div(keep_prob) * random_tensor
|
|
|
|
class CantorFusionBlock(nn.Module):
    """Pre-norm residual block: Cantor multihead fusion followed by an MLP.

    Both sub-layers are wrapped in the same stochastic-depth module before
    being added back to the residual stream.
    """

    def __init__(self, config: CantorTrainingConfig, drop_path: float = 0.0):
        super().__init__()
        dim = config.embed_dim
        precompute = config.precompute_geometric

        self.norm1 = nn.LayerNorm(dim)
        self.fusion = CantorMultiheadFusion(
            CantorFusionConfig(
                dim=dim,
                num_heads=config.num_heads,
                fusion_window=config.fusion_window,
                fusion_mode=config.fusion_mode,
                k_simplex=config.k_simplex,
                use_beatrix_routing=config.use_beatrix,
                use_consciousness_weighting=(config.fusion_mode == "consciousness"),
                beatrix_tau=config.beatrix_tau,
                use_gating=True,
                dropout=config.dropout,
                # The residual connection is applied by this block instead.
                residual=False,
                precompute_staircase=precompute,
                precompute_routes=precompute,
                precompute_distances=precompute,
                use_optimized_gather=True,
                staircase_cache_sizes=[config.num_patches],
                use_torch_compile=config.use_torch_compile,
            )
        )

        self.norm2 = nn.LayerNorm(dim)
        hidden_dim = dim * 4
        self.mlp = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(config.dropout),
        )

        if drop_path > 0:
            self.drop_path = DropPath(drop_path)
        else:
            self.drop_path = nn.Identity()

    def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]:
        """Apply fusion and MLP sub-layers with residual connections.

        Returns the transformed tensor, or ``(tensor, info)`` when
        ``return_fusion_info`` is set, where ``info`` carries the
        ``'consciousness'`` and ``'cantor_measure'`` entries of the fusion
        result (``None`` when the fusion layer did not provide them).
        """
        fused = self.fusion(self.norm1(x))
        x = x + self.drop_path(fused['output'])
        x = x + self.drop_path(self.mlp(self.norm2(x)))

        if not return_fusion_info:
            return x

        info = {
            'consciousness': fused.get('consciousness'),
            'cantor_measure': fused.get('cantor_measure'),
        }
        return x, info
|
|
|
|
class CantorClassifier(nn.Module):
    """Image classifier: patch embedding, Cantor fusion blocks, linear head.

    The drop-path rate grows linearly from 0 to ``config.drop_path_rate``
    across the blocks.  Tokens are layer-normed and mean-pooled before the
    classification head.
    """

    def __init__(self, config: CantorTrainingConfig):
        super().__init__()
        self.config = config

        self.patch_embed = PatchEmbedding(config)

        depth = config.num_fusion_blocks
        drop_rates = torch.linspace(0, config.drop_path_rate, depth).tolist()
        self.blocks = nn.ModuleList(
            [CantorFusionBlock(config, drop_path=rate) for rate in drop_rates]
        )

        self.norm = nn.LayerNorm(config.embed_dim)
        self.head = nn.Linear(config.embed_dim, config.num_classes)

        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Initialise Linear, LayerNorm and Conv2d sub-modules in place."""
        if isinstance(m, nn.Linear):
            nn.init.trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
            return
        if isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
            return
        if isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]:
        """Return class logits, optionally with fusion info of the last block.

        When ``return_fusion_info`` is true the result is
        ``(logits, fusion_infos)``; ``fusion_infos`` holds one dict, taken
        from the final block only (or is empty if there are no blocks).
        """
        tokens = self.patch_embed(x)

        last_index = len(self.blocks) - 1
        collected = []
        for index, block in enumerate(self.blocks):
            wants_info = return_fusion_info and index == last_index
            if wants_info:
                tokens, info = block(tokens, return_fusion_info=True)
                collected.append(info)
            else:
                tokens = block(tokens)

        # Mean-pool over the token axis after the final norm.
        pooled = self.norm(tokens).mean(dim=1)
        logits = self.head(pooled)

        return (logits, collected) if return_fusion_info else logits
|
|
|
|
| |
| |
| |
|
|
class HuggingFaceUploader:
    """Manages HuggingFace Hub uploads to ONE shared repo.

    Everything belonging to a run is placed under ``runs/<run_name>/`` inside
    the repo ``<hf_username>/<hf_repo_name>``; the repo-root README is shared
    by all runs.  Every upload is best-effort: failures are printed, never
    raised, so a Hub outage cannot abort training.
    """

    def __init__(self, config: CantorTrainingConfig):
        self.config = config
        self.api = HfApi(token=config.hf_token) if config.upload_to_hf else None
        self.repo_id = f"{config.hf_username}/{config.hf_repo_name}"
        self.run_prefix = f"runs/{config.run_name}"

        if config.upload_to_hf:
            self._create_repo()
            self._update_main_readme()

    def _create_repo(self):
        """Create HuggingFace repo if it doesn't exist."""
        try:
            create_repo(
                repo_id=self.repo_id,
                token=self.config.hf_token,
                exist_ok=True,
                private=False
            )
            print(f"[HF] Repository: https://huggingface.co/{self.repo_id}")
            print(f"[HF] Run folder: {self.run_prefix}")
        except Exception as e:
            print(f"[HF] Warning: Could not create repo: {e}")

    def _update_main_readme(self):
        """Create or update the main shared README at repo root."""
        if not self.config.upload_to_hf or self.api is None:
            return

        # NOTE(review): the 'β' / 'π' glyphs in the templates of this class
        # look like mis-decoded UTF-8 (arrows / emoji) — confirm against the
        # original file encoding before publishing.
        main_readme = f"""---
tags:
- image-classification
- cantor-fusion
- geometric-deep-learning
- safetensors
- vision-transformer
- warm-restarts
library_name: pytorch
datasets:
- cifar10
- cifar100
metrics:
- accuracy
---

# {self.config.hf_repo_name}

**Geometric Deep Learning with Cantor Multihead Fusion + AdamW Warm Restarts**

This repository contains multiple training runs using Cantor fusion architecture with pentachoron structures, geometric routing, and **CosineAnnealingWarmRestarts** for automatic exploration cycles.

## Training Strategy: AdamW + Warm Restarts

This model uses **AdamW with Cosine Annealing Warm Restarts** (SGDR):
- **Drop phase**: LR decays from {self.config.learning_rate} β {self.config.min_lr} over {self.config.restart_period} epochs
- **Restart phase**: LR jumps back to {self.config.learning_rate} to explore new regions
- **Cycle multiplier**: Each cycle is {self.config.restart_mult}x longer than previous
- **Benefits**: Automatic exploration + exploitation, finds better minima, robust training

### Restart Schedule
```
Epochs 0-{self.config.restart_period}: LR: {self.config.learning_rate} β {self.config.min_lr} (first cycle)
Epoch {self.config.restart_period}: LR: RESTART to {self.config.learning_rate} π
Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)}: LR: {self.config.learning_rate} β {self.config.min_lr} (longer cycle)
...
```

## Current Run

**Latest**: `{self.config.run_name}`
- **Dataset**: {self.config.dataset.upper()}
- **Fusion Mode**: {self.config.fusion_mode}
- **Optimizer**: AdamW (adaptive moments)
- **Scheduler**: CosineAnnealingWarmRestarts
- **Architecture**: {self.config.num_fusion_blocks} blocks, {self.config.num_heads} heads
- **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices)

## Architecture

The Cantor Fusion architecture uses:
- **Geometric Routing**: Pentachoron (4-simplex) structures for token routing
- **Cantor Multihead Fusion**: Multiple fusion heads with geometric attention
- **Beatrix Consciousness Routing**: Optional consciousness-aware token fusion
- **SafeTensors Format**: All model weights use SafeTensors (not pickle)

## Usage
```python
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file

model_path = hf_hub_download(
repo_id="{self.repo_id}",
filename="runs/YOUR_RUN_NAME/checkpoints/best_model.safetensors"
)

state_dict = load_file(model_path)
model.load_state_dict(state_dict)
```

## Citation
```bibtex
@misc{{{self.config.hf_repo_name.replace('-', '_')},
author = {{AbstractPhil}},
title = {{{self.config.hf_repo_name}: Geometric Deep Learning with Warm Restarts}},
year = {{2025}},
publisher = {{HuggingFace}},
url = {{https://huggingface.co/{self.repo_id}}}
}}
```

---

**Repository maintained by**: [@{self.config.hf_username}](https://huggingface.co/{self.config.hf_username})

**Latest update**: {time.strftime("%Y-%m-%d %H:%M:%S")}
"""

        main_readme_path = Path(self.config.weights_dir) / self.config.model_name / "MAIN_README.md"
        main_readme_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the template contains non-ASCII characters and the
        # platform default encoding may be unable to encode them.
        with open(main_readme_path, 'w', encoding='utf-8') as f:
            f.write(main_readme)

        try:
            upload_file(
                path_or_fileobj=str(main_readme_path),
                path_in_repo="README.md",
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] Updated main README")
        except Exception as e:
            print(f"[HF] Main README upload failed: {e}")

    def upload_file(self, file_path: Path, repo_path: str):
        """Upload single file to HuggingFace.

        ``repo_path`` is placed under this run's folder unless it already
        starts with ``runs/``.
        """
        if not self.config.upload_to_hf or self.api is None:
            return

        # Resolved outside the ``try`` so the failure message can always
        # reference it.
        if not repo_path.startswith(self.run_prefix) and not repo_path.startswith("runs/"):
            full_path = f"{self.run_prefix}/{repo_path}"
        else:
            full_path = repo_path

        try:
            # Inside a method body this name resolves to the module-level
            # ``huggingface_hub.upload_file``, not to this method.
            upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=full_path,
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] β Uploaded: {full_path}")
        except Exception as e:
            print(f"[HF] β Upload failed ({full_path}): {e}")

    def upload_folder_contents(self, folder_path: Path, repo_folder: str):
        """Upload entire folder to HuggingFace (under this run's folder)."""
        if not self.config.upload_to_hf or self.api is None:
            return

        full_path = f"{self.run_prefix}/{repo_folder}"
        try:
            upload_folder(
                folder_path=str(folder_path),
                repo_id=self.repo_id,
                path_in_repo=full_path,
                token=self.config.hf_token,
                ignore_patterns=["*.pyc", "__pycache__"]
            )
            print(f"[HF] Uploaded folder: {full_path}")
        except Exception as e:
            print(f"[HF] Folder upload failed: {e}")

    def create_model_card(self, trainer_stats: Dict):
        """Create and upload run-specific model card.

        ``trainer_stats`` must provide ``total_params``, ``best_acc``,
        ``training_time`` (hours) and ``final_epoch``; ``mixed_precision`` is
        optional and defaults to ``False``.
        """
        if not self.config.upload_to_hf:
            return

        run_card = f"""# Run: {self.config.run_name}

## Configuration
- **Dataset**: {self.config.dataset.upper()}
- **Fusion Mode**: {self.config.fusion_mode}
- **Parameters**: {trainer_stats['total_params']:,}
- **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices)

## Performance
- **Best Validation Accuracy**: {trainer_stats['best_acc']:.2f}%
- **Training Time**: {trainer_stats['training_time']:.1f} hours
- **Final Epoch**: {trainer_stats['final_epoch']}

## Training Setup: AdamW + Warm Restarts
- **Optimizer**: AdamW (lr={self.config.learning_rate}, wd={self.config.weight_decay})
- **Scheduler**: CosineAnnealingWarmRestarts
- **Restart Period (T_0)**: {self.config.restart_period} epochs
- **Cycle Multiplier (T_mult)**: {self.config.restart_mult}x
- **Min LR**: {self.config.min_lr}
- **Batch Size**: {self.config.batch_size}
- **Mixed Precision**: {trainer_stats.get('mixed_precision', False)}

### Learning Rate Schedule
```
Cycle 1: Epochs 0-{self.config.restart_period}
LR: {self.config.learning_rate} β {self.config.min_lr} (drop)
Expected: Convergence to local minimum

Epoch {self.config.restart_period}: RESTART π
LR: {self.config.min_lr} β {self.config.learning_rate} (jump!)
Expected: Escape local minimum, explore new regions

Cycle 2: Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)}
LR: {self.config.learning_rate} β {self.config.min_lr} (longer cycle)
Expected: Deeper convergence

... and so on
```

## Files
- `{self.run_prefix}/checkpoints/best_model.safetensors` - Model weights
- `{self.run_prefix}/checkpoints/best_training_state.pt` - Optimizer state
- `{self.run_prefix}/config.yaml` - Full configuration
- `{self.run_prefix}/tensorboard/` - TensorBoard logs (LR tracking!)

## Usage
```python
from safetensors.torch import load_file
from huggingface_hub import hf_hub_download

model_path = hf_hub_download(
repo_id="{self.repo_id}",
filename="{self.run_prefix}/checkpoints/best_model.safetensors"
)

state_dict = load_file(model_path)
model.load_state_dict(state_dict)
```

## Training Notes

**Warm Restarts Benefits:**
- π **Exploration**: Periodic LR jumps escape local minima
- π **Exploitation**: Long drop phases converge deeply
- π― **Robustness**: Multiple restarts find better solutions
- π **Monitoring**: Watch TensorBoard for restart effects!

**Expected Behavior:**
- Accuracy improves during each drop phase
- Brief accuracy dips after restarts (exploration)
- Overall upward trend across cycles
- Best models often found late in long cycles

---

Built with geometric consciousness-aware routing using the Devil's Staircase (Beatrix) and pentachoron parameterization.

**Training completed**: {time.strftime("%Y-%m-%d %H:%M:%S")}

[β Back to main repository](https://huggingface.co/{self.repo_id})
"""

        readme_path = self.config.output_dir / "RUN_README.md"
        # Explicit UTF-8 for the same reason as the main README.
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(run_card)

        try:
            upload_file(
                path_or_fileobj=str(readme_path),
                path_in_repo=f"{self.run_prefix}/README.md",
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] Uploaded run README")
        except Exception as e:
            print(f"[HF] Run README upload failed: {e}")
|
|
|
|
| |
| |
| |
|
|
| class Trainer: |
| """Training manager with AdamW + Warm Restarts.""" |
| |
def __init__(self, config: CantorTrainingConfig):
    """Build the model, optimizer, scheduler, loss, logging and uploader.

    Also seeds torch, writes ``config.yaml`` into the run's output directory
    and initialises the bookkeeping counters used by the training loop.
    """
    self.config = config
    self.device = torch.device(config.device)

    # Reproducibility
    torch.manual_seed(config.seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(config.seed)

    # Model
    print("\n" + "=" * 70)
    print(f"Initializing Cantor Classifier - {config.dataset.upper()}")
    print("=" * 70)

    init_start = time.time()
    self.model = CantorClassifier(config).to(self.device)
    init_time = time.time() - init_start

    print(f"\n[Model] Initialization time: {init_time:.2f}s")
    self.print_model_info()

    # Must be computed before the scheduler is created: create_scheduler
    # prints the restart schedule from this list.
    self.restart_epochs = self._calculate_restart_epochs()

    self.optimizer = self.create_optimizer()

    self.scheduler = self.create_scheduler()

    self.criterion = nn.CrossEntropyLoss(label_smoothing=config.label_smoothing)

    # Compare the parsed device *type* so indexed device strings such as
    # "cuda:0" enable mixed precision as well.
    self.use_amp = config.use_mixed_precision and self.device.type == "cuda"
    self.scaler = GradScaler() if self.use_amp else None

    if self.use_amp:
        print(f"[Training] Mixed precision enabled")

    # TensorBoard
    self.writer = SummaryWriter(log_dir=str(config.tensorboard_dir))
    print(f"[TensorBoard] Logging to: {config.tensorboard_dir}")
    print(f"[Checkpoints] Format: SafeTensors (ClamAV safe)")

    # HuggingFace Hub
    self.hf_uploader = HuggingFaceUploader(config) if config.upload_to_hf else None

    # Persist the configuration next to the run's outputs.
    config.save(config.output_dir / "config.yaml")

    # Training-state bookkeeping
    self.best_acc = 0.0
    self.global_step = 0
    self.start_time = time.time()
    self.upload_count = 0
| |
| |
def _calculate_restart_epochs(self) -> List[int]:
    """Calculate when restarts will occur.

    The first cycle lasts ``restart_period`` epochs and every following
    cycle is ``restart_mult`` times longer than the previous one.

    Returns:
        The 0-based epoch indices, below ``num_epochs``, at which a new
        cycle starts.  Empty when the scheduler is not ``"cosine_restarts"``.

    Raises:
        ValueError: If ``restart_period`` is not positive or
            ``restart_mult`` is below 1.  Without this check such values
            would make the loop below spin forever.
    """
    if self.config.scheduler_type != "cosine_restarts":
        return []

    period = self.config.restart_period
    mult = self.config.restart_mult
    if period <= 0:
        raise ValueError(f"restart_period must be a positive integer, got {period}")
    if mult < 1:
        raise ValueError(f"restart_mult must be >= 1, got {mult}")

    restarts = []
    current = period
    while current < self.config.num_epochs:
        restarts.append(current)
        period *= mult
        current += period

    return restarts
| |
def create_optimizer(self):
    """Build the optimizer named by ``config.optimizer_type``.

    Supports ``"sgd"`` and ``"adamw"``; the chosen hyper-parameters are
    printed before the optimizer is returned.

    Raises:
        ValueError: For any other optimizer name.
    """
    cfg = self.config
    kind = cfg.optimizer_type

    if kind not in ("sgd", "adamw"):
        raise ValueError(f"Unknown optimizer: {kind}")

    if kind == "sgd":
        for line in (
            f"\n[Optimizer] SGD",
            f" LR: {cfg.learning_rate}",
            f" Momentum: {cfg.sgd_momentum}",
            f" Nesterov: {cfg.sgd_nesterov}",
            f" Weight decay: {cfg.weight_decay}",
        ):
            print(line)

        sgd_kwargs = dict(
            lr=cfg.learning_rate,
            momentum=cfg.sgd_momentum,
            weight_decay=cfg.weight_decay,
            nesterov=cfg.sgd_nesterov,
        )
        return torch.optim.SGD(self.model.parameters(), **sgd_kwargs)

    for line in (
        f"\n[Optimizer] AdamW",
        f" LR: {cfg.learning_rate}",
        f" Betas: {cfg.adamw_betas}",
        f" Weight decay: {cfg.weight_decay}",
    ):
        print(line)

    adamw_kwargs = dict(
        lr=cfg.learning_rate,
        betas=cfg.adamw_betas,
        eps=cfg.adamw_eps,
        weight_decay=cfg.weight_decay,
    )
    return torch.optim.AdamW(self.model.parameters(), **adamw_kwargs)
| |
def create_scheduler(self):
    """Create LR scheduler based on config.

    Supported ``config.scheduler_type`` values:

    * ``"cosine_restarts"`` - ``CosineAnnealingWarmRestarts``.
    * ``"multistep"`` - ``MultiStepLR``.
    * ``"cosine"`` - linear warmup for ``warmup_epochs`` epochs followed by
      one cosine decay from ``learning_rate`` down to ``min_lr``.

    Raises:
        ValueError: For any other scheduler name.
    """
    if self.config.scheduler_type == "cosine_restarts":
        print(f"\n[Scheduler] CosineAnnealingWarmRestarts")
        print(f" T_0 (restart period): {self.config.restart_period} epochs")
        print(f" T_mult (cycle multiplier): {self.config.restart_mult}x")
        print(f" Min LR: {self.config.min_lr}")
        print(f"\n Restart schedule:")
        for i, epoch in enumerate(self.restart_epochs[:5]):
            print(f" Restart #{i+1}: Epoch {epoch}")
        if len(self.restart_epochs) > 5:
            print(f" ... and {len(self.restart_epochs) - 5} more")

        return torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer,
            T_0=self.config.restart_period,
            T_mult=self.config.restart_mult,
            eta_min=self.config.min_lr
        )

    elif self.config.scheduler_type == "multistep":
        print(f"\n[Scheduler] MultiStepLR")
        print(f" Milestones: {self.config.lr_milestones}")
        print(f" Gamma: {self.config.lr_gamma}")

        return torch.optim.lr_scheduler.MultiStepLR(
            self.optimizer,
            milestones=self.config.lr_milestones,
            gamma=self.config.lr_gamma
        )

    elif self.config.scheduler_type == "cosine":
        print(f"\n[Scheduler] Cosine annealing with warmup")
        print(f" Warmup epochs: {self.config.warmup_epochs}")
        print(f" Min LR: {self.config.min_lr}")

        warmup = self.config.warmup_epochs
        # Never divide by zero when warmup covers the whole run.
        decay_span = max(1, self.config.num_epochs - warmup)
        # LambdaLR scales the base LR, so express min_lr as a fraction of it.
        base_lr = self.config.learning_rate
        floor = self.config.min_lr / base_lr if base_lr > 0 else 0.0
        floor = min(max(floor, 0.0), 1.0)

        def lr_lambda(epoch):
            if epoch < warmup:
                return (epoch + 1) / warmup
            progress = min(1.0, (epoch - warmup) / decay_span)
            cosine = 0.5 * (1 + math.cos(math.pi * progress))
            # Decay towards the advertised min_lr instead of zero.
            return floor + (1.0 - floor) * cosine

        return torch.optim.lr_scheduler.LambdaLR(self.optimizer, lr_lambda)

    else:
        raise ValueError(f"Unknown scheduler: {self.config.scheduler_type}")
| |
def print_model_info(self):
    """Print the parameter count and the key settings of this run."""
    cfg = self.config
    param_count = 0
    for param in self.model.parameters():
        param_count += param.numel()

    summary = (
        f"\nParameters: {param_count:,}",
        f"Dataset: {cfg.dataset.upper()}",
        f"Classes: {cfg.num_classes}",
        f"Fusion mode: {cfg.fusion_mode}",
        f"Optimizer: {cfg.optimizer_type.upper()}",
        f"Scheduler: {cfg.scheduler_type}",
        f"Output: {cfg.output_dir}",
    )
    for line in summary:
        print(line)
| |
def train_epoch(self, train_loader: DataLoader, epoch: int) -> Tuple[float, float]:
    """Train one epoch.

    Args:
        train_loader: Iterable of ``(images, labels)`` batches.
        epoch: 0-based epoch index (used for display and restart marking).

    Returns:
        ``(mean batch loss, accuracy in percent)`` over the epoch; both are
        0.0 if the loader yields no batches.
    """
    self.model.train()
    total_loss, correct, total = 0.0, 0, 0

    # The epoch whose index is in restart_epochs is the first one trained
    # at the restarted learning rate.
    is_restart = (epoch in self.restart_epochs)
    epoch_desc = f"Epoch {epoch+1}/{self.config.num_epochs}"
    if is_restart:
        epoch_desc += " π RESTART"

    pbar = tqdm(train_loader, desc=f"{epoch_desc} [Train]")

    for batch_idx, (images, labels) in enumerate(pbar):
        images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True)

        if self.use_amp:
            with autocast():
                logits = self.model(images)
                loss = self.criterion(logits, labels)
            self.optimizer.zero_grad(set_to_none=True)
            self.scaler.scale(loss).backward()
            # Unscale first so clipping sees the true gradient magnitudes.
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
            self.scaler.step(self.optimizer)
            self.scaler.update()
        else:
            logits = self.model(images)
            loss = self.criterion(logits, labels)
            self.optimizer.zero_grad(set_to_none=True)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
            self.optimizer.step()

        # Read the loss back once per batch; every .item() call forces a
        # device-to-host synchronisation.
        loss_value = loss.item()
        total_loss += loss_value
        _, predicted = logits.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

        running_acc = 100. * correct / total
        current_lr = self.scheduler.get_last_lr()[0]

        if batch_idx % self.config.log_interval == 0:
            self.writer.add_scalar('train/loss', loss_value, self.global_step)
            self.writer.add_scalar('train/accuracy', running_acc, self.global_step)
            self.writer.add_scalar('train/learning_rate', current_lr, self.global_step)

        self.global_step += 1

        pbar.set_postfix({
            'loss': f'{loss_value:.4f}',
            'acc': f'{running_acc:.2f}%',
            'lr': f'{current_lr:.6f}'
        })

    # Guard the averages so an empty loader reports zeros instead of
    # raising ZeroDivisionError.
    return total_loss / max(len(train_loader), 1), 100. * correct / max(total, 1)
| |
@torch.no_grad()
def evaluate(self, val_loader: DataLoader, epoch: int) -> Tuple[float, Dict]:
    """Evaluate the model and log validation metrics to TensorBoard.

    Fusion info is requested only for the last batch; if it contains a
    ``'consciousness'`` tensor, its mean is logged as well.

    Args:
        val_loader: Iterable of ``(images, labels)`` batches.
        epoch: 0-based epoch index, used as the TensorBoard step.

    Returns:
        ``(accuracy in percent, metrics)`` where ``metrics`` has the keys
        ``'loss'``, ``'accuracy'`` and ``'consciousness'`` (``None`` when
        no consciousness value was collected).  Loss and accuracy are 0.0
        if the loader yields no batches.
    """
    from contextlib import nullcontext

    self.model.eval()
    total_loss, correct, total = 0.0, 0, 0
    consciousness_values = []
    num_batches = len(val_loader)

    pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.num_epochs} [Val] ")

    for batch_idx, (images, labels) in enumerate(pbar):
        images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True)

        return_info = (batch_idx == num_batches - 1)

        # One code path for both precisions: only the context differs.
        amp_context = autocast() if self.use_amp else nullcontext()
        with amp_context:
            if return_info:
                logits, fusion_infos = self.model(images, return_fusion_info=True)
                if fusion_infos and fusion_infos[0].get('consciousness') is not None:
                    consciousness_values.append(fusion_infos[0]['consciousness'].mean().item())
            else:
                logits = self.model(images)
            loss = self.criterion(logits, labels)

        total_loss += loss.item()
        _, predicted = logits.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

        pbar.set_postfix({
            'loss': f'{total_loss / (batch_idx + 1):.4f}',
            'acc': f'{100. * correct / total:.2f}%'
        })

    # Guard the averages so an empty loader reports zeros instead of
    # raising ZeroDivisionError.
    avg_loss = total_loss / max(num_batches, 1)
    accuracy = 100. * correct / max(total, 1)
    mean_consciousness = (
        sum(consciousness_values) / len(consciousness_values)
        if consciousness_values else None
    )

    self.writer.add_scalar('val/loss', avg_loss, epoch)
    self.writer.add_scalar('val/accuracy', accuracy, epoch)
    if mean_consciousness is not None:
        self.writer.add_scalar('val/consciousness', mean_consciousness, epoch)

    metrics = {
        'loss': avg_loss,
        'accuracy': accuracy,
        'consciousness': mean_consciousness
    }

    return accuracy, metrics
| |
def train(self, train_loader: DataLoader, val_loader: DataLoader):
    """Full training loop.

    For every epoch: train, evaluate, step the scheduler, print a summary
    and save checkpoints (the best model whenever validation accuracy
    improves, plus a periodic one every ``save_interval`` epochs).  After
    the last epoch the best checkpoint, config, TensorBoard logs and a run
    model card are uploaded when an uploader is configured.
    """
    print("\n" + "=" * 70)
    print("Starting training with AdamW + Warm Restarts")
    print(f"Optimizer: {self.config.optimizer_type.upper()}")
    print(f"Scheduler: {self.config.scheduler_type}")
    print(f"Restart period: {self.config.restart_period} epochs (T_0)")
    print(f"Cycle multiplier: {self.config.restart_mult}x (T_mult)")
    print(f"Total restarts: {len(self.restart_epochs)}")
    print("=" * 70 + "\n")

    for epoch in range(self.config.num_epochs):
        train_loss, train_acc = self.train_epoch(train_loader, epoch)

        val_acc, val_metrics = self.evaluate(val_loader, epoch)

        self.scheduler.step()

        # The scheduler has just advanced to epoch + 1, so the LR read
        # below belongs to the *next* epoch; announce a restart when that
        # next epoch is a restart epoch.
        is_restart = ((epoch + 1) in self.restart_epochs)
        next_lr = self.scheduler.get_last_lr()[0]

        print(f"\n{'='*70}")
        print(f"Epoch [{epoch + 1}/{self.config.num_epochs}] Summary:")
        print(f" Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%")
        print(f" Val: Loss={val_metrics['loss']:.4f}, Acc={val_acc:.2f}%")
        # ``is not None`` so a legitimate value of 0.0 is still reported.
        if val_metrics['consciousness'] is not None:
            print(f" Consciousness: {val_metrics['consciousness']:.4f}")

        if is_restart:
            print(f" π WARM RESTART! Next LR: {next_lr:.6f}")
            print(f" (Escaping local minimum, exploring new regions)")
        else:
            print(f" Current LR: {next_lr:.6f}")

        is_best = val_acc > self.best_acc
        should_save_regular = ((epoch + 1) % self.config.save_interval == 0)
        should_upload_regular = ((epoch + 1) % self.config.checkpoint_upload_interval == 0)

        if is_best:
            self.best_acc = val_acc
            print(f" β New best model! Accuracy: {val_acc:.2f}%")
            self.save_checkpoint(epoch, val_acc, prefix="best", upload=should_upload_regular)

        if should_save_regular:
            self.save_checkpoint(epoch, val_acc, prefix=f"epoch_{epoch+1}", upload=should_upload_regular)

        print(f" HF Uploads: {self.upload_count}")
        print(f"{'='*70}\n")

        # Flush TensorBoard events periodically so a crash loses little.
        if (epoch + 1) % 10 == 0:
            self.writer.flush()

    training_time = (time.time() - self.start_time) / 3600

    print("\n" + "=" * 70)
    print("Training Complete!")
    print(f"Best Validation Accuracy: {self.best_acc:.2f}%")
    print(f"Training Time: {training_time:.2f} hours")
    print(f"Total Uploads: {self.upload_count}")
    print(f"Warm Restarts: {len(self.restart_epochs)}")
    print("=" * 70)

    if self.hf_uploader:
        print("\n[HF] Uploading final best model...")
        checkpoint_dir = self.config.checkpoint_dir
        final_uploads = (
            (checkpoint_dir / "best_model.safetensors", "checkpoints/best_model.safetensors"),
            (checkpoint_dir / "best_training_state.pt", "checkpoints/best_training_state.pt"),
            (checkpoint_dir / "best_metadata.json", "checkpoints/best_metadata.json"),
            (self.config.output_dir / "config.yaml", "config.yaml"),
        )
        for local_path, repo_path in final_uploads:
            if local_path.exists():
                self.hf_uploader.upload_file(local_path, repo_path)

        print("[HF] Final upload: TensorBoard logs...")
        self.hf_uploader.upload_folder_contents(self.config.tensorboard_dir, "tensorboard")

        trainer_stats = {
            'total_params': sum(p.numel() for p in self.model.parameters()),
            'best_acc': self.best_acc,
            'training_time': training_time,
            'final_epoch': self.config.num_epochs,
            'batch_size': self.config.batch_size,
            'mixed_precision': self.use_amp
        }
        self.hf_uploader.create_model_card(trainer_stats)

    self.writer.close()
| |
def save_checkpoint(self, epoch: int, accuracy: float, prefix: str = "checkpoint", upload: bool = False):
    """Write one checkpoint to disk and optionally push it to the HF Hub.

    Three files named after ``prefix`` are written into
    ``self.config.checkpoint_dir`` (the directory is created if missing):

    * ``{prefix}_model.safetensors``  - the model ``state_dict``
    * ``{prefix}_training_state.pt``  - optimizer and scheduler state dicts,
      plus the scaler state when ``self.scaler`` is set
    * ``{prefix}_metadata.json``      - epoch, accuracy, best accuracy,
      global step, timestamp, optimizer/scheduler names and current LR

    Args:
        epoch: Epoch index stored in the metadata exactly as given.
        accuracy: Accuracy value stored in the metadata.
        prefix: File-name stem. The value ``"best"`` marks the best-model
            checkpoint: it changes the console message and additionally
            uploads ``config.yaml`` when an upload happens.
        upload: Upload the files under ``checkpoints/`` when an uploader is
            configured; otherwise the checkpoint stays local.
    """
    out_dir = self.config.checkpoint_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    weights_file = out_dir / f"{prefix}_model.safetensors"
    state_file = out_dir / f"{prefix}_training_state.pt"
    meta_file = out_dir / f"{prefix}_metadata.json"

    # 1) Model weights.
    save_file(self.model.state_dict(), str(weights_file))

    # 2) Everything needed to resume optimisation.
    resume_state = {
        'optimizer_state_dict': self.optimizer.state_dict(),
        'scheduler_state_dict': self.scheduler.state_dict(),
    }
    if self.scaler is not None:
        resume_state['scaler_state_dict'] = self.scaler.state_dict()
    torch.save(resume_state, state_file)

    # 3) Human-readable summary of the run state at save time.
    summary = {
        'epoch': epoch,
        'accuracy': accuracy,
        'best_accuracy': self.best_acc,
        'global_step': self.global_step,
        'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
        'optimizer': self.config.optimizer_type,
        'scheduler': self.config.scheduler_type,
        'learning_rate': self.scheduler.get_last_lr()[0]
    }
    with meta_file.open('w') as handle:
        json.dump(summary, handle, indent=2)

    saving_best = prefix == "best"
    if saving_best:
        print(f" πΎ Saved best: {prefix}_model.safetensors")
    else:
        # No newline here: the status printed below completes this line.
        print(f" πΎ Saved: {prefix}_model.safetensors", end="")

    # Local-only path: nothing to upload, just finish the status line.
    if not (self.hf_uploader and upload):
        if not saving_best:
            print(" (local only)")
        return

    artifacts = (
        (weights_file, "model.safetensors"),
        (state_file, "training_state.pt"),
        (meta_file, "metadata.json"),
    )
    for local_file, suffix in artifacts:
        self.hf_uploader.upload_file(local_file, f"checkpoints/{prefix}_{suffix}")

    if saving_best:
        # Keep the run configuration next to the best checkpoint on the Hub.
        run_config = self.config.output_dir / "config.yaml"
        if run_config.exists():
            self.hf_uploader.upload_file(run_config, "config.yaml")

    self.upload_count += 1

    if not saving_best:
        print(" β Uploaded to HF")
|
|
|
|
| |
| |
| |
|
|
class Cutout:
    """Cutout augmentation: zero out one randomly placed square patch.

    The transform expects a 3-D tensor laid out as ``(C, H, W)`` (it reads
    the height and width from dimensions 1 and 2) and applies the same
    spatial mask to every channel.

    Args:
        length: Nominal side length of the patch. The patch spans
            ``length // 2`` pixels on each side of a random centre and is
            clipped at the image border, so it can be smaller than
            ``length`` near edges.
    """

    def __init__(self, length: int):
        self.length = length

    def __call__(self, img):
        """Return ``img`` with one square region set to zero.

        The mask is created on the image's device and, for floating-point
        images, in the image's dtype, so the transform also works on GPU
        tensors and does not upcast half-precision inputs. Non-floating
        images keep the float32 mask they had before.
        """
        h, w = img.size(1), img.size(2)
        mask_dtype = img.dtype if img.is_floating_point() else torch.float32
        mask = torch.ones((h, w), dtype=mask_dtype, device=img.device)

        # Draw the patch centre; y is drawn before x, so seeded runs
        # consume the random stream in a fixed order.
        y = torch.randint(h, (1,)).item()
        x = torch.randint(w, (1,)).item()

        half = self.length // 2
        y1, y2 = max(0, y - half), min(h, y + half)
        x1, x2 = max(0, x - half), min(w, x + half)

        mask[y1:y2, x1:x2] = 0.
        # Broadcast the (H, W) mask across all channels.
        return img * mask.expand_as(img)
|
|
|
|
def get_data_loaders(config: CantorTrainingConfig) -> Tuple[DataLoader, DataLoader]:
    """Build the training and validation ``DataLoader``s for CIFAR-10/100.

    Args:
        config: Training configuration. Fields read here: ``dataset``,
            ``use_augmentation``, ``use_autoaugment``, ``use_cutout``,
            ``cutout_length``, ``batch_size``, ``num_workers``, ``device``.

    Returns:
        ``(train_loader, val_loader)``. The training loader shuffles, the
        validation loader does not; both download the data into ``./data``
        if it is not already there.

    Raises:
        ValueError: If ``config.dataset`` is neither ``"cifar10"`` nor
            ``"cifar100"``.
    """
    dataset_classes = {
        "cifar10": datasets.CIFAR10,
        "cifar100": datasets.CIFAR100,
    }
    # Fail before building anything when the dataset name is unsupported.
    if config.dataset not in dataset_classes:
        raise ValueError(f"Unknown dataset: {config.dataset}")
    dataset_cls = dataset_classes[config.dataset]

    # NOTE(review): the same per-channel statistics are applied to both
    # datasets; they look like the commonly quoted CIFAR-10 values - confirm
    # whether CIFAR-100 should get its own before changing them, since
    # existing checkpoints were trained with these.
    mean = (0.4914, 0.4822, 0.4465)
    std = (0.2470, 0.2435, 0.2616)

    train_steps = []
    if config.use_augmentation:
        if config.use_autoaugment:
            # AutoAugment is used instead of (not in addition to) crop + flip.
            train_steps.append(transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10))
        else:
            train_steps.extend([
                transforms.RandomCrop(32, padding=4),
                transforms.RandomHorizontalFlip(),
            ])
    train_steps.extend([transforms.ToTensor(), transforms.Normalize(mean, std)])
    if config.use_augmentation and config.use_cutout:
        # Cutout operates on tensors, so it has to come after ToTensor.
        train_steps.append(Cutout(config.cutout_length))

    train_transform = transforms.Compose(train_steps)
    val_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])

    train_dataset = dataset_cls(root='./data', train=True, download=True, transform=train_transform)
    val_dataset = dataset_cls(root='./data', train=False, download=True, transform=val_transform)

    # Pin host memory for any CUDA device string ("cuda", "cuda:0", ...);
    # an exact == "cuda" comparison silently disabled pinning for indexed
    # devices.
    loader_kwargs = dict(
        batch_size=config.batch_size,
        num_workers=config.num_workers,
        pin_memory=str(config.device).startswith("cuda"),
    )
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

    return train_loader, val_loader
|
|
|
|
| |
| |
| |
|
|
def _restart_epochs(restart_period, restart_mult, num_epochs):
    """Return the epochs (strictly below ``num_epochs``) where a warm restart occurs.

    The first cycle lasts ``restart_period`` epochs and every following
    cycle is ``restart_mult`` times longer than the one before it.

    Raises:
        ValueError: If ``restart_period`` or ``restart_mult`` is below 1;
            such values would make the schedule loop run forever.
    """
    if restart_period < 1 or restart_mult < 1:
        raise ValueError(
            f"restart_period and restart_mult must be >= 1, "
            f"got {restart_period} and {restart_mult}"
        )

    epochs = []
    cycle_length = restart_period
    next_restart = restart_period
    while next_restart < num_epochs:
        epochs.append(next_restart)
        cycle_length *= restart_mult
        next_restart += cycle_length
    return epochs


def main():
    """Configure, announce and run one training job.

    Builds the training configuration, prints a summary of it (including
    the warm-restart schedule derived from ``restart_period``,
    ``restart_mult`` and ``num_epochs``), creates the data loaders, runs
    the trainer and finally prints where to find the TensorBoard logs.
    """
    config = CantorTrainingConfig(
        # Dataset
        dataset="cifar100",

        # Model
        embed_dim=512,
        num_fusion_blocks=6,
        num_heads=8,
        fusion_mode="weighted",
        k_simplex=4,
        use_beatrix=True,

        # Optimizer
        optimizer_type="adamw",
        learning_rate=3e-4,
        weight_decay=0.05,
        adamw_betas=(0.9, 0.999),

        # Scheduler
        scheduler_type="cosine_restarts",
        restart_period=20,
        restart_mult=2,
        min_lr=1e-7,

        # Training
        num_epochs=300,
        batch_size=128,
        grad_clip=1.0,
        label_smoothing=0.1,

        # Augmentation
        use_augmentation=True,
        use_autoaugment=True,
        use_cutout=False,
        cutout_length=16,

        # Regularization
        dropout=0.1,
        drop_path_rate=0.1,

        # Hardware
        device="cuda",
        use_mixed_precision=False,

        # HuggingFace Hub
        hf_username="AbstractPhil",
        upload_to_hf=True,
        checkpoint_upload_interval=25,
    )

    print("=" * 70)
    print(f"Cantor Fusion Classifier - {config.dataset.upper()}")
    print("Training Strategy: AdamW + Cosine Annealing Warm Restarts (SGDR)")
    print("=" * 70)
    print("\nConfiguration:")
    print(f" Dataset: {config.dataset}")
    print(f" Fusion mode: {config.fusion_mode}")
    print(" Optimizer: AdamW")
    print(" Scheduler: CosineAnnealingWarmRestarts")
    print(f" Initial LR: {config.learning_rate}")
    print(f" Min LR: {config.min_lr}")
    print(f" Restart period (T_0): {config.restart_period} epochs")
    print(f" Cycle multiplier (T_mult): {config.restart_mult}x")
    print(f" Total epochs: {config.num_epochs}")

    # Epochs at which the LR jumps back up, derived from the config.
    restarts = _restart_epochs(config.restart_period, config.restart_mult, config.num_epochs)

    print(f"\n Restart schedule ({len(restarts)} restarts):")
    for i, epoch in enumerate(restarts[:5]):
        print(f" Restart #{i+1}: Epoch {epoch}")
    if len(restarts) > 5:
        print(f" ... and {len(restarts) - 5} more")

    print(f"\n Output: {config.output_dir}")
    print(f" HuggingFace: {'Enabled' if config.upload_to_hf else 'Disabled'}")
    if config.upload_to_hf:
        print(f" Repo: {config.hf_username}/{config.hf_repo_name}")
        print(f" Run: {config.run_name}")

    print("\n" + "=" * 70)
    print("Expected Training Behavior:")
    print("=" * 70)

    # The cycle boundaries below come from the same schedule as the restart
    # list above, so the banner cannot disagree with the configuration
    # (it used to hard-code restarts at epochs 50 and 150).
    lr_high = f"{config.learning_rate:g}"
    lr_low = f"{config.min_lr:g}"
    boundaries = [0, *restarts, config.num_epochs]
    cycles = list(zip(boundaries, boundaries[1:]))
    max_cycles_shown = 3

    for number, (start, end) in enumerate(cycles[:max_cycles_shown], start=1):
        if number > 1:
            print(f"π Epoch {start}: RESTART!")
            print(f" LR: {lr_low} β {lr_high} (jump back up)")
            print(" Expected: Escape local minimum, explore new regions")
            print("")

        print(f"π Cycle {number} (epochs {start}-{end}):")
        planned_end = start + config.restart_period * config.restart_mult ** (number - 1)
        if planned_end > config.num_epochs:
            # Training stops before this cycle has annealed all the way down.
            print(f" LR: {lr_high} β (training ends before {lr_low} is reached)")
        else:
            print(f" LR: {lr_high} β {lr_low} (smooth drop)")
        if number == 1:
            print(" Expected: Convergence to local minimum")
        print("")

    if len(cycles) > max_cycles_shown:
        print(f" ... and {len(cycles) - max_cycles_shown} more cycle(s)")
        print("")

    print("π― Benefits:")
    print(" - Multiple exploration cycles find better minima")
    print(" - Automatic drop + restart (no manual tuning)")
    print(" - Robust to different datasets/architectures")
    print("=" * 70)

    print("\nLoading data...")
    train_loader, val_loader = get_data_loaders(config)
    print(f" Train: {len(train_loader.dataset)} samples")
    print(f" Val: {len(val_loader.dataset)} samples")

    trainer = Trainer(config)
    trainer.train(train_loader, val_loader)

    print("\n" + "=" * 70)
    print("π― Training complete!")
    print(" Check TensorBoard to see the warm restart cycles!")
    print(f" tensorboard --logdir {config.tensorboard_dir}")
    print("")
    print(" Look for:")
    print(" - Smooth LR drops during each cycle")
    print(" - Sharp LR jumps at restart epochs")
    print(" - Accuracy improvements across cycles")
    print("=" * 70)
|
|
|
|
# Start training only when this file is executed as a script; importing it
# as a module must not kick off a run.
if __name__ == "__main__":
    main()