# vit-beans-v3 / trainer_v6_fp64_geometric_coalescence.py
# AbstractPhil's picture
# Create trainer_v6_fp64_geometric_coalescence.py
# 0f1f862 verified
# train_cantor_fusion_hf.py - WITH GEOMETRIC COALESCENCE LOSS
"""
Cantor Fusion Classifier with AdamW + Warm Restarts + LR Boost + Coalescence Loss
-----------------------------------------------------------------------------------
Features:
- AdamW optimizer (best for ViTs)
- CosineAnnealingWarmRestarts with configurable LR boost at restarts
- GeometricCoalescenceLoss: Unsupervised geometric supervision for shatter-reconstruct
- HuggingFace Hub uploads (ONE shared repo, organized by run)
- TensorBoard logging (loss, accuracy, fusion metrics, LR tracking, coalescence)
- SafeTensors format (ClamAV safe)
New Feature: Geometric Coalescence Loss
Provides geometric scaffolding during aggressive LR boosting:
- Consciousness Anchoring: High-awareness tokens cluster around learned attractors
- Distance Preservation: Cantor measure topology guides embedding distances
- Volume Preservation: Maintains simplex structural integrity
- Adaptive weighting: Increases stabilization during LR spikes (0.1 → 0.8)
Author: AbstractPhil
License: MIT
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torch.cuda.amp import autocast, GradScaler
from safetensors.torch import save_file, load_file
import math
import os
import json
from typing import Optional, Dict, List, Tuple, Union
from dataclasses import dataclass, asdict
import time
from pathlib import Path
from tqdm import tqdm
# HuggingFace
from huggingface_hub import HfApi, create_repo, upload_folder, upload_file
import yaml
# Import from your repo
from geovocab2.train.model.layers.attention.cantor_multiheaded_fusion_fp64 import (
CantorMultiheadFusion,
CantorFusionConfig
)
from geovocab2.shapes.factory.cantor_route_factory import (
CantorRouteFactory,
RouteMode,
SimplexConfig
)
from geovocab2.train.losses.geometric_coalescence_loss import (
GeometricCoalescenceLoss,
add_coalescence_loss_to_training
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Mixing Augmentations (AlphaMix / Fractal AlphaMix)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def alphamix_data(x, y, alpha_range=(0.3, 0.7), spatial_ratio=0.25):
    """Standard AlphaMix: blend one rectangular window of a shuffled batch into ``x``.

    A single window (the same position for every sample) is replaced by
    ``alpha * x + (1 - alpha) * x[perm]``; pixels outside the window are
    copied from ``x`` unchanged.

    Args:
        x: 4-D image batch; the last two dimensions are treated as H and W.
        y: Labels indexed along the batch dimension.
        alpha_range: ``(min, max)`` interval the sampled transparency is
            rescaled into.
        spatial_ratio: Fraction of the image area covered by the window
            (each side is scaled by its square root).

    Returns:
        ``(mixed_x, y, y[perm], alpha)`` where ``alpha`` is a Python float.
    """
    perm = torch.randperm(x.size(0), device=x.device)

    # Transparency: one Beta(2, 2) draw rescaled into [lo, hi].
    lo, hi = alpha_range
    alpha = lo + (hi - lo) * torch.distributions.Beta(2.0, 2.0).sample().item()

    # Window size: scale both sides by sqrt(spatial_ratio) so the *area*
    # fraction equals spatial_ratio.
    _, _, height, width = x.shape
    side_scale = torch.sqrt(torch.tensor(spatial_ratio)).item()
    win_h = int(height * side_scale)
    win_w = int(width * side_scale)

    # Window position, drawn so the window always fits inside the image.
    top = torch.randint(0, height - win_h + 1, (1,), device=x.device).item()
    left = torch.randint(0, width - win_w + 1, (1,), device=x.device).item()
    rows = slice(top, top + win_h)
    cols = slice(left, left + win_w)

    mixed = x.clone()
    mixed[:, :, rows, cols] = (
        alpha * x[:, :, rows, cols] + (1 - alpha) * x[perm, :, rows, cols]
    )
    return mixed, y, y[perm], alpha
def alphamix_fractal(
    x: torch.Tensor,
    y: torch.Tensor,
    alpha_range=(0.3, 0.7),
    steps_range=(1, 3),
    triad_scales=(1/3, 1/9, 1/27),
    beta_shape=(2.0, 2.0),
    seed: Optional[int] = None,
):
    """Fractal AlphaMix: several square patches at triadic area scales.

    Each patch blends ``alpha * x + (1 - alpha) * x[perm]`` into the output at
    a random position; a later patch overwrites an earlier one where they
    overlap, because every blend reads from the original ``x``.

    Args:
        x: Image batch of shape ``(B, C, H, W)``.
        y: Labels indexed along the batch dimension.
        alpha_range: ``(min, max)`` interval for each patch's transparency.
        steps_range: Inclusive ``(min, max)`` number of patches to draw.
        triad_scales: Candidate patch areas as fractions of ``H * W``.
        beta_shape: Parameters of the Beta distribution used for transparency.
        seed: When given, ``torch.manual_seed(seed)`` is called first
            (this reseeds the global torch RNG).

    Returns:
        ``(mixed_x, y, y[perm], alpha_eff)`` where ``alpha_eff`` is the
        area-weighted mean of the per-patch alphas, as a Python float.
    """
    if seed is not None:
        torch.manual_seed(seed)

    _, _, height, width = x.shape
    dev = x.device

    perm = torch.randperm(x.shape[0], device=dev)
    mixed = x.clone()
    image_area = height * width

    sampler = torch.distributions.Beta(*beta_shape)
    lo, hi = alpha_range

    # One (alpha, area) record per patch, used for the effective alpha below.
    patches = []

    num_patches = torch.randint(
        steps_range[0], steps_range[1] + 1, (1,), device=dev
    ).item()
    for _ in range(num_patches):
        # Pick one of the triadic area fractions.
        which = torch.randint(0, len(triad_scales), (1,), device=dev).item()
        target_area = max(1, int(image_area * triad_scales[which]))

        # Square patch with that area, clamped to the image bounds.
        edge = int(torch.sqrt(torch.tensor(target_area, dtype=torch.float32)).item())
        ph = max(1, min(height, edge))
        pw = max(1, min(width, edge))

        top = torch.randint(0, height - ph + 1, (1,), device=dev).item()
        left = torch.randint(0, width - pw + 1, (1,), device=dev).item()

        alpha = lo + (hi - lo) * sampler.sample().item()
        patches.append((alpha, ph * pw))

        rows = slice(top, top + ph)
        cols = slice(left, left + pw)
        mixed[:, :, rows, cols] = (
            alpha * x[:, :, rows, cols] + (1 - alpha) * x[perm, :, rows, cols]
        )

    # Area-weighted mean of the patch alphas (epsilon guards an empty patch list).
    alphas = torch.tensor([a for a, _ in patches], dtype=torch.float32, device=dev)
    areas = torch.tensor([s for _, s in patches], dtype=torch.float32, device=dev)
    alpha_eff = ((alphas * areas).sum() / (areas.sum() + 1e-12)).item()

    return mixed, y, y[perm], alpha_eff
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Custom Scheduler with LR Boost at Restarts
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class CosineAnnealingWarmRestartsWithBoost(torch.optim.lr_scheduler._LRScheduler):
    """Cosine annealing with warm restarts and an optional LR boost per restart.

    Within a cycle of length ``T_i`` the learning rate follows
    ``eta_min + (peak - eta_min) * (1 + cos(pi * T_cur / T_i)) / 2``.
    At every restart the peak is multiplied by ``restart_lr_mult`` and the
    cycle length by ``T_mult`` (truncated to an int).

    Args:
        optimizer: Wrapped optimizer.
        T_0: Length of the first cycle, in scheduler steps (positive int).
        T_mult: Cycle-length multiplier applied at each restart (>= 1).
        eta_min: Floor of the cosine curve.
        restart_lr_mult: Multiplier applied to the peak LR at each restart (> 0).
        last_epoch: Index of the last epoch, forwarded to the base class.

    Raises:
        ValueError: If ``T_0``, ``T_mult`` or ``restart_lr_mult`` is out of range.
    """

    def __init__(
        self,
        optimizer: torch.optim.Optimizer,
        T_0: int,
        T_mult: float = 1,
        eta_min: float = 0,
        restart_lr_mult: float = 1.0,
        last_epoch: int = -1
    ):
        if T_0 <= 0 or not isinstance(T_0, int):
            raise ValueError(f"Expected positive integer T_0, but got {T_0}")
        if T_mult < 1:
            raise ValueError(f"Expected T_mult >= 1, but got {T_mult}")
        if restart_lr_mult <= 0:
            raise ValueError(f"Expected positive restart_lr_mult, but got {restart_lr_mult}")

        self.T_0 = T_0
        self.T_i = T_0
        self.T_mult = T_mult
        self.eta_min = eta_min
        self.restart_lr_mult = restart_lr_mult
        self.T_cur = last_epoch

        # Boosted peak LRs (None until the first restart) and restart counter.
        self.current_base_lrs = None
        self.restart_count = 0

        # The base-class constructor performs the initial step() call.
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the LR of every param group for the current cycle position."""
        if self.T_cur == -1:
            return self.base_lrs

        # Use the boosted peaks once at least one restart has happened.
        if self.current_base_lrs is None:
            peaks = self.base_lrs
        else:
            peaks = self.current_base_lrs

        return [
            self.eta_min + (peak - self.eta_min) *
            (1 + math.cos(math.pi * self.T_cur / self.T_i)) / 2
            for peak in peaks
        ]

    def step(self, epoch=None):
        """Advance the schedule by one step, or jump to an explicit ``epoch``.

        Args:
            epoch: Optional absolute position. When omitted the scheduler
                advances by one step from its current position.

        Raises:
            ValueError: If ``epoch`` is negative.
        """
        if epoch is None and self.last_epoch < 0:
            epoch = 0

        if epoch is None:
            # Incremental path: advance one step and handle a restart in place.
            epoch = self.last_epoch + 1
            self.T_cur = self.T_cur + 1

            if self.T_cur >= self.T_i:
                self.restart_count += 1
                if self.current_base_lrs is None:
                    self.current_base_lrs = list(self.base_lrs)
                # Boost the peaks before the cycle is reset.
                self.current_base_lrs = [
                    base_lr * self.restart_lr_mult
                    for base_lr in self.current_base_lrs
                ]
                self.T_cur = self.T_cur - self.T_i
                self.T_i = int(self.T_i * self.T_mult)
        else:
            # Explicit path: derive cycle position and restart count from epoch.
            if epoch < 0:
                raise ValueError(f"Expected non-negative epoch, but got {epoch}")

            if epoch >= self.T_0:
                if self.T_mult == 1:
                    self.T_cur = epoch % self.T_0
                    self.restart_count = epoch // self.T_0
                else:
                    n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult))
                    self.restart_count = n
                    self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1)
                    self.T_i = self.T_0 * self.T_mult ** n
            else:
                self.T_i = self.T_0
                self.T_cur = epoch
                self.restart_count = 0

            # Always rebuild the cumulative boost from the unboosted base LRs.
            # Previously this only happened while current_base_lrs was None,
            # so a second explicit jump kept the boost of the first one.
            if self.restart_count > 0:
                self.current_base_lrs = [
                    base_lr * (self.restart_lr_mult ** self.restart_count)
                    for base_lr in self.base_lrs
                ]
            else:
                self.current_base_lrs = None

        self.last_epoch = math.floor(epoch)

        for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            param_group['lr'] = lr

        self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Configuration
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@dataclass
class CantorTrainingConfig:
    """Complete configuration for Cantor fusion training with coalescence loss.

    Construction has side effects: ``__post_init__`` fills in derived values
    (``num_classes``, ``run_name``, ``hf_repo_name``, ``hf_token``,
    ``num_patches``, ``patch_dim``, output paths) and creates the output,
    checkpoint and TensorBoard directories on disk.
    """
    # Dataset
    dataset: str = "cifar10"
    num_classes: int = 10

    # Architecture
    image_size: int = 32
    patch_size: int = 4
    embed_dim: int = 384
    num_fusion_blocks: int = 6
    num_heads: int = 8
    fusion_window: int = 32
    fusion_mode: str = "weighted"
    k_simplex: int = 4
    use_beatrix: bool = False
    beatrix_tau: float = 0.25

    # Optimization
    precompute_geometric: bool = True
    use_torch_compile: bool = True
    use_mixed_precision: bool = False

    # Regularization
    dropout: float = 0.1
    drop_path_rate: float = 0.1
    label_smoothing: float = 0.1

    # Training - Optimizer (AdamW)
    optimizer_type: str = "adamw"
    batch_size: int = 128
    num_epochs: int = 300
    learning_rate: float = 3e-4
    weight_decay: float = 0.05
    grad_clip: float = 1.0

    # SGD-specific
    sgd_momentum: float = 0.9
    sgd_nesterov: bool = True

    # AdamW-specific
    adamw_betas: Tuple[float, float] = (0.9, 0.999)
    adamw_eps: float = 1e-8

    # Learning rate schedule - WARM RESTARTS WITH BOOST
    scheduler_type: str = "cosine_restarts"
    restart_period: int = 50
    restart_mult: float = 2.0
    restart_lr_mult: float = 1.0  # LR multiplier at restarts
    min_lr: float = 1e-7

    # MultiStepLR (fallback); None means "derive from num_epochs"
    lr_milestones: Optional[List[int]] = None
    lr_gamma: float = 0.2

    # Cosine annealing (regular)
    warmup_epochs: int = 0

    # Data augmentation
    use_augmentation: bool = True
    use_autoaugment: bool = True
    use_cutout: bool = False
    cutout_length: int = 16

    # Mixing augmentation
    use_mixing: bool = False
    mixing_type: str = "alphamix"
    mixing_alpha_range: Tuple[float, float] = (0.3, 0.7)
    mixing_spatial_ratio: float = 0.25
    mixing_prob: float = 1.0
    fractal_steps_range: Tuple[int, int] = (1, 3)
    fractal_triad_scales: Tuple[float, ...] = (1/3, 1/9, 1/27)

    # Geometric Coalescence Loss
    use_coalescence_loss: bool = True
    lambda_coalescence: float = 0.5
    coalescence_num_anchors: int = 64
    coalescence_target_variance: float = 0.5
    coalescence_base_weight: float = 0.1
    coalescence_max_weight: float = 0.8
    coalescence_weight_power: float = 2.0
    coalescence_consciousness_weight: float = 0.3
    coalescence_distance_weight: float = 0.4
    coalescence_volume_weight: float = 0.3
    coalescence_num_distance_pairs: int = 256
    coalescence_num_simplex_samples: int = 32

    # System
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    num_workers: int = 8
    seed: int = 42

    # Paths
    weights_dir: str = "weights"
    model_name: str = "vit-beans-v3"
    run_name: Optional[str] = None

    # HuggingFace
    hf_username: str = "AbstractPhil"
    hf_repo_name: Optional[str] = None
    upload_to_hf: bool = True
    hf_token: Optional[str] = None

    # Logging
    log_interval: int = 50
    save_interval: int = 10
    checkpoint_upload_interval: int = 20

    # Tuple-typed fields: written to YAML as lists and restored on load.
    # (Unannotated, so the dataclass machinery does not treat it as a field.)
    _TUPLE_FIELDS = (
        'adamw_betas',
        'mixing_alpha_range',
        'fractal_steps_range',
        'fractal_triad_scales',
    )

    def __post_init__(self):
        # Auto-set num_classes from the dataset name.
        if self.dataset == "cifar10":
            self.num_classes = 10
        elif self.dataset == "cifar100":
            self.num_classes = 100
        else:
            raise ValueError(f"Unknown dataset: {self.dataset}")

        # Set default milestones
        if self.lr_milestones is None:
            if self.num_epochs >= 200:
                self.lr_milestones = [60, 120, 160]
            elif self.num_epochs >= 100:
                self.lr_milestones = [30, 60, 80]
            else:
                self.lr_milestones = [
                    int(self.num_epochs * 0.5),
                    int(self.num_epochs * 0.75)
                ]

        # Auto-generate run name
        if self.run_name is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            opt_name = self.optimizer_type.upper()
            sched_name = "WarmRestart" if self.scheduler_type == "cosine_restarts" else self.scheduler_type
            boost_str = f"_boost{self.restart_lr_mult}x" if self.restart_lr_mult > 1.0 else ""
            coal_str = f"_coal{self.lambda_coalescence}" if self.use_coalescence_loss else ""
            self.run_name = f"{self.dataset}_{self.fusion_mode}_{opt_name}_{sched_name}{boost_str}{coal_str}_{timestamp}"

        # ONE SHARED REPO
        if self.hf_repo_name is None:
            self.hf_repo_name = self.model_name

        # Fall back to the environment for the HF token.
        if self.hf_token is None:
            self.hf_token = os.environ.get("HF_TOKEN")

        # Calculate derived values
        assert self.image_size % self.patch_size == 0, (
            f"image_size ({self.image_size}) must be divisible by "
            f"patch_size ({self.patch_size})"
        )
        self.num_patches = (self.image_size // self.patch_size) ** 2
        self.patch_dim = self.patch_size * self.patch_size * 3

        # Create paths
        self.output_dir = Path(self.weights_dir) / self.model_name / self.run_name
        self.checkpoint_dir = self.output_dir / "checkpoints"
        self.tensorboard_dir = self.output_dir / "tensorboard"

        # Create directories
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.tensorboard_dir.mkdir(parents=True, exist_ok=True)

    def _serializable_dict(self) -> Dict:
        """Return the dataclass fields as a YAML-friendly dict.

        Tuples become lists, and ``hf_token`` is blanked so the credential
        is never written to disk alongside the run artifacts.
        """
        config_dict = asdict(self)
        for key in self._TUPLE_FIELDS:
            if key in config_dict:
                config_dict[key] = list(config_dict[key])
        # Never persist the secret; load() re-reads it from HF_TOKEN.
        config_dict['hf_token'] = None
        return config_dict

    def save(self, path: Union[str, Path]):
        """Save config to YAML file (the HuggingFace token is not written)."""
        path = Path(path)
        with open(path, 'w', encoding='utf-8') as f:
            yaml.dump(self._serializable_dict(), f, default_flow_style=False)

    @classmethod
    def load(cls, path: Union[str, Path]):
        """Load config from YAML file.

        Constructing the instance runs ``__post_init__``, so the directories
        for the stored ``run_name`` are (re)created and a missing token is
        taken from the ``HF_TOKEN`` environment variable.
        """
        path = Path(path)
        with open(path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # Convert lists back to tuples
        for key in cls._TUPLE_FIELDS:
            if key in config_dict:
                config_dict[key] = tuple(config_dict[key])

        return cls(**config_dict)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Model Components
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class PatchEmbedding(nn.Module):
    """Split an image into non-overlapping patches and embed each one.

    A strided convolution (kernel = stride = ``patch_size``) projects every
    patch of a 3-channel image to ``embed_dim`` features; a learned
    positional embedding of shape ``(1, num_patches, embed_dim)`` is added.
    """

    def __init__(self, config: CantorTrainingConfig):
        super().__init__()
        self.config = config
        patch = config.patch_size
        dim = config.embed_dim
        self.proj = nn.Conv2d(3, dim, kernel_size=patch, stride=patch)
        # Small-variance random init for the positional table.
        self.pos_embed = nn.Parameter(torch.randn(1, config.num_patches, dim) * 0.02)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map images ``(B, 3, H, W)`` to a token sequence ``(B, N, embed_dim)``."""
        # (B, D, H/p, W/p) -> (B, D, N) -> (B, N, D)
        tokens = self.proj(x).flatten(2).transpose(1, 2)
        return tokens + self.pos_embed
class DropPath(nn.Module):
    """Stochastic depth: randomly zero whole samples of a residual branch.

    In training mode each sample along dim 0 is kept with probability
    ``1 - drop_prob`` and rescaled by ``1 / (1 - drop_prob)``; otherwise it is
    zeroed. In eval mode, or when ``drop_prob == 0``, the input is returned
    unchanged.

    Args:
        drop_prob: Probability of dropping a sample.
    """

    def __init__(self, drop_prob: float = 0.0):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x

        keep_prob = 1 - self.drop_prob
        if keep_prob <= 0.:
            # Everything is dropped. Dividing by keep_prob == 0 would turn
            # the output into NaN (x / 0 * 0), so return zeros directly.
            return torch.zeros_like(x)

        # One Bernoulli(keep_prob) draw per sample, broadcast over other dims.
        shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
        random_tensor.floor_()  # 1 with probability keep_prob, else 0
        return x.div(keep_prob) * random_tensor
class CantorFusionBlock(nn.Module):
    """Pre-norm residual block: Cantor multihead fusion followed by an MLP.

    Both sub-layers are wrapped in a residual connection with optional
    stochastic depth (``drop_path``).
    """

    def __init__(self, config: CantorTrainingConfig, drop_path: float = 0.0):
        super().__init__()
        dim = config.embed_dim
        precompute = config.precompute_geometric

        self.norm1 = nn.LayerNorm(dim)
        # The block adds the residual itself, hence residual=False here.
        fusion_config = CantorFusionConfig(
            dim=dim,
            num_heads=config.num_heads,
            fusion_window=config.fusion_window,
            fusion_mode=config.fusion_mode,
            k_simplex=config.k_simplex,
            use_beatrix_routing=config.use_beatrix,
            use_consciousness_weighting=(config.fusion_mode == "consciousness"),
            beatrix_tau=config.beatrix_tau,
            use_gating=True,
            dropout=config.dropout,
            residual=False,
            precompute_staircase=precompute,
            precompute_routes=precompute,
            precompute_distances=precompute,
            use_optimized_gather=True,
            staircase_cache_sizes=[config.num_patches],
            use_torch_compile=config.use_torch_compile
        )
        self.fusion = CantorMultiheadFusion(fusion_config)

        self.norm2 = nn.LayerNorm(dim)
        hidden = dim * 4
        self.mlp = nn.Sequential(
            nn.Linear(dim, hidden),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(hidden, dim),
            nn.Dropout(config.dropout)
        )

        self.drop_path = DropPath(drop_path) if drop_path > 0 else nn.Identity()

    def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]:
        """Run the block.

        Args:
            x: Input token tensor.
            return_fusion_info: When True, also return a dict with the
                post-fusion (pre-MLP) embeddings under ``'output'`` and the
                fusion layer's ``'consciousness'`` / ``'cantor_measure'``
                entries (``None`` when the fusion layer did not provide them).

        Returns:
            The block output, or ``(output, fusion_info)``.
        """
        fused = self.fusion(self.norm1(x))
        # Kept separately because the coalescence loss uses the pre-MLP state.
        post_fusion = x + self.drop_path(fused['output'])
        out = post_fusion + self.drop_path(self.mlp(self.norm2(post_fusion)))

        if not return_fusion_info:
            return out

        info = {
            'output': post_fusion,
            'consciousness': fused.get('consciousness'),
            'cantor_measure': fused.get('cantor_measure')
        }
        return out, info
class CantorClassifier(nn.Module):
    """Image classifier: patch embedding, a stack of fusion blocks, mean-pool head."""

    def __init__(self, config: CantorTrainingConfig):
        super().__init__()
        self.config = config
        self.patch_embed = PatchEmbedding(config)

        # Stochastic-depth rate grows linearly from 0 to drop_path_rate with depth.
        depth = config.num_fusion_blocks
        drop_rates = torch.linspace(0, config.drop_path_rate, depth).tolist()
        self.blocks = nn.ModuleList(
            CantorFusionBlock(config, drop_path=rate) for rate in drop_rates
        )

        self.norm = nn.LayerNorm(config.embed_dim)
        self.head = nn.Linear(config.embed_dim, config.num_classes)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Per-module initializer passed to ``nn.Module.apply``."""
        if isinstance(m, nn.Linear):
            nn.init.trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
            return
        if isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
            return
        if isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]:
        """Classify a batch of images.

        Args:
            x: Image batch.
            return_fusion_info: When True, also return a list holding the
                fusion-info dict of the LAST block only.

        Returns:
            ``logits`` or ``(logits, fusion_infos)``.
        """
        tokens = self.patch_embed(x)
        collected = []
        final_idx = len(self.blocks) - 1

        for idx, block in enumerate(self.blocks):
            wants_info = return_fusion_info and idx == final_idx
            if not wants_info:
                tokens = block(tokens)
                continue
            tokens, info = block(tokens, return_fusion_info=True)
            collected.append(info)

        # Normalize, average over the token dimension, then classify.
        pooled = self.norm(tokens).mean(dim=1)
        logits = self.head(pooled)

        if return_fusion_info:
            return logits, collected
        return logits
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# HuggingFace Integration (unchanged)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class HuggingFaceUploader:
    """Manages HuggingFace Hub uploads to ONE shared repo.

    Every run writes under ``runs/<run_name>/`` inside the repo
    ``<hf_username>/<hf_repo_name>``. All upload failures are reported with a
    ``[HF]`` message and swallowed so that training is never interrupted by
    a Hub problem.
    """

    def __init__(self, config: CantorTrainingConfig):
        self.config = config
        self.api = HfApi(token=config.hf_token) if config.upload_to_hf else None
        self.repo_id = f"{config.hf_username}/{config.hf_repo_name}"
        self.run_prefix = f"runs/{config.run_name}"

        if config.upload_to_hf:
            self._create_repo()
            self._update_main_readme()

    def _create_repo(self):
        """Create HuggingFace repo if it doesn't exist."""
        try:
            create_repo(
                repo_id=self.repo_id,
                token=self.config.hf_token,
                exist_ok=True,
                private=False
            )
            print(f"[HF] Repository: https://huggingface.co/{self.repo_id}")
            print(f"[HF] Run folder: {self.run_prefix}")
        except Exception as e:
            print(f"[HF] Warning: Could not create repo: {e}")

    def _update_main_readme(self):
        """Create or update the main shared README at repo root."""
        if not self.config.upload_to_hf or self.api is None:
            return

        boost_info = ""
        if self.config.restart_lr_mult > 1.0:
            boost_info = f"""
### πŸš€ LR Boost + Geometric Coalescence
This run uses **restart_lr_mult = {self.config.restart_lr_mult}x** with **GeometricCoalescenceLoss**:
- LR boosts create aggressive exploration cycles
- Coalescence loss provides geometric scaffolding during weight thrashing
- Adaptive weighting: {self.config.coalescence_base_weight} β†’ {self.config.coalescence_max_weight} during LR spikes
- Model reconstructs from geometric first principles when patterns shatter
"""

        main_readme = f"""---
tags:
- image-classification
- cantor-fusion
- geometric-deep-learning
- safetensors
- vision-transformer
- warm-restarts
- geometric-coalescence
library_name: pytorch
datasets:
- cifar10
- cifar100
metrics:
- accuracy
---
# {self.config.hf_repo_name}
**Geometric Deep Learning with Cantor Multihead Fusion + Shatter-Reconstruct Training**
This repository contains training runs using Cantor fusion architecture with:
- Pentachoron (5-simplex) structures for geometric routing
- CosineAnnealingWarmRestarts for exploration cycles
- GeometricCoalescenceLoss for shatter-reconstruct training
{boost_info}
## Current Run
**Latest**: `{self.config.run_name}`
- **Dataset**: {self.config.dataset.upper()}
- **Fusion Mode**: {self.config.fusion_mode}
- **Coalescence**: Ξ»={self.config.lambda_coalescence} {'βœ“' if self.config.use_coalescence_loss else 'βœ—'}
- **LR Boost**: {self.config.restart_lr_mult}x {'πŸš€' if self.config.restart_lr_mult > 1.0 else ''}
---
**Repository maintained by**: [@{self.config.hf_username}](https://huggingface.co/{self.config.hf_username})
"""

        main_readme_path = Path(self.config.weights_dir) / self.config.model_name / "MAIN_README.md"
        main_readme_path.parent.mkdir(parents=True, exist_ok=True)
        # The README contains non-ASCII text; the platform default encoding
        # may be unable to represent it, so write UTF-8 explicitly.
        with open(main_readme_path, 'w', encoding='utf-8') as f:
            f.write(main_readme)

        try:
            upload_file(
                path_or_fileobj=str(main_readme_path),
                path_in_repo="README.md",
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] Updated main README")
        except Exception as e:
            print(f"[HF] Main README upload failed: {e}")

    def upload_file(self, file_path: Path, repo_path: str):
        """Upload single file to HuggingFace.

        ``repo_path`` is placed under this run's folder unless it already
        starts with ``runs/``.
        """
        if not self.config.upload_to_hf or self.api is None:
            return

        # Resolve the destination before the try block so the failure message
        # below can always reference it.
        if not repo_path.startswith(self.run_prefix) and not repo_path.startswith("runs/"):
            full_path = f"{self.run_prefix}/{repo_path}"
        else:
            full_path = repo_path

        try:
            # Inside a method the bare name refers to the module-level
            # huggingface_hub function, not to this method.
            upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=full_path,
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] βœ“ Uploaded: {full_path}")
        except Exception as e:
            print(f"[HF] βœ— Upload failed ({full_path}): {e}")

    def upload_folder_contents(self, folder_path: Path, repo_folder: str):
        """Upload entire folder to HuggingFace (under this run's folder)."""
        if not self.config.upload_to_hf or self.api is None:
            return

        full_path = f"{self.run_prefix}/{repo_folder}"
        try:
            upload_folder(
                folder_path=str(folder_path),
                repo_id=self.repo_id,
                path_in_repo=full_path,
                token=self.config.hf_token,
                ignore_patterns=["*.pyc", "__pycache__"]
            )
            print(f"[HF] Uploaded folder: {full_path}")
        except Exception as e:
            print(f"[HF] Folder upload failed: {e}")

    def create_model_card(self, trainer_stats: Dict):
        """Create and upload run-specific model card.

        Args:
            trainer_stats: Must provide ``'total_params'``, ``'best_acc'`` and
                ``'training_time'`` (formatted as hours in the card).
        """
        if not self.config.upload_to_hf:
            return

        # Create run card with coalescence info
        run_card = f"""# Run: {self.config.run_name}
## Configuration
- **Dataset**: {self.config.dataset.upper()}
- **Parameters**: {trainer_stats['total_params']:,}
- **Coalescence Loss**: {'Enabled' if self.config.use_coalescence_loss else 'Disabled'}
- **LR Boost**: {self.config.restart_lr_mult}x
## Performance
- **Best Validation Accuracy**: {trainer_stats['best_acc']:.2f}%
- **Training Time**: {trainer_stats['training_time']:.1f} hours
---
Built with geometric shatter-reconstruct training.
**Training completed**: {time.strftime("%Y-%m-%d %H:%M:%S")}
"""

        readme_path = self.config.output_dir / "RUN_README.md"
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(run_card)

        try:
            upload_file(
                path_or_fileobj=str(readme_path),
                path_in_repo=f"{self.run_prefix}/README.md",
                repo_id=self.repo_id,
                token=self.config.hf_token
            )
            print(f"[HF] Uploaded run README")
        except Exception as e:
            print(f"[HF] Run README upload failed: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Trainer with Geometric Coalescence Loss
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class Trainer:
"""Training manager with AdamW + Warm Restarts + Coalescence Loss."""
def __init__(self, config: CantorTrainingConfig):
    """Build the model, optimizer, scheduler, losses and logging backends.

    Side effects: seeds the torch RNG, prints a setup summary, opens a
    TensorBoard writer, optionally contacts the HuggingFace Hub, and writes
    ``config.yaml`` into ``config.output_dir``.

    Args:
        config: Training configuration.
    """
    self.config = config
    self.device = torch.device(config.device)

    # Set seed
    torch.manual_seed(config.seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(config.seed)

    # Model
    print("\n" + "=" * 70)
    print(f"Initializing Cantor Classifier - {config.dataset.upper()}")
    print("=" * 70)

    init_start = time.time()
    self.model = CantorClassifier(config).to(self.device)
    init_time = time.time() - init_start
    print(f"\n[Model] Initialization time: {init_time:.2f}s")
    self.print_model_info()

    # Track restart epochs
    self.restart_epochs = self._calculate_restart_epochs()

    # Optimizer
    self.optimizer = self.create_optimizer()

    # Scheduler
    self.scheduler = self.create_scheduler()

    # Loss
    self.criterion = nn.CrossEntropyLoss(label_smoothing=config.label_smoothing)

    # Geometric Coalescence Loss
    if config.use_coalescence_loss:
        print(f"\n[Coalescence Loss] Initializing...")
        self.coalescence_loss_fn = GeometricCoalescenceLoss(
            embed_dim=config.embed_dim,
            num_anchors=config.coalescence_num_anchors,
            k_simplex=config.k_simplex,
            target_variance=config.coalescence_target_variance,
            num_simplex_samples=config.coalescence_num_simplex_samples,
            num_distance_pairs=config.coalescence_num_distance_pairs,
            base_weight=config.coalescence_base_weight,
            max_weight=config.coalescence_max_weight,
            weight_power=config.coalescence_weight_power,
            consciousness_weight=config.coalescence_consciousness_weight,
            distance_weight=config.coalescence_distance_weight,
            volume_weight=config.coalescence_volume_weight
        ).to(self.device)
        print(f"[Coalescence] Ξ»={config.lambda_coalescence}")
        print(f"[Coalescence] Adaptive weight: {config.coalescence_base_weight} β†’ {config.coalescence_max_weight}")
        print(f"[Coalescence] Components: anchor={config.coalescence_consciousness_weight:.1f}, "
              f"dist={config.coalescence_distance_weight:.1f}, vol={config.coalescence_volume_weight:.1f}")
    else:
        self.coalescence_loss_fn = None
        print(f"\n[Coalescence Loss] Disabled")

    # Mixing info
    self.use_mixing = config.use_mixing
    self.mixing_type = config.mixing_type
    self.mixing_prob = config.mixing_prob

    # Mixed precision. Compare the parsed device *type* so indexed device
    # strings such as "cuda:0" enable AMP too (a plain string comparison
    # against "cuda" silently disabled it for them).
    self.use_amp = config.use_mixed_precision and self.device.type == "cuda"
    self.scaler = GradScaler() if self.use_amp else None
    if self.use_amp:
        print(f"[Training] Mixed precision enabled")

    # TensorBoard
    self.writer = SummaryWriter(log_dir=str(config.tensorboard_dir))
    print(f"[TensorBoard] Logging to: {config.tensorboard_dir}")

    # HuggingFace
    self.hf_uploader = HuggingFaceUploader(config) if config.upload_to_hf else None

    # Save config
    config.save(config.output_dir / "config.yaml")

    # Metrics
    self.best_acc = 0.0
    self.global_step = 0
    self.start_time = time.time()
    self.upload_count = 0
def apply_mixing(self, images: torch.Tensor, labels: torch.Tensor):
    """Optionally apply the configured mixing augmentation to a batch.

    Returns:
        ``(images, labels, None)`` when mixing is skipped, otherwise
        ``(mixed_images, (y_a, y_b, alpha), alpha)``.

    Raises:
        ValueError: If ``self.mixing_type`` is not ``"alphamix"`` or ``"fractal"``.
    """
    # Short-circuit: no random draw is consumed when mixing is disabled.
    skip = (not self.use_mixing) or torch.rand(1).item() > self.mixing_prob
    if skip:
        return images, labels, None

    kind = self.mixing_type
    if kind not in ("alphamix", "fractal"):
        raise ValueError(f"Unknown mixing type: {self.mixing_type}")

    cfg = self.config
    if kind == "alphamix":
        result = alphamix_data(
            images, labels,
            alpha_range=cfg.mixing_alpha_range,
            spatial_ratio=cfg.mixing_spatial_ratio
        )
    else:
        result = alphamix_fractal(
            images, labels,
            alpha_range=cfg.mixing_alpha_range,
            steps_range=cfg.fractal_steps_range,
            triad_scales=cfg.fractal_triad_scales
        )

    blended, first_labels, second_labels, alpha = result
    return blended, (first_labels, second_labels, alpha), alpha
def compute_mixed_loss(self, logits: torch.Tensor, mixed_labels):
    """Blend the criterion over both label sets of a mixed batch.

    Args:
        logits: Model outputs passed to ``self.criterion``.
        mixed_labels: ``(y_a, y_b, alpha)`` or ``None``.

    Returns:
        ``alpha * criterion(logits, y_a) + (1 - alpha) * criterion(logits, y_b)``,
        or ``None`` when ``mixed_labels`` is ``None``.
    """
    if mixed_labels is None:
        return None

    first_targets, second_targets, alpha = mixed_labels
    first_term = self.criterion(logits, first_targets)
    second_term = self.criterion(logits, second_targets)
    return alpha * first_term + (1 - alpha) * second_term
def _calculate_restart_epochs(self) -> List[int]:
"""Calculate when restarts will occur."""
if self.config.scheduler_type != "cosine_restarts":
return []
restarts = []
current = self.config.restart_period
period = self.config.restart_period
while current < self.config.num_epochs:
restarts.append(current)
period *= self.config.restart_mult
current += period
return restarts
def create_optimizer(self):
    """Build the optimizer named by ``config.optimizer_type``.

    Only ``"adamw"`` is supported; its hyperparameters are printed and an
    AdamW instance over all model parameters is returned.

    Raises:
        ValueError: For any other optimizer type.
    """
    cfg = self.config
    if cfg.optimizer_type != "adamw":
        raise ValueError(f"Unknown optimizer: {self.config.optimizer_type}")

    print(f"\n[Optimizer] AdamW")
    print(f" LR: {cfg.learning_rate}")
    print(f" Betas: {cfg.adamw_betas}")
    print(f" Weight decay: {cfg.weight_decay}")

    adamw_kwargs = dict(
        lr=cfg.learning_rate,
        betas=cfg.adamw_betas,
        eps=cfg.adamw_eps,
        weight_decay=cfg.weight_decay,
    )
    return torch.optim.AdamW(self.model.parameters(), **adamw_kwargs)
def create_scheduler(self):
    """Build the LR scheduler named by ``config.scheduler_type``.

    Only ``"cosine_restarts"`` is supported; its settings are printed and a
    ``CosineAnnealingWarmRestartsWithBoost`` wrapping ``self.optimizer`` is
    returned.

    Raises:
        ValueError: For any other scheduler type.
    """
    cfg = self.config
    if cfg.scheduler_type != "cosine_restarts":
        raise ValueError(f"Unknown scheduler: {self.config.scheduler_type}")

    boosted = cfg.restart_lr_mult > 1.0
    boost_marker = 'πŸš€' if boosted else ''

    print(f"\n[Scheduler] CosineAnnealingWarmRestarts with LR Boost")
    print(f" T_0: {cfg.restart_period} epochs")
    print(f" T_mult: {cfg.restart_mult}x")
    print(f" Restart LR mult: {cfg.restart_lr_mult}x {boost_marker}")
    print(f" Min LR: {cfg.min_lr}")
    if boosted:
        print(f"\n πŸš€ BOOST MODE ENABLED!")
        print(f" Creates wider exploration curves to escape local minima")
        print(f" Coalescence loss provides geometric scaffolding during thrashing")

    return CosineAnnealingWarmRestartsWithBoost(
        self.optimizer,
        T_0=cfg.restart_period,
        T_mult=cfg.restart_mult,
        eta_min=cfg.min_lr,
        restart_lr_mult=cfg.restart_lr_mult
    )
def print_model_info(self):
    """Print the parameter count and the headline settings of this run."""
    cfg = self.config
    param_count = sum(p.numel() for p in self.model.parameters())

    lines = [
        f"\nParameters: {param_count:,}",
        f"Dataset: {cfg.dataset.upper()}",
        f"Fusion mode: {cfg.fusion_mode}",
        f"Optimizer: {cfg.optimizer_type.upper()}",
        f"Scheduler: {cfg.scheduler_type}",
    ]
    # Optional lines, shown only when the corresponding feature is active.
    if cfg.restart_lr_mult > 1.0:
        lines.append(f"LR Boost: {cfg.restart_lr_mult}x at restarts πŸš€")
    if cfg.use_coalescence_loss:
        lines.append(f"Coalescence Loss: Ξ»={cfg.lambda_coalescence} βœ“")
    lines.append(f"Output: {cfg.output_dir}")

    for line in lines:
        print(line)
def train_epoch(self, train_loader: DataLoader, epoch: int) -> Tuple[float, float]:
    """Train for one epoch, optionally adding the coalescence loss.

    For every batch this applies ``self.apply_mixing``, runs the forward
    pass (under ``autocast`` when ``self.use_amp`` is set), adds the
    coalescence term to the task loss, clips gradients to
    ``config.grad_clip`` and steps the optimizer.  Scalars are written to
    ``self.writer`` every ``config.log_interval`` batches and
    ``self.global_step`` advances once per batch.

    Args:
        train_loader: Loader yielding ``(images, labels)`` batches.
        epoch: Zero-based epoch index; used for the progress-bar title and
            to detect warm-restart epochs via ``self.restart_epochs``.

    Returns:
        ``(mean_loss, accuracy)``: the summed batch loss divided by
        ``len(train_loader)`` and the accuracy in percent, measured against
        the unmixed labels.  Both are ``0.0`` if the loader yields nothing.
    """
    self.model.train()
    total_loss = 0.0
    correct, total = 0, 0
    mixing_applied_count = 0
    total_batches = 0

    # Progress-bar title, annotated on warm-restart epochs
    epoch_desc = f"Epoch {epoch+1}/{self.config.num_epochs}"
    if epoch in self.restart_epochs:
        restart_num = self.restart_epochs.index(epoch) + 1
        epoch_desc += f" πŸ”„ RESTART #{restart_num}"
        if self.config.restart_lr_mult > 1.0:
            boost_mult = self.config.restart_lr_mult ** restart_num
            epoch_desc += f" ({boost_mult:.2f}x)"

    pbar = tqdm(train_loader, desc=f"{epoch_desc} [Train]")
    for batch_idx, (images, labels) in enumerate(pbar):
        images = images.to(self.device, non_blocking=True)
        labels = labels.to(self.device, non_blocking=True)

        # Apply mixing augmentation; accuracy is still scored on the
        # original (unmixed) labels.
        original_labels = labels
        mixed_images, mixed_labels_info, mixing_alpha = self.apply_mixing(images, labels)
        if mixing_alpha is not None:
            mixing_applied_count += 1
            images = mixed_images
        total_batches += 1

        log_step = (batch_idx % self.config.log_interval == 0)
        current_lr = self.scheduler.get_last_lr()[0]

        # Forward + losses: identical code for both precisions, only the
        # autocast context differs.
        if self.use_amp:
            with autocast():
                logits, task_loss, coal_loss = self._forward_with_losses(
                    images, labels, mixed_labels_info, mixing_alpha, current_lr, log_step
                )
        else:
            logits, task_loss, coal_loss = self._forward_with_losses(
                images, labels, mixed_labels_info, mixing_alpha, current_lr, log_step
            )

        # Total loss
        loss = task_loss + coal_loss

        self.optimizer.zero_grad(set_to_none=True)
        if self.use_amp:
            self.scaler.scale(loss).backward()
            # Gradients must be unscaled before clipping by norm.
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
            self.scaler.step(self.optimizer)
            self.scaler.update()
        else:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
            self.optimizer.step()

        # Metrics (each .item() forces a device sync, so read them once)
        loss_value = loss.item()
        task_value = task_loss.item()
        total_loss += loss_value
        _, predicted = logits.max(1)
        correct += predicted.eq(original_labels).sum().item()
        total += original_labels.size(0)
        running_acc = 100. * correct / max(total, 1)

        # TensorBoard logging
        if log_step:
            self.writer.add_scalar('train/total_loss', loss_value, self.global_step)
            self.writer.add_scalar('train/task_loss', task_value, self.global_step)
            self.writer.add_scalar('train/accuracy', running_acc, self.global_step)
            self.writer.add_scalar('train/learning_rate', current_lr, self.global_step)
            if mixing_alpha is not None:
                self.writer.add_scalar('train/mixing_alpha', mixing_alpha, self.global_step)
        self.global_step += 1

        # Progress bar postfix
        postfix_dict = {
            'loss': f'{loss_value:.4f}',
            'task': f'{task_value:.4f}',
            'acc': f'{running_acc:.2f}%',
            'lr': f'{current_lr:.6f}'
        }
        if self.coalescence_loss_fn is not None:
            coal_value = coal_loss.item()
            if coal_value > 0:
                postfix_dict['coal'] = f'{coal_value:.4f}'
        if self.use_mixing:
            mix_pct = 100.0 * mixing_applied_count / total_batches
            postfix_dict['mix'] = f'{mix_pct:.0f}%'
        pbar.set_postfix(postfix_dict)

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    num_batches = len(train_loader)
    mean_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = 100. * correct / total if total else 0.0
    return mean_loss, accuracy

def _forward_with_losses(self, images, labels, mixed_labels_info, mixing_alpha, current_lr, log_step):
    """Run one forward pass and compute the task and coalescence losses.

    Args:
        images: Input batch (already mixed when mixing was applied).
        labels: Unmixed labels, used when no mixing was applied.
        mixed_labels_info: Label info from ``apply_mixing``; consumed by
            ``compute_mixed_loss`` when ``mixing_alpha`` is not ``None``.
        mixing_alpha: Mixing coefficient, or ``None`` if mixing was skipped.
        current_lr: Current learning rate handed to the coalescence loss.
        log_step: Whether coalescence metrics should be logged this batch.

    Returns:
        ``(logits, task_loss, coal_loss)``; ``coal_loss`` is a zero tensor
        when the coalescence loss is disabled or no fusion info came back.
    """
    fusion_infos = None
    if self.coalescence_loss_fn is not None:
        # Fusion info is only requested when the coalescence loss needs it.
        logits, fusion_infos = self.model(images, return_fusion_info=True)
    else:
        logits = self.model(images)

    # Task loss
    if mixing_alpha is not None:
        task_loss = self.compute_mixed_loss(logits, mixed_labels_info)
    else:
        task_loss = self.criterion(logits, labels)

    # Coalescence loss, computed from the last layer's fusion info
    coal_loss = torch.tensor(0.0, device=self.device)
    if self.coalescence_loss_fn is not None and fusion_infos:
        coal_loss, coal_metrics = add_coalescence_loss_to_training(
            fusion_infos[-1],
            self.coalescence_loss_fn,
            current_lr=current_lr,
            baseline_lr=self.config.learning_rate,
            lambda_coal=self.config.lambda_coalescence
        )
        # Skip logging when no metrics were returned.
        if log_step and coal_metrics:
            self.writer.add_scalar('train/coalescence_loss', coal_loss.item(), self.global_step)
            self.writer.add_scalar('train/coalescence_weight', coal_metrics['adaptive_weight'], self.global_step)
            self.writer.add_scalar('train/anchor_loss', coal_metrics['anchor_loss'], self.global_step)
            self.writer.add_scalar('train/distance_loss', coal_metrics['distance_loss'], self.global_step)
            self.writer.add_scalar('train/volume_loss', coal_metrics['volume_loss'], self.global_step)

    return logits, task_loss, coal_loss
@torch.no_grad()
def evaluate(self, val_loader: DataLoader, epoch: int) -> Tuple[float, Dict]:
    """Evaluate the model on ``val_loader`` and log the results.

    Runs in eval mode with gradients disabled.  Fusion info is requested
    only for the final batch, so the ``consciousness`` metric reflects that
    single batch.  Loss, accuracy and (when available) consciousness are
    written to ``self.writer`` at step ``epoch``.

    Args:
        val_loader: Loader yielding ``(images, labels)`` batches.
        epoch: Zero-based epoch index, used for the progress-bar title and
            as the TensorBoard step.

    Returns:
        ``(accuracy, metrics)`` where ``accuracy`` is a percentage and
        ``metrics`` has the keys ``'loss'``, ``'accuracy'`` and
        ``'consciousness'`` (``None`` when unavailable).  Loss and accuracy
        are ``0.0`` if the loader yields nothing.
    """
    self.model.eval()
    total_loss, correct, total = 0.0, 0, 0
    consciousness_values = []
    num_batches = len(val_loader)

    pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.num_epochs} [Val] ")
    for batch_idx, (images, labels) in enumerate(pbar):
        images = images.to(self.device, non_blocking=True)
        labels = labels.to(self.device, non_blocking=True)

        # Forward with fusion info on last batch
        return_info = (batch_idx == num_batches - 1)
        if self.use_amp:
            with autocast():
                logits, loss, consciousness = self._eval_forward(images, labels, return_info)
        else:
            logits, loss, consciousness = self._eval_forward(images, labels, return_info)
        if consciousness is not None:
            consciousness_values.append(consciousness)

        total_loss += loss.item()
        _, predicted = logits.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
        pbar.set_postfix({
            'loss': f'{total_loss / (batch_idx + 1):.4f}',
            'acc': f'{100. * correct / max(total, 1):.2f}%'
        })

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    avg_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = 100. * correct / total if total else 0.0
    mean_consciousness = (
        sum(consciousness_values) / len(consciousness_values)
        if consciousness_values else None
    )

    # TensorBoard logging
    self.writer.add_scalar('val/loss', avg_loss, epoch)
    self.writer.add_scalar('val/accuracy', accuracy, epoch)
    if mean_consciousness is not None:
        self.writer.add_scalar('val/consciousness', mean_consciousness, epoch)

    metrics = {
        'loss': avg_loss,
        'accuracy': accuracy,
        'consciousness': mean_consciousness
    }
    return accuracy, metrics

def _eval_forward(self, images, labels, return_info):
    """Forward one validation batch and compute its loss.

    Args:
        images: Input batch.
        labels: Target labels for ``self.criterion``.
        return_info: When true, the model is asked for fusion info and the
            mean of its ``'consciousness'`` entry is extracted.

    Returns:
        ``(logits, loss, consciousness)``; ``consciousness`` is a Python
        float or ``None`` when it was not requested or not provided.
    """
    consciousness = None
    if return_info:
        logits, fusion_infos = self.model(images, return_fusion_info=True)
        # NOTE(review): this reads the FIRST entry of fusion_infos, whereas
        # train_epoch feeds the last entry to the coalescence loss - confirm
        # the difference is intended.
        if fusion_infos and fusion_infos[0].get('consciousness') is not None:
            consciousness = fusion_infos[0]['consciousness'].mean().item()
    else:
        logits = self.model(images)
    loss = self.criterion(logits, labels)
    return logits, loss, consciousness
def train(self, train_loader: DataLoader, val_loader: DataLoader):
    """Run the full training loop.

    Each epoch trains, evaluates, steps the scheduler, prints a summary and
    saves a ``"best"`` checkpoint whenever validation accuracy improves on
    ``self.best_acc``.  After the last epoch a final report is printed and,
    if ``self.hf_uploader`` is set, a model card is created.

    The TensorBoard writer is closed in a ``finally`` block so that it is
    also closed when training is interrupted or raises.

    Args:
        train_loader: Loader passed to ``train_epoch``.
        val_loader: Loader passed to ``evaluate``.
    """
    boost_enabled = self.config.restart_lr_mult > 1.0

    print("\n" + "=" * 70)
    print("Starting training with Geometric Coalescence Loss")
    if boost_enabled:
        print("πŸš€ LR Boost Mode + Geometric Scaffolding")
    print("=" * 70 + "\n")

    try:
        for epoch in range(self.config.num_epochs):
            # Train
            train_loss, train_acc = self.train_epoch(train_loader, epoch)
            # Evaluate
            val_acc, val_metrics = self.evaluate(val_loader, epoch)
            # Update scheduler (stepped once per epoch)
            self.scheduler.step()

            # Check restart status
            is_restart = (epoch in self.restart_epochs)
            next_is_restart = ((epoch + 1) in self.restart_epochs)
            next_lr = self.scheduler.get_last_lr()[0]

            # Print summary
            print(f"\n{'='*70}")
            print(f"Epoch [{epoch + 1}/{self.config.num_epochs}] Summary:")
            print(f" Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%")
            print(f" Val: Loss={val_metrics['loss']:.4f}, Acc={val_acc:.2f}%")
            if next_is_restart and boost_enabled:
                print(f" ⚠️ RESTART COMING! Coalescence weight will increase for stabilization")
            elif is_restart and boost_enabled:
                print(f" πŸ”„ WARM RESTART! Geometric scaffolding active")
            print(f" Current LR: {next_lr:.6f}")

            # Checkpoint
            is_best = val_acc > self.best_acc
            should_upload = ((epoch + 1) % self.config.checkpoint_upload_interval == 0)
            if is_best:
                self.best_acc = val_acc
                print(f" βœ“ New best model! Accuracy: {val_acc:.2f}%")
                # NOTE(review): an upload is only requested when a new best
                # lands on an upload-interval epoch; a best found on any
                # other epoch is saved locally but not pushed - confirm
                # this is intended.
                self.save_checkpoint(epoch, val_acc, prefix="best", upload=should_upload)
            print(f"{'='*70}\n")

        # Training complete
        training_time = (time.time() - self.start_time) / 3600
        print("\n" + "=" * 70)
        print("Training Complete!")
        print(f"Best Validation Accuracy: {self.best_acc:.2f}%")
        print(f"Training Time: {training_time:.2f} hours")
        if boost_enabled and self.config.use_coalescence_loss:
            print("πŸš€ Shatter-reconstruct training successful!")
        print("=" * 70)

        # Upload to HuggingFace
        if self.hf_uploader:
            trainer_stats = {
                'total_params': sum(p.numel() for p in self.model.parameters()),
                'best_acc': self.best_acc,
                'training_time': training_time,
                'final_epoch': self.config.num_epochs,
                'batch_size': self.config.batch_size,
                'mixed_precision': self.use_amp
            }
            self.hf_uploader.create_model_card(trainer_stats)
    finally:
        # Always close the writer, even if an epoch raised.
        self.writer.close()
def save_checkpoint(self, epoch: int, accuracy: float, prefix: str = "checkpoint", upload: bool = False):
    """Write a checkpoint into ``config.checkpoint_dir``.

    Three files are produced, all named after ``prefix``:
    ``<prefix>_model.safetensors`` (model weights),
    ``<prefix>_training_state.pt`` (optimizer and scheduler state, plus the
    grad-scaler state and coalescence anchors when those objects exist) and
    ``<prefix>_metadata.json``.

    Args:
        epoch: Epoch index stored in the metadata.
        accuracy: Accuracy stored in the metadata.
        prefix: File-name prefix for the three files.
        upload: When true and an uploader is configured, the weights file
            is pushed to ``checkpoints/<prefix>_model.safetensors``.
    """
    out_dir = self.config.checkpoint_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    # Model weights
    weights_name = f"{prefix}_model.safetensors"
    model_path = out_dir / weights_name
    save_file(self.model.state_dict(), str(model_path))

    # Training state; optional pieces are added only when present
    training_state = dict(
        optimizer_state_dict=self.optimizer.state_dict(),
        scheduler_state_dict=self.scheduler.state_dict(),
    )
    if self.scaler is not None:
        training_state['scaler_state_dict'] = self.scaler.state_dict()
    if self.coalescence_loss_fn is not None:
        training_state['coalescence_anchors'] = self.coalescence_loss_fn.anchors.data
    torch.save(training_state, out_dir / f"{prefix}_training_state.pt")

    # Metadata
    metadata = dict(
        epoch=epoch,
        accuracy=accuracy,
        best_accuracy=self.best_acc,
        global_step=self.global_step,
        timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        coalescence_enabled=self.config.use_coalescence_loss,
        restart_lr_mult=self.config.restart_lr_mult,
    )
    with open(out_dir / f"{prefix}_metadata.json", 'w') as handle:
        handle.write(json.dumps(metadata, indent=2))

    print(f" πŸ’Ύ Saved: {weights_name}")

    # Only the weights file is uploaded, and only on request
    if not (self.hf_uploader and upload):
        return
    self.hf_uploader.upload_file(model_path, f"checkpoints/{weights_name}")
    self.upload_count += 1
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Data Loading
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class Cutout:
    """Cutout data augmentation: zero out one randomly placed square patch.

    A centre pixel is drawn uniformly over the image and a square of side
    ``length`` around it is set to zero, clipped at the image borders (so
    the erased area can be smaller than ``length x length``).

    The patch is applied to the last two dimensions, so ``(C, H, W)``
    tensors and batched ``(..., H, W)`` tensors are both accepted.  The
    result keeps the input's dtype and device; the input is not modified.
    """

    def __init__(self, length: int):
        """Store the side length (in pixels) of the erased square."""
        self.length = length

    def __call__(self, img: torch.Tensor) -> torch.Tensor:
        """Return a copy of ``img`` with one random square patch zeroed."""
        h, w = img.size(-2), img.size(-1)
        half = self.length // 2

        # Draw the row first, then the column, so seeded runs are reproducible.
        y = torch.randint(h, (1,)).item()
        x = torch.randint(w, (1,)).item()

        y1, y2 = max(0, y - half), min(h, y + half)
        x1, x2 = max(0, x - half), min(w, x + half)

        # Writing zeros into a clone (instead of multiplying by a CPU
        # float32 mask) preserves dtype/device and avoids a full-size mask.
        out = img.clone()
        out[..., y1:y2, x1:x2] = 0
        return out
def get_data_loaders(config: CantorTrainingConfig) -> Tuple[DataLoader, DataLoader]:
    """Build the train and validation loaders for CIFAR-10 or CIFAR-100.

    The training pipeline optionally adds AutoAugment (or random crop +
    horizontal flip) before tensor conversion and Cutout after
    normalization; validation only converts and normalizes.  Datasets are
    stored under ``./data`` and downloaded when missing.

    Args:
        config: Training configuration; reads the augmentation flags,
            ``dataset``, ``batch_size``, ``num_workers`` and ``device``.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.

    Raises:
        ValueError: If ``config.dataset`` is neither ``"cifar10"`` nor
            ``"cifar100"``.
    """
    # NOTE(review): these look like the commonly quoted CIFAR-10 channel
    # statistics and are applied to CIFAR-100 as well - confirm intended.
    mean = (0.4914, 0.4822, 0.4465)
    std = (0.2470, 0.2435, 0.2616)

    def base_steps():
        # Fresh transform objects per pipeline: tensor conversion + normalization.
        return [transforms.ToTensor(), transforms.Normalize(mean, std)]

    if config.use_augmentation:
        if config.use_autoaugment:
            pre_tensor = [transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10)]
        else:
            pre_tensor = [
                transforms.RandomCrop(32, padding=4),
                transforms.RandomHorizontalFlip(),
            ]
        # Cutout operates on tensors, so it runs after normalization.
        post_tensor = [Cutout(config.cutout_length)] if config.use_cutout else []
        train_transform = transforms.Compose(pre_tensor + base_steps() + post_tensor)
    else:
        train_transform = transforms.Compose(base_steps())
    val_transform = transforms.Compose(base_steps())

    dataset_classes = {
        "cifar10": datasets.CIFAR10,
        "cifar100": datasets.CIFAR100,
    }
    if config.dataset not in dataset_classes:
        raise ValueError(f"Unknown dataset: {config.dataset}")
    dataset_cls = dataset_classes[config.dataset]
    train_dataset = dataset_cls(root='./data', train=True, download=True, transform=train_transform)
    val_dataset = dataset_cls(root='./data', train=False, download=True, transform=val_transform)

    # Settings shared by both loaders; only shuffling differs.
    shared_kwargs = dict(
        batch_size=config.batch_size,
        num_workers=config.num_workers,
        pin_memory=(config.device == "cuda"),
    )
    train_loader = DataLoader(train_dataset, shuffle=True, **shared_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **shared_kwargs)
    return train_loader, val_loader
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Main
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
    """Configure and launch a CIFAR-100 run with Geometric Coalescence Loss.

    Builds the training configuration, prints a banner, creates the data
    loaders, trains with ``Trainer`` and finally prints the TensorBoard
    command for inspecting the run.
    """
    architecture = dict(
        embed_dim=486,
        num_fusion_blocks=9,
        num_heads=9,
        fusion_mode="learned",
        k_simplex=8,
        use_beatrix=False,
        fusion_window=27,
    )
    # Optimizer: AdamW
    optimizer = dict(
        optimizer_type="adamw",
        learning_rate=3e-4,
        weight_decay=0.05,
        adamw_betas=(0.9, 0.999),
    )
    # Scheduler: warm restarts with an LR boost at each restart
    scheduler = dict(
        scheduler_type="cosine_restarts",
        restart_period=12,
        restart_mult=1.5,
        restart_lr_mult=1.15,  # boosted restarts for aggressive exploration
        min_lr=1e-7,
    )
    training = dict(
        num_epochs=300,
        batch_size=512,
        grad_clip=1.0,
        label_smoothing=0.15,
    )
    augmentation = dict(
        use_augmentation=True,
        use_autoaugment=True,
        use_cutout=True,
        cutout_length=16,
    )
    mixing = dict(
        use_mixing=True,
        mixing_type="alphamix",
        mixing_alpha_range=(0.3, 0.7),
        mixing_spatial_ratio=0.25,
        mixing_prob=0.5,
    )
    # Geometric Coalescence Loss
    coalescence = dict(
        use_coalescence_loss=True,
        lambda_coalescence=0.5,
        coalescence_num_anchors=64,
        coalescence_target_variance=0.5,
        coalescence_base_weight=0.1,
        coalescence_max_weight=0.8,
        coalescence_weight_power=2.0,
    )
    regularization = dict(
        dropout=0.1,
        drop_path_rate=0.15,
    )
    system = dict(
        device="cuda",
        use_mixed_precision=False,
    )
    huggingface = dict(
        hf_username="AbstractPhil",
        upload_to_hf=True,
        checkpoint_upload_interval=25,
    )

    config = CantorTrainingConfig(
        dataset="cifar100",
        **architecture,
        **optimizer,
        **scheduler,
        **training,
        **augmentation,
        **mixing,
        **coalescence,
        **regularization,
        **system,
        **huggingface,
    )

    rule = "=" * 70
    banner = (
        rule,
        f"Cantor Fusion Classifier - {config.dataset.upper()}",
        "Shatter-Reconstruct Training",
        rule,
        f"\nπŸš€ LR Boost: {config.restart_lr_mult}x at restarts",
        f"🧬 Coalescence Loss: λ={config.lambda_coalescence}",
        f" Adaptive weight: {config.coalescence_base_weight} β†’ {config.coalescence_max_weight}",
        " Philosophy: Geometric truth survives when patterns shatter",
        rule,
    )
    for line in banner:
        print(line)

    # Load data
    print("\nLoading data...")
    train_loader, val_loader = get_data_loaders(config)
    print(f" Train: {len(train_loader.dataset)} samples")
    print(f" Val: {len(val_loader.dataset)} samples")

    # Train
    trainer = Trainer(config)
    trainer.train(train_loader, val_loader)

    for line in (
        f"\n{rule}",
        "🎯 Shatter-reconstruct training complete!",
        f" tensorboard --logdir {config.tensorboard_dir}",
        rule,
    ):
        print(line)
# Script entry point: start training only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()