| """ |
| LiquidFlow Generator — Main diffusion model. |
| |
| Combines: |
| - LiquidFlowBackbone (CfC + Mamba-2 SSD) as the noise predictor |
| - DDPM/DDIM diffusion process |
| - Physics-informed regularization |
| |
| Supports: |
| - Training on 128×128 and 512×512 images |
| - TAESD VAE (lightweight, Colab/Kaggle compatible) |
| - SD VAE (higher quality) |
| - Both DDPM and DDIM sampling |
| |
| The model is designed to be: |
| - Trainable on Google Colab free tier / Kaggle (T4 GPU, 15GB) |
| - Exportable to ONNX/CoreML for mobile deployment |
| - Pure PyTorch — no CUDA kernels needed (Mamba-2 SSD runs on CPU too) |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import math |
| import numpy as np |
| from tqdm import tqdm |
| from typing import Optional, Dict, Tuple |
|
|
| from .liquid_flow_block import LiquidFlowBackbone |
| from .physics_loss import PhysicsRegularizer, DDIMEstimator |
|
|
|
|
| def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.02): |
| """Linear noise schedule (DDPM).""" |
| return torch.linspace(beta_start, beta_end, timesteps) |
|
|
|
|
| def cosine_beta_schedule(timesteps, s=0.008): |
| """Cosine noise schedule (Improved DDPM).""" |
| steps = timesteps + 1 |
| x = torch.linspace(0, timesteps, steps) |
| alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2 |
| alphas_cumprod = alphas_cumprod / alphas_cumprod[0] |
| betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1]) |
| return torch.clip(betas, 0.0001, 0.9999) |
|
|
|
|
| class LiquidFlowGenerator(nn.Module): |
| """ |
| LiquidFlow Generator: Liquid Neural Network + Mamba-2 SSD Diffusion Model. |
| |
| Uses LiquidFlowBackbone as noise predictor in a DDPM/DDIM framework. |
| |
| Architecture: |
| Noise Predictor = LiquidFlowBackbone (CfC + Mamba-2 SSD) |
| Diffusion = DDPM (forward) + DDIM (sampling) |
| Regularizer = Physics-Informed Losses (TV, spectral, conservation) |
| |
| Args: |
| in_channels: Latent channels from VAE (default 4) |
| hidden_dim: Hidden dimension in backbone |
| num_stages: Number of LiquidFlow stages |
| blocks_per_stage: Blocks per stage |
| image_size: Target image size (for latent computation) |
| beta_schedule: 'linear' or 'cosine' |
| timesteps: Number of diffusion timesteps |
| physics_weights: Weights for physics regularizers |
| """ |
| |
| def __init__( |
| self, |
| in_channels=4, |
| hidden_dim=256, |
| num_stages=4, |
| blocks_per_stage=4, |
| image_size=128, |
| beta_schedule='cosine', |
| timesteps=1000, |
| physics_weights=None, |
| ): |
| super().__init__() |
| self.in_channels = in_channels |
| self.hidden_dim = hidden_dim |
| self.image_size = image_size |
| self.timesteps = timesteps |
| |
| |
| self.backbone = LiquidFlowBackbone( |
| in_channels=in_channels, |
| hidden_dim=hidden_dim, |
| num_stages=num_stages, |
| blocks_per_stage=blocks_per_stage, |
| d_state=16, |
| expand=2, |
| dropout=0.0, |
| ) |
| |
| |
| if beta_schedule == 'linear': |
| betas = linear_beta_schedule(timesteps) |
| else: |
| betas = cosine_beta_schedule(timesteps) |
| |
| self.register_buffer('betas', betas) |
| self.register_buffer('alphas', 1.0 - betas) |
| self.register_buffer('alphas_cumprod', torch.cumprod(self.alphas, dim=0)) |
| self.register_buffer('alphas_cumprod_prev', F.pad(self.alphas_cumprod[:-1], (1, 0), value=1.0)) |
| |
| |
| self.register_buffer('sqrt_alphas_cumprod', torch.sqrt(self.alphas_cumprod)) |
| self.register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1.0 - self.alphas_cumprod)) |
| |
| |
| if physics_weights is None: |
| physics_weights = {'tv': 0.01, 'cons': 0.001, 'spec': 0.01, 'grad': 0.001} |
| self.physics = PhysicsRegularizer(**physics_weights) |
| self.ddim_estimator = DDIMEstimator() |
| |
| def q_sample(self, x0, t, noise=None): |
| """ |
| Forward diffusion: q(x_t | x_0). |
| |
| x_t = √(ᾱ_t) * x_0 + √(1 - ᾱ_t) * ε |
| """ |
| if noise is None: |
| noise = torch.randn_like(x0) |
| |
| sqrt_alpha_bar = self.sqrt_alphas_cumprod[t].reshape(-1, 1, 1, 1) |
| sqrt_one_minus_alpha_bar = self.sqrt_one_minus_alphas_cumprod[t].reshape(-1, 1, 1, 1) |
| |
| return sqrt_alpha_bar * x0 + sqrt_one_minus_alpha_bar * noise, noise |
| |
| def forward(self, x, t): |
| """Predict noise from noisy input.""" |
| return self.backbone(x, t) |
| |
| def training_step(self, x0, optimizer, scaler=None, use_amp=False): |
| """ |
| Single training step with physics regularization. |
| |
| Args: |
| x0: Clean latents [B, C, H, W] |
| optimizer: Optimizer |
| scaler: Optional GradScaler for AMP |
| use_amp: Whether to use automatic mixed precision |
| |
| Returns: |
| loss_dict: Dictionary of losses |
| """ |
| B = x0.shape[0] |
| device = x0.device |
| |
| |
| t = torch.randint(0, self.timesteps, (B,), device=device) |
| |
| |
| noise = torch.randn_like(x0) |
| xt, noise = self.q_sample(x0, t, noise) |
| |
| if use_amp and scaler is not None: |
| with torch.cuda.amp.autocast(): |
| |
| noise_pred = self.forward(xt, t) |
| |
| |
| diffusion_loss = F.mse_loss(noise_pred, noise) |
| |
| |
| x0_hat = self.ddim_estimator.estimate_x0( |
| xt, noise_pred, self.alphas_cumprod[t] |
| ) |
| phys_loss, phys_dict = self.physics(x0_hat, x0) |
| |
| total_loss = diffusion_loss + phys_loss |
| else: |
| noise_pred = self.forward(xt, t) |
| diffusion_loss = F.mse_loss(noise_pred, noise) |
| |
| x0_hat = self.ddim_estimator.estimate_x0( |
| xt, noise_pred, self.alphas_cumprod[t] |
| ) |
| phys_loss, phys_dict = self.physics(x0_hat, x0) |
| |
| total_loss = diffusion_loss + phys_loss |
| |
| |
| optimizer.zero_grad() |
| if scaler is not None: |
| scaler.scale(total_loss).backward() |
| scaler.unscale_(optimizer) |
| torch.nn.utils.clip_grad_norm_(self.parameters(), 1.0) |
| scaler.step(optimizer) |
| scaler.update() |
| else: |
| total_loss.backward() |
| torch.nn.utils.clip_grad_norm_(self.parameters(), 1.0) |
| optimizer.step() |
| |
| return { |
| 'total': total_loss.item(), |
| 'diffusion': diffusion_loss.item(), |
| 'physics': phys_loss.item(), |
| **{f'phys_{k}': v.item() for k, v in phys_dict.items()}, |
| } |
| |
| @torch.no_grad() |
| def sample(self, batch_size=4, steps=50, ddim=True, eta=0.0, progress=True): |
| """ |
| Generate images using DDPM or DDIM sampling. |
| |
| Args: |
| batch_size: Number of images |
| steps: Sampling steps (for DDIM: can be << timesteps) |
| ddim: Use DDIM sampling (faster) |
| eta: DDIM stochasticity (0 = deterministic) |
| progress: Show progress bar |
| |
| Returns: |
| Generated latents [B, C, H, W] |
| """ |
| device = next(self.parameters()).device |
| latent_size = self.image_size // 8 |
| |
| |
| x = torch.randn(batch_size, self.in_channels, latent_size, latent_size, device=device) |
| |
| if ddim: |
| return self._ddim_sample(x, steps, eta, progress) |
| else: |
| return self._ddpm_sample(x, progress) |
| |
| @torch.no_grad() |
| def _ddpm_sample(self, x, progress=True): |
| """DDPM sampling (full 1000 steps).""" |
| device = x.device |
| |
| iterator = tqdm( |
| reversed(range(0, self.timesteps)), |
| desc='DDPM Sampling', |
| total=self.timesteps, |
| disable=not progress, |
| ) |
| |
| for t_idx in iterator: |
| t = torch.full((x.shape[0],), t_idx, device=device, dtype=torch.long) |
| |
| noise_pred = self.forward(x, t) |
| |
| alpha = self.alphas[t_idx] |
| alpha_bar = self.alphas_cumprod[t_idx] |
| alpha_bar_prev = self.alphas_cumprod_prev[t_idx] |
| beta = self.betas[t_idx] |
| |
| if t_idx > 0: |
| noise = torch.randn_like(x) |
| else: |
| noise = 0 |
| |
| |
| x = (1 / torch.sqrt(alpha)) * ( |
| x - (beta / torch.sqrt(1 - alpha_bar)) * noise_pred |
| ) + torch.sqrt(beta) * noise |
| |
| return x |
| |
| @torch.no_grad() |
| def _ddim_sample(self, x, steps=50, eta=0.0, progress=True): |
| """ |
| DDIM sampling with fewer steps. |
| |
| DDIM can produce good samples in 20-50 steps |
| instead of 1000 DDPM steps. |
| """ |
| device = x.device |
| |
| |
| skip = self.timesteps // steps |
| seq = list(range(0, self.timesteps, skip)) |
| seq_next = [-1] + seq[:-1] |
| |
| iterator = tqdm( |
| zip(reversed(seq), reversed(seq_next)), |
| desc='DDIM Sampling', |
| total=len(seq), |
| disable=not progress, |
| ) |
| |
| for i, j in iterator: |
| t = torch.full((x.shape[0],), i, device=device, dtype=torch.long) |
| |
| noise_pred = self.forward(x, t) |
| |
| alpha_bar_i = self.alphas_cumprod[i] |
| alpha_bar_j = self.alphas_cumprod[j] if j >= 0 else torch.tensor(1.0, device=device) |
| |
| |
| x0_pred = (x - torch.sqrt(1 - alpha_bar_i) * noise_pred) / torch.sqrt(alpha_bar_i) |
| x0_pred = torch.clamp(x0_pred, -1, 1) |
| |
| |
| dir_xt = torch.sqrt(1 - alpha_bar_j - eta * eta * ( |
| (1 - alpha_bar_j) / (1 - alpha_bar_i) |
| )) * noise_pred |
| |
| |
| if eta > 0: |
| noise = torch.randn_like(x) |
| sigma = eta * torch.sqrt((1 - alpha_bar_j) / (1 - alpha_bar_i) * (1 - alpha_bar_i / alpha_bar_j)) |
| x = torch.sqrt(alpha_bar_j) * x0_pred + dir_xt + sigma * noise |
| else: |
| noise = 0 |
| x = torch.sqrt(alpha_bar_j) * x0_pred + dir_xt |
| |
| return x |
| |
| def count_parameters(self): |
| """Count trainable parameters.""" |
| return sum(p.numel() for p in self.parameters() if p.requires_grad) |
|
|
|
|
| def create_liquidflow( |
| variant='small', |
| image_size=128, |
| **kwargs, |
| ): |
| """ |
| Create a LiquidFlow model with preset configurations. |
| |
| Variants: |
| - 'tiny': ~2M params, 2 stages, 2 blocks each, hidden_dim=128 |
| - 'small': ~8M params, 4 stages, 4 blocks each, hidden_dim=256 |
| - 'base': ~30M params, 6 stages, 6 blocks each, hidden_dim=384 |
| |
| All designed to run on T4 (15GB) with batch_size >= 16 at 128×128. |
| """ |
| configs = { |
| 'tiny': { |
| 'hidden_dim': 128, |
| 'num_stages': 2, |
| 'blocks_per_stage': 2, |
| }, |
| 'small': { |
| 'hidden_dim': 256, |
| 'num_stages': 4, |
| 'blocks_per_stage': 4, |
| }, |
| 'base': { |
| 'hidden_dim': 384, |
| 'num_stages': 6, |
| 'blocks_per_stage': 6, |
| }, |
| } |
| |
| config = configs.get(variant, configs['small']) |
| config.update(kwargs) |
| |
| model = LiquidFlowGenerator( |
| in_channels=4, |
| image_size=image_size, |
| **config, |
| ) |
| |
| return model |
|
|