# train_cantor_fusion_hf.py - WITH GEOMETRIC COALESCENCE LOSS """ Cantor Fusion Classifier with AdamW + Warm Restarts + LR Boost + Coalescence Loss ----------------------------------------------------------------------------------- Features: - AdamW optimizer (best for ViTs) - CosineAnnealingWarmRestarts with configurable LR boost at restarts - GeometricCoalescenceLoss: Unsupervised geometric supervision for shatter-reconstruct - HuggingFace Hub uploads (ONE shared repo, organized by run) - TensorBoard logging (loss, accuracy, fusion metrics, LR tracking, coalescence) - SafeTensors format (ClamAV safe) New Feature: Geometric Coalescence Loss Provides geometric scaffolding during aggressive LR boosting: - Consciousness Anchoring: High-awareness tokens cluster around learned attractors - Distance Preservation: Cantor measure topology guides embedding distances - Volume Preservation: Maintains simplex structural integrity - Adaptive weighting: Increases stabilization during LR spikes (0.1 → 0.8) Author: AbstractPhil License: MIT """ import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import DataLoader from torch.utils.tensorboard import SummaryWriter from torchvision import datasets, transforms from torch.cuda.amp import autocast, GradScaler from safetensors.torch import save_file, load_file import math import os import json from typing import Optional, Dict, List, Tuple, Union from dataclasses import dataclass, asdict import time from pathlib import Path from tqdm import tqdm # HuggingFace from huggingface_hub import HfApi, create_repo, upload_folder, upload_file import yaml # Import from your repo from geovocab2.train.model.layers.attention.cantor_multiheaded_fusion_fp64 import ( CantorMultiheadFusion, CantorFusionConfig ) from geovocab2.shapes.factory.cantor_route_factory import ( CantorRouteFactory, RouteMode, SimplexConfig ) from geovocab2.train.losses.geometric_coalescence_loss import ( GeometricCoalescenceLoss, add_coalescence_loss_to_training ) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Mixing Augmentations (AlphaMix / Fractal AlphaMix) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ def alphamix_data(x, y, alpha_range=(0.3, 0.7), spatial_ratio=0.25): """Standard AlphaMix: Single spatially localized transparent overlay.""" batch_size = x.size(0) index = torch.randperm(batch_size, device=x.device) y_a, y_b = y, y[index] # Sample alpha from Beta distribution alpha_min, alpha_max = alpha_range beta_sample = torch.distributions.Beta(2.0, 2.0).sample().item() alpha = alpha_min + (alpha_max - alpha_min) * beta_sample # Compute overlay region _, _, H, W = x.shape overlay_ratio = torch.sqrt(torch.tensor(spatial_ratio)).item() overlay_h = int(H * overlay_ratio) overlay_w = int(W * overlay_ratio) top = torch.randint(0, H - overlay_h + 1, (1,), device=x.device).item() left = torch.randint(0, W - overlay_w + 1, (1,), device=x.device).item() # Blend composited_x = x.clone() overlay_region = alpha * x[:, :, top:top+overlay_h, left:left+overlay_w] background_region = (1 - alpha) * x[index, :, top:top+overlay_h, left:left+overlay_w] composited_x[:, :, top:top+overlay_h, left:left+overlay_w] = overlay_region + background_region return composited_x, y_a, y_b, alpha def alphamix_fractal( x: torch.Tensor, y: torch.Tensor, alpha_range=(0.3, 0.7), steps_range=(1, 3), triad_scales=(1/3, 1/9, 1/27), beta_shape=(2.0, 2.0), seed: Optional[int] = None, ): """Fractal AlphaMix: Triadic multi-patch overlays aligned to Cantor geometry.""" if seed is not None: torch.manual_seed(seed) B, C, H, W = x.shape device = x.device # Permutation for mixing idx = torch.randperm(B, device=device) y_a, y_b = y, y[idx] x_mix = x.clone() total_area = H * W # Beta distribution for transparency sampling k1, k2 = beta_shape beta_dist = torch.distributions.Beta(k1, k2) alpha_min, alpha_max = alpha_range # Storage for effective alpha calculation alpha_elems = [] area_weights = [] # Sample number of patches steps = torch.randint(steps_range[0], steps_range[1] + 1, (1,), device=device).item() for _ in range(steps): # Choose triadic scale scale_idx = torch.randint(0, len(triad_scales), (1,), device=device).item() scale = triad_scales[scale_idx] # Compute patch dimensions patch_area = max(1, int(total_area * scale)) side = int(torch.sqrt(torch.tensor(patch_area, dtype=torch.float32)).item()) h = max(1, min(H, side)) w = max(1, min(W, side)) # Random position top = torch.randint(0, H - h + 1, (1,), device=device).item() left = torch.randint(0, W - w + 1, (1,), device=device).item() # Sample transparency alpha_raw = beta_dist.sample().item() alpha = alpha_min + (alpha_max - alpha_min) * alpha_raw # Track for effective alpha alpha_elems.append(alpha) area_weights.append(h * w) # Blend patches fg = alpha * x[:, :, top:top + h, left:left + w] bg = (1 - alpha) * x[idx, :, top:top + h, left:left + w] x_mix[:, :, top:top + h, left:left + w] = fg + bg # Compute area-weighted effective alpha alpha_t = torch.tensor(alpha_elems, dtype=torch.float32, device=device) area_t = torch.tensor(area_weights, dtype=torch.float32, device=device) alpha_eff = (alpha_t * area_t).sum() / (area_t.sum() + 1e-12) alpha_eff = alpha_eff.item() return x_mix, y_a, y_b, alpha_eff # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Custom Scheduler with LR Boost at Restarts # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class CosineAnnealingWarmRestartsWithBoost(torch.optim.lr_scheduler._LRScheduler): """Cosine Annealing with Warm Restarts and optional LR boost at restart points.""" def __init__( self, optimizer: torch.optim.Optimizer, T_0: int, T_mult: float = 1, eta_min: float = 0, restart_lr_mult: float = 1.0, last_epoch: int = -1 ): if T_0 <= 0 or not isinstance(T_0, int): raise ValueError(f"Expected positive integer T_0, but got {T_0}") if T_mult < 1: raise ValueError(f"Expected T_mult >= 1, but got {T_mult}") if restart_lr_mult <= 0: raise ValueError(f"Expected positive restart_lr_mult, but got {restart_lr_mult}") self.T_0 = T_0 self.T_i = T_0 self.T_mult = T_mult self.eta_min = eta_min self.restart_lr_mult = restart_lr_mult self.T_cur = last_epoch # Track boosted base LRs and restart count self.current_base_lrs = None self.restart_count = 0 super().__init__(optimizer, last_epoch) def get_lr(self): if self.T_cur == -1: return self.base_lrs # Use boosted base LRs if we've had restarts if self.current_base_lrs is None: base_lrs_to_use = self.base_lrs else: base_lrs_to_use = self.current_base_lrs # Cosine annealing from current base LR to eta_min return [ self.eta_min + (base_lr - self.eta_min) * (1 + math.cos(math.pi * self.T_cur / self.T_i)) / 2 for base_lr in base_lrs_to_use ] def step(self, epoch=None): if epoch is None and self.last_epoch < 0: epoch = 0 if epoch is None: epoch = self.last_epoch + 1 self.T_cur = self.T_cur + 1 # Check if we hit a restart point if self.T_cur >= self.T_i: # APPLY BOOST HERE before reset self.restart_count += 1 if self.current_base_lrs is None: self.current_base_lrs = list(self.base_lrs) # Boost the base LRs self.current_base_lrs = [ base_lr * self.restart_lr_mult for base_lr in self.current_base_lrs ] # Now reset cycle self.T_cur = self.T_cur - self.T_i self.T_i = int(self.T_i * self.T_mult) else: if epoch < 0: raise ValueError(f"Expected non-negative epoch, but got {epoch}") if epoch >= self.T_0: if self.T_mult == 1: self.T_cur = epoch % self.T_0 self.restart_count = epoch // self.T_0 else: n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult)) self.restart_count = n self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1) self.T_i = self.T_0 * self.T_mult ** n # Apply cumulative boost if self.current_base_lrs is None: self.current_base_lrs = [ base_lr * (self.restart_lr_mult ** self.restart_count) for base_lr in self.base_lrs ] else: self.T_i = self.T_0 self.T_cur = epoch self.last_epoch = math.floor(epoch) for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()): param_group['lr'] = lr self._last_lr = [group['lr'] for group in self.optimizer.param_groups] # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Configuration # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @dataclass class CantorTrainingConfig: """Complete configuration for Cantor fusion training with coalescence loss.""" # Dataset dataset: str = "cifar10" num_classes: int = 10 # Architecture image_size: int = 32 patch_size: int = 4 embed_dim: int = 384 num_fusion_blocks: int = 6 num_heads: int = 8 fusion_window: int = 32 fusion_mode: str = "weighted" k_simplex: int = 4 use_beatrix: bool = False beatrix_tau: float = 0.25 # Optimization precompute_geometric: bool = True use_torch_compile: bool = True use_mixed_precision: bool = False # Regularization dropout: float = 0.1 drop_path_rate: float = 0.1 label_smoothing: float = 0.1 # Training - Optimizer (AdamW) optimizer_type: str = "adamw" batch_size: int = 128 num_epochs: int = 300 learning_rate: float = 3e-4 weight_decay: float = 0.05 grad_clip: float = 1.0 # SGD-specific sgd_momentum: float = 0.9 sgd_nesterov: bool = True # AdamW-specific adamw_betas: Tuple[float, float] = (0.9, 0.999) adamw_eps: float = 1e-8 # Learning rate schedule - WARM RESTARTS WITH BOOST scheduler_type: str = "cosine_restarts" restart_period: int = 50 restart_mult: float = 2.0 restart_lr_mult: float = 1.0 # LR multiplier at restarts min_lr: float = 1e-7 # MultiStepLR (fallback) lr_milestones: List[int] = None lr_gamma: float = 0.2 # Cosine annealing (regular) warmup_epochs: int = 0 # Data augmentation use_augmentation: bool = True use_autoaugment: bool = True use_cutout: bool = False cutout_length: int = 16 # Mixing augmentation use_mixing: bool = False mixing_type: str = "alphamix" mixing_alpha_range: Tuple[float, float] = (0.3, 0.7) mixing_spatial_ratio: float = 0.25 mixing_prob: float = 1.0 fractal_steps_range: Tuple[int, int] = (1, 3) fractal_triad_scales: Tuple[float, ...] = (1/3, 1/9, 1/27) # Geometric Coalescence Loss use_coalescence_loss: bool = True lambda_coalescence: float = 0.5 coalescence_num_anchors: int = 64 coalescence_target_variance: float = 0.5 coalescence_base_weight: float = 0.1 coalescence_max_weight: float = 0.8 coalescence_weight_power: float = 2.0 coalescence_consciousness_weight: float = 0.3 coalescence_distance_weight: float = 0.4 coalescence_volume_weight: float = 0.3 coalescence_num_distance_pairs: int = 256 coalescence_num_simplex_samples: int = 32 # System device: str = "cuda" if torch.cuda.is_available() else "cpu" num_workers: int = 8 seed: int = 42 # Paths weights_dir: str = "weights" model_name: str = "vit-beans-v3" run_name: Optional[str] = None # HuggingFace hf_username: str = "AbstractPhil" hf_repo_name: Optional[str] = None upload_to_hf: bool = True hf_token: Optional[str] = None # Logging log_interval: int = 50 save_interval: int = 10 checkpoint_upload_interval: int = 20 def __post_init__(self): # Auto-set num_classes if self.dataset == "cifar10": self.num_classes = 10 elif self.dataset == "cifar100": self.num_classes = 100 else: raise ValueError(f"Unknown dataset: {self.dataset}") # Set default milestones if self.lr_milestones is None: if self.num_epochs >= 200: self.lr_milestones = [60, 120, 160] elif self.num_epochs >= 100: self.lr_milestones = [30, 60, 80] else: self.lr_milestones = [ int(self.num_epochs * 0.5), int(self.num_epochs * 0.75) ] # Auto-generate run name if self.run_name is None: timestamp = time.strftime("%Y%m%d_%H%M%S") opt_name = self.optimizer_type.upper() sched_name = "WarmRestart" if self.scheduler_type == "cosine_restarts" else self.scheduler_type boost_str = f"_boost{self.restart_lr_mult}x" if self.restart_lr_mult > 1.0 else "" coal_str = f"_coal{self.lambda_coalescence}" if self.use_coalescence_loss else "" self.run_name = f"{self.dataset}_{self.fusion_mode}_{opt_name}_{sched_name}{boost_str}{coal_str}_{timestamp}" # ONE SHARED REPO if self.hf_repo_name is None: self.hf_repo_name = self.model_name # Set HF token if self.hf_token is None: self.hf_token = os.environ.get("HF_TOKEN") # Calculate derived values assert self.image_size % self.patch_size == 0 self.num_patches = (self.image_size // self.patch_size) ** 2 self.patch_dim = self.patch_size * self.patch_size * 3 # Create paths self.output_dir = Path(self.weights_dir) / self.model_name / self.run_name self.checkpoint_dir = self.output_dir / "checkpoints" self.tensorboard_dir = self.output_dir / "tensorboard" # Create directories self.output_dir.mkdir(parents=True, exist_ok=True) self.checkpoint_dir.mkdir(parents=True, exist_ok=True) self.tensorboard_dir.mkdir(parents=True, exist_ok=True) def save(self, path: Union[str, Path]): """Save config to YAML file.""" path = Path(path) config_dict = asdict(self) # Convert tuples to lists for YAML if 'adamw_betas' in config_dict: config_dict['adamw_betas'] = list(config_dict['adamw_betas']) if 'mixing_alpha_range' in config_dict: config_dict['mixing_alpha_range'] = list(config_dict['mixing_alpha_range']) if 'fractal_steps_range' in config_dict: config_dict['fractal_steps_range'] = list(config_dict['fractal_steps_range']) if 'fractal_triad_scales' in config_dict: config_dict['fractal_triad_scales'] = list(config_dict['fractal_triad_scales']) with open(path, 'w') as f: yaml.dump(config_dict, f, default_flow_style=False) @classmethod def load(cls, path: Union[str, Path]): """Load config from YAML file.""" path = Path(path) with open(path, 'r') as f: config_dict = yaml.safe_load(f) # Convert lists back to tuples if 'adamw_betas' in config_dict: config_dict['adamw_betas'] = tuple(config_dict['adamw_betas']) if 'mixing_alpha_range' in config_dict: config_dict['mixing_alpha_range'] = tuple(config_dict['mixing_alpha_range']) if 'fractal_steps_range' in config_dict: config_dict['fractal_steps_range'] = tuple(config_dict['fractal_steps_range']) if 'fractal_triad_scales' in config_dict: config_dict['fractal_triad_scales'] = tuple(config_dict['fractal_triad_scales']) return cls(**config_dict) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Model Components # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class PatchEmbedding(nn.Module): """Patch embedding layer.""" def __init__(self, config: CantorTrainingConfig): super().__init__() self.config = config self.proj = nn.Conv2d(3, config.embed_dim, kernel_size=config.patch_size, stride=config.patch_size) self.pos_embed = nn.Parameter(torch.randn(1, config.num_patches, config.embed_dim) * 0.02) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.proj(x) x = x.flatten(2).transpose(1, 2) x = x + self.pos_embed return x class DropPath(nn.Module): """Stochastic depth.""" def __init__(self, drop_prob: float = 0.0): super().__init__() self.drop_prob = drop_prob def forward(self, x): if self.drop_prob == 0. or not self.training: return x keep_prob = 1 - self.drop_prob shape = (x.shape[0],) + (1,) * (x.ndim - 1) random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) random_tensor.floor_() return x.div(keep_prob) * random_tensor class CantorFusionBlock(nn.Module): """Cantor fusion block.""" def __init__(self, config: CantorTrainingConfig, drop_path: float = 0.0): super().__init__() self.norm1 = nn.LayerNorm(config.embed_dim) fusion_config = CantorFusionConfig( dim=config.embed_dim, num_heads=config.num_heads, fusion_window=config.fusion_window, fusion_mode=config.fusion_mode, k_simplex=config.k_simplex, use_beatrix_routing=config.use_beatrix, use_consciousness_weighting=(config.fusion_mode == "consciousness"), beatrix_tau=config.beatrix_tau, use_gating=True, dropout=config.dropout, residual=False, precompute_staircase=config.precompute_geometric, precompute_routes=config.precompute_geometric, precompute_distances=config.precompute_geometric, use_optimized_gather=True, staircase_cache_sizes=[config.num_patches], use_torch_compile=config.use_torch_compile ) self.fusion = CantorMultiheadFusion(fusion_config) self.norm2 = nn.LayerNorm(config.embed_dim) mlp_hidden = config.embed_dim * 4 self.mlp = nn.Sequential( nn.Linear(config.embed_dim, mlp_hidden), nn.GELU(), nn.Dropout(config.dropout), nn.Linear(mlp_hidden, config.embed_dim), nn.Dropout(config.dropout) ) self.drop_path = DropPath(drop_path) if drop_path > 0 else nn.Identity() def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]: fusion_result = self.fusion(self.norm1(x)) x = x + self.drop_path(fusion_result['output']) x_after_fusion = x # Save for coalescence loss x = x + self.drop_path(self.mlp(self.norm2(x))) if return_fusion_info: fusion_info = { 'output': x_after_fusion, # Embeddings after fusion (before MLP) 'consciousness': fusion_result.get('consciousness'), 'cantor_measure': fusion_result.get('cantor_measure') } return x, fusion_info return x class CantorClassifier(nn.Module): """Cantor fusion classifier.""" def __init__(self, config: CantorTrainingConfig): super().__init__() self.config = config self.patch_embed = PatchEmbedding(config) dpr = [x.item() for x in torch.linspace(0, config.drop_path_rate, config.num_fusion_blocks)] self.blocks = nn.ModuleList([ CantorFusionBlock(config, drop_path=dpr[i]) for i in range(config.num_fusion_blocks) ]) self.norm = nn.LayerNorm(config.embed_dim) self.head = nn.Linear(config.embed_dim, config.num_classes) self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): nn.init.trunc_normal_(m.weight, std=0.02) if m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): nn.init.constant_(m.bias, 0) nn.init.constant_(m.weight, 1.0) elif isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]: x = self.patch_embed(x) fusion_infos = [] for i, block in enumerate(self.blocks): if return_fusion_info and i == len(self.blocks) - 1: x, fusion_info = block(x, return_fusion_info=True) fusion_infos.append(fusion_info) else: x = block(x) x = self.norm(x) x = x.mean(dim=1) logits = self.head(x) if return_fusion_info: return logits, fusion_infos return logits # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # HuggingFace Integration (unchanged) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class HuggingFaceUploader: """Manages HuggingFace Hub uploads to ONE shared repo.""" def __init__(self, config: CantorTrainingConfig): self.config = config self.api = HfApi(token=config.hf_token) if config.upload_to_hf else None self.repo_id = f"{config.hf_username}/{config.hf_repo_name}" self.run_prefix = f"runs/{config.run_name}" if config.upload_to_hf: self._create_repo() self._update_main_readme() def _create_repo(self): """Create HuggingFace repo if it doesn't exist.""" try: create_repo( repo_id=self.repo_id, token=self.config.hf_token, exist_ok=True, private=False ) print(f"[HF] Repository: https://huggingface.co/{self.repo_id}") print(f"[HF] Run folder: {self.run_prefix}") except Exception as e: print(f"[HF] Warning: Could not create repo: {e}") def _update_main_readme(self): """Create or update the main shared README at repo root.""" if not self.config.upload_to_hf or self.api is None: return boost_info = "" if self.config.restart_lr_mult > 1.0: boost_info = f""" ### 🚀 LR Boost + Geometric Coalescence This run uses **restart_lr_mult = {self.config.restart_lr_mult}x** with **GeometricCoalescenceLoss**: - LR boosts create aggressive exploration cycles - Coalescence loss provides geometric scaffolding during weight thrashing - Adaptive weighting: {self.config.coalescence_base_weight} → {self.config.coalescence_max_weight} during LR spikes - Model reconstructs from geometric first principles when patterns shatter """ main_readme = f"""--- tags: - image-classification - cantor-fusion - geometric-deep-learning - safetensors - vision-transformer - warm-restarts - geometric-coalescence library_name: pytorch datasets: - cifar10 - cifar100 metrics: - accuracy --- # {self.config.hf_repo_name} **Geometric Deep Learning with Cantor Multihead Fusion + Shatter-Reconstruct Training** This repository contains training runs using Cantor fusion architecture with: - Pentachoron (5-simplex) structures for geometric routing - CosineAnnealingWarmRestarts for exploration cycles - GeometricCoalescenceLoss for shatter-reconstruct training {boost_info} ## Current Run **Latest**: `{self.config.run_name}` - **Dataset**: {self.config.dataset.upper()} - **Fusion Mode**: {self.config.fusion_mode} - **Coalescence**: λ={self.config.lambda_coalescence} {'✓' if self.config.use_coalescence_loss else '✗'} - **LR Boost**: {self.config.restart_lr_mult}x {'🚀' if self.config.restart_lr_mult > 1.0 else ''} --- **Repository maintained by**: [@{self.config.hf_username}](https://huggingface.co/{self.config.hf_username}) """ main_readme_path = Path(self.config.weights_dir) / self.config.model_name / "MAIN_README.md" main_readme_path.parent.mkdir(parents=True, exist_ok=True) with open(main_readme_path, 'w') as f: f.write(main_readme) try: upload_file( path_or_fileobj=str(main_readme_path), path_in_repo="README.md", repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] Updated main README") except Exception as e: print(f"[HF] Main README upload failed: {e}") def upload_file(self, file_path: Path, repo_path: str): """Upload single file to HuggingFace.""" if not self.config.upload_to_hf or self.api is None: return try: if not repo_path.startswith(self.run_prefix) and not repo_path.startswith("runs/"): full_path = f"{self.run_prefix}/{repo_path}" else: full_path = repo_path upload_file( path_or_fileobj=str(file_path), path_in_repo=full_path, repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] ✓ Uploaded: {full_path}") except Exception as e: print(f"[HF] ✗ Upload failed ({full_path}): {e}") def upload_folder_contents(self, folder_path: Path, repo_folder: str): """Upload entire folder to HuggingFace.""" if not self.config.upload_to_hf or self.api is None: return try: full_path = f"{self.run_prefix}/{repo_folder}" upload_folder( folder_path=str(folder_path), repo_id=self.repo_id, path_in_repo=full_path, token=self.config.hf_token, ignore_patterns=["*.pyc", "__pycache__"] ) print(f"[HF] Uploaded folder: {full_path}") except Exception as e: print(f"[HF] Folder upload failed: {e}") def create_model_card(self, trainer_stats: Dict): """Create and upload run-specific model card.""" if not self.config.upload_to_hf: return # Create run card with coalescence info run_card = f"""# Run: {self.config.run_name} ## Configuration - **Dataset**: {self.config.dataset.upper()} - **Parameters**: {trainer_stats['total_params']:,} - **Coalescence Loss**: {'Enabled' if self.config.use_coalescence_loss else 'Disabled'} - **LR Boost**: {self.config.restart_lr_mult}x ## Performance - **Best Validation Accuracy**: {trainer_stats['best_acc']:.2f}% - **Training Time**: {trainer_stats['training_time']:.1f} hours --- Built with geometric shatter-reconstruct training. **Training completed**: {time.strftime("%Y-%m-%d %H:%M:%S")} """ readme_path = self.config.output_dir / "RUN_README.md" with open(readme_path, 'w') as f: f.write(run_card) try: upload_file( path_or_fileobj=str(readme_path), path_in_repo=f"{self.run_prefix}/README.md", repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] Uploaded run README") except Exception as e: print(f"[HF] Run README upload failed: {e}") # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Trainer with Geometric Coalescence Loss # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class Trainer: """Training manager with AdamW + Warm Restarts + Coalescence Loss.""" def __init__(self, config: CantorTrainingConfig): self.config = config self.device = torch.device(config.device) # Set seed torch.manual_seed(config.seed) if torch.cuda.is_available(): torch.cuda.manual_seed(config.seed) # Model print("\n" + "=" * 70) print(f"Initializing Cantor Classifier - {config.dataset.upper()}") print("=" * 70) init_start = time.time() self.model = CantorClassifier(config).to(self.device) init_time = time.time() - init_start print(f"\n[Model] Initialization time: {init_time:.2f}s") self.print_model_info() # Track restart epochs self.restart_epochs = self._calculate_restart_epochs() # Optimizer self.optimizer = self.create_optimizer() # Scheduler self.scheduler = self.create_scheduler() # Loss self.criterion = nn.CrossEntropyLoss(label_smoothing=config.label_smoothing) # Geometric Coalescence Loss if config.use_coalescence_loss: print(f"\n[Coalescence Loss] Initializing...") self.coalescence_loss_fn = GeometricCoalescenceLoss( embed_dim=config.embed_dim, num_anchors=config.coalescence_num_anchors, k_simplex=config.k_simplex, target_variance=config.coalescence_target_variance, num_simplex_samples=config.coalescence_num_simplex_samples, num_distance_pairs=config.coalescence_num_distance_pairs, base_weight=config.coalescence_base_weight, max_weight=config.coalescence_max_weight, weight_power=config.coalescence_weight_power, consciousness_weight=config.coalescence_consciousness_weight, distance_weight=config.coalescence_distance_weight, volume_weight=config.coalescence_volume_weight ).to(self.device) print(f"[Coalescence] λ={config.lambda_coalescence}") print(f"[Coalescence] Adaptive weight: {config.coalescence_base_weight} → {config.coalescence_max_weight}") print(f"[Coalescence] Components: anchor={config.coalescence_consciousness_weight:.1f}, " f"dist={config.coalescence_distance_weight:.1f}, vol={config.coalescence_volume_weight:.1f}") else: self.coalescence_loss_fn = None print(f"\n[Coalescence Loss] Disabled") # Mixing info self.use_mixing = config.use_mixing self.mixing_type = config.mixing_type self.mixing_prob = config.mixing_prob # Mixed precision self.use_amp = config.use_mixed_precision and config.device == "cuda" self.scaler = GradScaler() if self.use_amp else None if self.use_amp: print(f"[Training] Mixed precision enabled") # TensorBoard self.writer = SummaryWriter(log_dir=str(config.tensorboard_dir)) print(f"[TensorBoard] Logging to: {config.tensorboard_dir}") # HuggingFace self.hf_uploader = HuggingFaceUploader(config) if config.upload_to_hf else None # Save config config.save(config.output_dir / "config.yaml") # Metrics self.best_acc = 0.0 self.global_step = 0 self.start_time = time.time() self.upload_count = 0 def apply_mixing(self, images: torch.Tensor, labels: torch.Tensor): """Apply mixing augmentation if enabled.""" if not self.use_mixing or torch.rand(1).item() > self.mixing_prob: return images, labels, None if self.mixing_type == "alphamix": mixed_images, y_a, y_b, alpha = alphamix_data( images, labels, alpha_range=self.config.mixing_alpha_range, spatial_ratio=self.config.mixing_spatial_ratio ) elif self.mixing_type == "fractal": mixed_images, y_a, y_b, alpha = alphamix_fractal( images, labels, alpha_range=self.config.mixing_alpha_range, steps_range=self.config.fractal_steps_range, triad_scales=self.config.fractal_triad_scales ) else: raise ValueError(f"Unknown mixing type: {self.mixing_type}") return mixed_images, (y_a, y_b, alpha), alpha def compute_mixed_loss(self, logits: torch.Tensor, mixed_labels): """Compute loss for mixed labels.""" if mixed_labels is None: return None y_a, y_b, alpha = mixed_labels loss_a = self.criterion(logits, y_a) loss_b = self.criterion(logits, y_b) loss = alpha * loss_a + (1 - alpha) * loss_b return loss def _calculate_restart_epochs(self) -> List[int]: """Calculate when restarts will occur.""" if self.config.scheduler_type != "cosine_restarts": return [] restarts = [] current = self.config.restart_period period = self.config.restart_period while current < self.config.num_epochs: restarts.append(current) period *= self.config.restart_mult current += period return restarts def create_optimizer(self): """Create optimizer based on config.""" if self.config.optimizer_type == "adamw": print(f"\n[Optimizer] AdamW") print(f" LR: {self.config.learning_rate}") print(f" Betas: {self.config.adamw_betas}") print(f" Weight decay: {self.config.weight_decay}") return torch.optim.AdamW( self.model.parameters(), lr=self.config.learning_rate, betas=self.config.adamw_betas, eps=self.config.adamw_eps, weight_decay=self.config.weight_decay ) else: raise ValueError(f"Unknown optimizer: {self.config.optimizer_type}") def create_scheduler(self): """Create LR scheduler based on config.""" if self.config.scheduler_type == "cosine_restarts": print(f"\n[Scheduler] CosineAnnealingWarmRestarts with LR Boost") print(f" T_0: {self.config.restart_period} epochs") print(f" T_mult: {self.config.restart_mult}x") print(f" Restart LR mult: {self.config.restart_lr_mult}x {'🚀' if self.config.restart_lr_mult > 1.0 else ''}") print(f" Min LR: {self.config.min_lr}") if self.config.restart_lr_mult > 1.0: print(f"\n 🚀 BOOST MODE ENABLED!") print(f" Creates wider exploration curves to escape local minima") print(f" Coalescence loss provides geometric scaffolding during thrashing") return CosineAnnealingWarmRestartsWithBoost( self.optimizer, T_0=self.config.restart_period, T_mult=self.config.restart_mult, eta_min=self.config.min_lr, restart_lr_mult=self.config.restart_lr_mult ) else: raise ValueError(f"Unknown scheduler: {self.config.scheduler_type}") def print_model_info(self): """Print model info.""" total_params = sum(p.numel() for p in self.model.parameters()) print(f"\nParameters: {total_params:,}") print(f"Dataset: {self.config.dataset.upper()}") print(f"Fusion mode: {self.config.fusion_mode}") print(f"Optimizer: {self.config.optimizer_type.upper()}") print(f"Scheduler: {self.config.scheduler_type}") if self.config.restart_lr_mult > 1.0: print(f"LR Boost: {self.config.restart_lr_mult}x at restarts 🚀") if self.config.use_coalescence_loss: print(f"Coalescence Loss: λ={self.config.lambda_coalescence} ✓") print(f"Output: {self.config.output_dir}") def train_epoch(self, train_loader: DataLoader, epoch: int) -> Tuple[float, float]: """Train one epoch with coalescence loss.""" self.model.train() total_loss, total_task_loss, total_coal_loss = 0.0, 0.0, 0.0 correct, total = 0, 0 mixing_applied_count = 0 total_batches = 0 # Check if this is a restart epoch is_restart = (epoch in self.restart_epochs) epoch_desc = f"Epoch {epoch+1}/{self.config.num_epochs}" if is_restart: restart_num = self.restart_epochs.index(epoch) + 1 boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0 epoch_desc += f" 🔄 RESTART #{restart_num}" if self.config.restart_lr_mult > 1.0: epoch_desc += f" ({boost_mult:.2f}x)" pbar = tqdm(train_loader, desc=f"{epoch_desc} [Train]") for batch_idx, (images, labels) in enumerate(pbar): images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True) # Apply mixing augmentation original_labels = labels mixed_images, mixed_labels_info, mixing_alpha = self.apply_mixing(images, labels) if mixing_alpha is not None: mixing_applied_count += 1 images = mixed_images total_batches += 1 # Forward WITH fusion info for coalescence loss return_fusion = (self.coalescence_loss_fn is not None) if self.use_amp: with autocast(): if return_fusion: logits, fusion_infos = self.model(images, return_fusion_info=True) else: logits = self.model(images) # Task loss if mixing_alpha is not None: task_loss = self.compute_mixed_loss(logits, mixed_labels_info) else: task_loss = self.criterion(logits, labels) # Add coalescence loss coal_loss = torch.tensor(0.0, device=self.device) coal_metrics = {} # Initialize empty dict if self.coalescence_loss_fn and fusion_infos: coal_loss, coal_metrics = add_coalescence_loss_to_training( fusion_infos[-1], # Last layer self.coalescence_loss_fn, current_lr=self.scheduler.get_last_lr()[0], baseline_lr=self.config.learning_rate, lambda_coal=self.config.lambda_coalescence ) # Log coalescence metrics (FIXED: check if dict not empty) if batch_idx % self.config.log_interval == 0 and coal_metrics: self.writer.add_scalar('train/coalescence_loss', coal_loss.item(), self.global_step) self.writer.add_scalar('train/coalescence_weight', coal_metrics['adaptive_weight'], self.global_step) self.writer.add_scalar('train/anchor_loss', coal_metrics['anchor_loss'], self.global_step) self.writer.add_scalar('train/distance_loss', coal_metrics['distance_loss'], self.global_step) self.writer.add_scalar('train/volume_loss', coal_metrics['volume_loss'], self.global_step) # Total loss loss = task_loss + coal_loss self.optimizer.zero_grad(set_to_none=True) self.scaler.scale(loss).backward() self.scaler.unscale_(self.optimizer) torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip) self.scaler.step(self.optimizer) self.scaler.update() else: if return_fusion: logits, fusion_infos = self.model(images, return_fusion_info=True) else: logits = self.model(images) # Task loss if mixing_alpha is not None: task_loss = self.compute_mixed_loss(logits, mixed_labels_info) else: task_loss = self.criterion(logits, labels) # Add coalescence loss coal_loss = torch.tensor(0.0, device=self.device) coal_metrics = {} # Initialize empty dict if self.coalescence_loss_fn and fusion_infos: coal_loss, coal_metrics = add_coalescence_loss_to_training( fusion_infos[-1], self.coalescence_loss_fn, current_lr=self.scheduler.get_last_lr()[0], baseline_lr=self.config.learning_rate, lambda_coal=self.config.lambda_coalescence ) # Log coalescence metrics (FIXED: check if dict not empty) if batch_idx % self.config.log_interval == 0 and coal_metrics: self.writer.add_scalar('train/coalescence_loss', coal_loss.item(), self.global_step) self.writer.add_scalar('train/coalescence_weight', coal_metrics['adaptive_weight'], self.global_step) self.writer.add_scalar('train/anchor_loss', coal_metrics['anchor_loss'], self.global_step) self.writer.add_scalar('train/distance_loss', coal_metrics['distance_loss'], self.global_step) self.writer.add_scalar('train/volume_loss', coal_metrics['volume_loss'], self.global_step) # Total loss loss = task_loss + coal_loss self.optimizer.zero_grad(set_to_none=True) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip) self.optimizer.step() # Metrics total_loss += loss.item() total_task_loss += task_loss.item() total_coal_loss += coal_loss.item() _, predicted = logits.max(1) correct += predicted.eq(original_labels).sum().item() total += original_labels.size(0) # TensorBoard logging if batch_idx % self.config.log_interval == 0: current_lr = self.scheduler.get_last_lr()[0] self.writer.add_scalar('train/total_loss', loss.item(), self.global_step) self.writer.add_scalar('train/task_loss', task_loss.item(), self.global_step) self.writer.add_scalar('train/accuracy', 100. * correct / total, self.global_step) self.writer.add_scalar('train/learning_rate', current_lr, self.global_step) if mixing_alpha is not None: self.writer.add_scalar('train/mixing_alpha', mixing_alpha, self.global_step) self.global_step += 1 # Progress bar postfix postfix_dict = { 'loss': f'{loss.item():.4f}', 'task': f'{task_loss.item():.4f}', 'acc': f'{100. * correct / total:.2f}%', 'lr': f'{self.scheduler.get_last_lr()[0]:.6f}' } if self.coalescence_loss_fn and coal_loss.item() > 0: postfix_dict['coal'] = f'{coal_loss.item():.4f}' if self.use_mixing: mix_pct = 100.0 * mixing_applied_count / total_batches postfix_dict['mix'] = f'{mix_pct:.0f}%' pbar.set_postfix(postfix_dict) return total_loss / len(train_loader), 100. * correct / total @torch.no_grad() def evaluate(self, val_loader: DataLoader, epoch: int) -> Tuple[float, Dict]: """Evaluate.""" self.model.eval() total_loss, correct, total = 0.0, 0, 0 consciousness_values = [] pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.num_epochs} [Val] ") for batch_idx, (images, labels) in enumerate(pbar): images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True) # Forward with fusion info on last batch return_info = (batch_idx == len(val_loader) - 1) if self.use_amp: with autocast(): if return_info: logits, fusion_infos = self.model(images, return_fusion_info=True) if fusion_infos and fusion_infos[0].get('consciousness') is not None: consciousness_values.append(fusion_infos[0]['consciousness'].mean().item()) else: logits = self.model(images) loss = self.criterion(logits, labels) else: if return_info: logits, fusion_infos = self.model(images, return_fusion_info=True) if fusion_infos and fusion_infos[0].get('consciousness') is not None: consciousness_values.append(fusion_infos[0]['consciousness'].mean().item()) else: logits = self.model(images) loss = self.criterion(logits, labels) total_loss += loss.item() _, predicted = logits.max(1) correct += predicted.eq(labels).sum().item() total += labels.size(0) pbar.set_postfix({ 'loss': f'{total_loss / (batch_idx + 1):.4f}', 'acc': f'{100. * correct / total:.2f}%' }) avg_loss = total_loss / len(val_loader) accuracy = 100. * correct / total # TensorBoard logging self.writer.add_scalar('val/loss', avg_loss, epoch) self.writer.add_scalar('val/accuracy', accuracy, epoch) if consciousness_values: self.writer.add_scalar('val/consciousness', sum(consciousness_values) / len(consciousness_values), epoch) metrics = { 'loss': avg_loss, 'accuracy': accuracy, 'consciousness': sum(consciousness_values) / len(consciousness_values) if consciousness_values else None } return accuracy, metrics def train(self, train_loader: DataLoader, val_loader: DataLoader): """Full training loop.""" print("\n" + "=" * 70) print("Starting training with Geometric Coalescence Loss") if self.config.restart_lr_mult > 1.0: print("🚀 LR Boost Mode + Geometric Scaffolding") print("=" * 70 + "\n") for epoch in range(self.config.num_epochs): # Train train_loss, train_acc = self.train_epoch(train_loader, epoch) # Evaluate val_acc, val_metrics = self.evaluate(val_loader, epoch) # Update scheduler self.scheduler.step() # Check restart status is_restart = (epoch in self.restart_epochs) next_is_restart = ((epoch + 1) in self.restart_epochs) next_lr = self.scheduler.get_last_lr()[0] # Print summary print(f"\n{'='*70}") print(f"Epoch [{epoch + 1}/{self.config.num_epochs}] Summary:") print(f" Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%") print(f" Val: Loss={val_metrics['loss']:.4f}, Acc={val_acc:.2f}%") if next_is_restart and self.config.restart_lr_mult > 1.0: print(f" ⚠️ RESTART COMING! Coalescence weight will increase for stabilization") elif is_restart and self.config.restart_lr_mult > 1.0: print(f" 🔄 WARM RESTART! Geometric scaffolding active") print(f" Current LR: {next_lr:.6f}") # Checkpoint is_best = val_acc > self.best_acc should_upload = ((epoch + 1) % self.config.checkpoint_upload_interval == 0) if is_best: self.best_acc = val_acc print(f" ✓ New best model! Accuracy: {val_acc:.2f}%") self.save_checkpoint(epoch, val_acc, prefix="best", upload=should_upload) print(f"{'='*70}\n") # Training complete training_time = (time.time() - self.start_time) / 3600 print("\n" + "=" * 70) print("Training Complete!") print(f"Best Validation Accuracy: {self.best_acc:.2f}%") print(f"Training Time: {training_time:.2f} hours") if self.config.restart_lr_mult > 1.0 and self.config.use_coalescence_loss: print("🚀 Shatter-reconstruct training successful!") print("=" * 70) # Upload to HuggingFace if self.hf_uploader: trainer_stats = { 'total_params': sum(p.numel() for p in self.model.parameters()), 'best_acc': self.best_acc, 'training_time': training_time, 'final_epoch': self.config.num_epochs, 'batch_size': self.config.batch_size, 'mixed_precision': self.use_amp } self.hf_uploader.create_model_card(trainer_stats) self.writer.close() def save_checkpoint(self, epoch: int, accuracy: float, prefix: str = "checkpoint", upload: bool = False): """Save checkpoint.""" checkpoint_dir = self.config.checkpoint_dir checkpoint_dir.mkdir(parents=True, exist_ok=True) # Save model weights model_path = checkpoint_dir / f"{prefix}_model.safetensors" save_file(self.model.state_dict(), str(model_path)) # Save training state training_state = { 'optimizer_state_dict': self.optimizer.state_dict(), 'scheduler_state_dict': self.scheduler.state_dict(), } if self.scaler is not None: training_state['scaler_state_dict'] = self.scaler.state_dict() if self.coalescence_loss_fn is not None: training_state['coalescence_anchors'] = self.coalescence_loss_fn.anchors.data training_state_path = checkpoint_dir / f"{prefix}_training_state.pt" torch.save(training_state, training_state_path) # Save metadata metadata = { 'epoch': epoch, 'accuracy': accuracy, 'best_accuracy': self.best_acc, 'global_step': self.global_step, 'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"), 'coalescence_enabled': self.config.use_coalescence_loss, 'restart_lr_mult': self.config.restart_lr_mult } metadata_path = checkpoint_dir / f"{prefix}_metadata.json" with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) print(f" 💾 Saved: {prefix}_model.safetensors") # Upload if self.hf_uploader and upload: self.hf_uploader.upload_file(model_path, f"checkpoints/{prefix}_model.safetensors") self.upload_count += 1 # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Data Loading # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class Cutout: """Cutout data augmentation.""" def __init__(self, length: int): self.length = length def __call__(self, img): h, w = img.size(1), img.size(2) mask = torch.ones((h, w), dtype=torch.float32) y = torch.randint(h, (1,)).item() x = torch.randint(w, (1,)).item() y1 = max(0, y - self.length // 2) y2 = min(h, y + self.length // 2) x1 = max(0, x - self.length // 2) x2 = min(w, x + self.length // 2) mask[y1:y2, x1:x2] = 0. mask = mask.expand_as(img) return img * mask def get_data_loaders(config: CantorTrainingConfig) -> Tuple[DataLoader, DataLoader]: """Create data loaders.""" mean = (0.4914, 0.4822, 0.4465) std = (0.2470, 0.2435, 0.2616) if config.use_augmentation: transforms_list = [] if config.use_autoaugment: transforms_list.append(transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10)) else: transforms_list.extend([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), ]) transforms_list.append(transforms.ToTensor()) transforms_list.append(transforms.Normalize(mean, std)) if config.use_cutout: transforms_list.append(Cutout(config.cutout_length)) train_transform = transforms.Compose(transforms_list) else: train_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) val_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) if config.dataset == "cifar10": train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform) val_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=val_transform) elif config.dataset == "cifar100": train_dataset = datasets.CIFAR100(root='./data', train=True, download=True, transform=train_transform) val_dataset = datasets.CIFAR100(root='./data', train=False, download=True, transform=val_transform) else: raise ValueError(f"Unknown dataset: {config.dataset}") train_loader = DataLoader( train_dataset, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers, pin_memory=(config.device == "cuda") ) val_loader = DataLoader( val_dataset, batch_size=config.batch_size, shuffle=False, num_workers=config.num_workers, pin_memory=(config.device == "cuda") ) return train_loader, val_loader # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Main # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ def main(): """Main training function with Geometric Coalescence Loss.""" config = CantorTrainingConfig( # Dataset dataset="cifar100", # Architecture embed_dim=486, num_fusion_blocks=9, num_heads=9, fusion_mode="learned", k_simplex=8, use_beatrix=False, fusion_window=27, # Optimizer: AdamW optimizer_type="adamw", learning_rate=3e-4, weight_decay=0.05, adamw_betas=(0.9, 0.999), # Scheduler: Warm Restarts + LR BOOST scheduler_type="cosine_restarts", restart_period=12, restart_mult=1.5, restart_lr_mult=1.15, # 🚀 Aggressive exploration min_lr=1e-7, # Training num_epochs=300, batch_size=512, grad_clip=1.0, label_smoothing=0.15, # Augmentation use_augmentation=True, use_autoaugment=True, use_cutout=True, cutout_length=16, # Mixing use_mixing=True, mixing_type="alphamix", mixing_alpha_range=(0.3, 0.7), mixing_spatial_ratio=0.25, mixing_prob=0.5, # Geometric Coalescence Loss use_coalescence_loss=True, lambda_coalescence=0.5, coalescence_num_anchors=64, coalescence_target_variance=0.5, coalescence_base_weight=0.1, coalescence_max_weight=0.8, coalescence_weight_power=2.0, # Regularization dropout=0.1, drop_path_rate=0.15, # System device="cuda", use_mixed_precision=False, # HuggingFace hf_username="AbstractPhil", upload_to_hf=True, checkpoint_upload_interval=25, ) print("=" * 70) print(f"Cantor Fusion Classifier - {config.dataset.upper()}") print("Shatter-Reconstruct Training") print("=" * 70) print(f"\n🚀 LR Boost: {config.restart_lr_mult}x at restarts") print(f"🧬 Coalescence Loss: λ={config.lambda_coalescence}") print(f" Adaptive weight: {config.coalescence_base_weight} → {config.coalescence_max_weight}") print(f" Philosophy: Geometric truth survives when patterns shatter") print("=" * 70) # Load data print("\nLoading data...") train_loader, val_loader = get_data_loaders(config) print(f" Train: {len(train_loader.dataset)} samples") print(f" Val: {len(val_loader.dataset)} samples") # Train trainer = Trainer(config) trainer.train(train_loader, val_loader) print("\n" + "=" * 70) print("🎯 Shatter-reconstruct training complete!") print(f" tensorboard --logdir {config.tensorboard_dir}") print("=" * 70) if __name__ == "__main__": main()