LiquidFlow-Gen / liquid_flow /generator.py
krystv's picture
Upload liquid_flow/generator.py
f8a7028 verified
"""
LiquidFlow Generator — Main diffusion model.
Combines:
- LiquidFlowBackbone (CfC + Mamba-2 SSD) as the noise predictor
- DDPM/DDIM diffusion process
- Physics-informed regularization
Supports:
- Training on 128×128 and 512×512 images
- TAESD VAE (lightweight, Colab/Kaggle compatible)
- SD VAE (higher quality)
- Both DDPM and DDIM sampling
The model is designed to be:
- Trainable on Google Colab free tier / Kaggle (T4 GPU, 15GB)
- Exportable to ONNX/CoreML for mobile deployment
- Pure PyTorch — no CUDA kernels needed (Mamba-2 SSD runs on CPU too)
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
from tqdm import tqdm
from typing import Optional, Dict, Tuple
from .liquid_flow_block import LiquidFlowBackbone
from .physics_loss import PhysicsRegularizer, DDIMEstimator
def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.02):
"""Linear noise schedule (DDPM)."""
return torch.linspace(beta_start, beta_end, timesteps)
def cosine_beta_schedule(timesteps, s=0.008):
"""Cosine noise schedule (Improved DDPM)."""
steps = timesteps + 1
x = torch.linspace(0, timesteps, steps)
alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
return torch.clip(betas, 0.0001, 0.9999)
class LiquidFlowGenerator(nn.Module):
"""
LiquidFlow Generator: Liquid Neural Network + Mamba-2 SSD Diffusion Model.
Uses LiquidFlowBackbone as noise predictor in a DDPM/DDIM framework.
Architecture:
Noise Predictor = LiquidFlowBackbone (CfC + Mamba-2 SSD)
Diffusion = DDPM (forward) + DDIM (sampling)
Regularizer = Physics-Informed Losses (TV, spectral, conservation)
Args:
in_channels: Latent channels from VAE (default 4)
hidden_dim: Hidden dimension in backbone
num_stages: Number of LiquidFlow stages
blocks_per_stage: Blocks per stage
image_size: Target image size (for latent computation)
beta_schedule: 'linear' or 'cosine'
timesteps: Number of diffusion timesteps
physics_weights: Weights for physics regularizers
"""
def __init__(
self,
in_channels=4,
hidden_dim=256,
num_stages=4,
blocks_per_stage=4,
image_size=128,
beta_schedule='cosine',
timesteps=1000,
physics_weights=None,
):
super().__init__()
self.in_channels = in_channels
self.hidden_dim = hidden_dim
self.image_size = image_size # Latent space size = image_size / 8
self.timesteps = timesteps
# Noise predictor (backbone)
self.backbone = LiquidFlowBackbone(
in_channels=in_channels,
hidden_dim=hidden_dim,
num_stages=num_stages,
blocks_per_stage=blocks_per_stage,
d_state=16,
expand=2,
dropout=0.0,
)
# Diffusion schedule
if beta_schedule == 'linear':
betas = linear_beta_schedule(timesteps)
else:
betas = cosine_beta_schedule(timesteps)
self.register_buffer('betas', betas)
self.register_buffer('alphas', 1.0 - betas)
self.register_buffer('alphas_cumprod', torch.cumprod(self.alphas, dim=0))
self.register_buffer('alphas_cumprod_prev', F.pad(self.alphas_cumprod[:-1], (1, 0), value=1.0))
# For DDIM sampling
self.register_buffer('sqrt_alphas_cumprod', torch.sqrt(self.alphas_cumprod))
self.register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1.0 - self.alphas_cumprod))
# Physics regularizer
if physics_weights is None:
physics_weights = {'tv': 0.01, 'cons': 0.001, 'spec': 0.01, 'grad': 0.001}
self.physics = PhysicsRegularizer(**physics_weights)
self.ddim_estimator = DDIMEstimator()
def q_sample(self, x0, t, noise=None):
"""
Forward diffusion: q(x_t | x_0).
x_t = √(ᾱ_t) * x_0 + √(1 - ᾱ_t) * ε
"""
if noise is None:
noise = torch.randn_like(x0)
sqrt_alpha_bar = self.sqrt_alphas_cumprod[t].reshape(-1, 1, 1, 1)
sqrt_one_minus_alpha_bar = self.sqrt_one_minus_alphas_cumprod[t].reshape(-1, 1, 1, 1)
return sqrt_alpha_bar * x0 + sqrt_one_minus_alpha_bar * noise, noise
def forward(self, x, t):
"""Predict noise from noisy input."""
return self.backbone(x, t)
def training_step(self, x0, optimizer, scaler=None, use_amp=False):
"""
Single training step with physics regularization.
Args:
x0: Clean latents [B, C, H, W]
optimizer: Optimizer
scaler: Optional GradScaler for AMP
use_amp: Whether to use automatic mixed precision
Returns:
loss_dict: Dictionary of losses
"""
B = x0.shape[0]
device = x0.device
# Sample timesteps
t = torch.randint(0, self.timesteps, (B,), device=device)
# Forward diffusion
noise = torch.randn_like(x0)
xt, noise = self.q_sample(x0, t, noise)
if use_amp and scaler is not None:
with torch.cuda.amp.autocast():
# Predict noise
noise_pred = self.forward(xt, t)
# Base diffusion loss (L2 or L1)
diffusion_loss = F.mse_loss(noise_pred, noise)
# Physics regularization on estimated x0
x0_hat = self.ddim_estimator.estimate_x0(
xt, noise_pred, self.alphas_cumprod[t]
)
phys_loss, phys_dict = self.physics(x0_hat, x0)
total_loss = diffusion_loss + phys_loss
else:
noise_pred = self.forward(xt, t)
diffusion_loss = F.mse_loss(noise_pred, noise)
x0_hat = self.ddim_estimator.estimate_x0(
xt, noise_pred, self.alphas_cumprod[t]
)
phys_loss, phys_dict = self.physics(x0_hat, x0)
total_loss = diffusion_loss + phys_loss
# Backward
optimizer.zero_grad()
if scaler is not None:
scaler.scale(total_loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(self.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
else:
total_loss.backward()
torch.nn.utils.clip_grad_norm_(self.parameters(), 1.0)
optimizer.step()
return {
'total': total_loss.item(),
'diffusion': diffusion_loss.item(),
'physics': phys_loss.item(),
**{f'phys_{k}': v.item() for k, v in phys_dict.items()},
}
@torch.no_grad()
def sample(self, batch_size=4, steps=50, ddim=True, eta=0.0, progress=True):
"""
Generate images using DDPM or DDIM sampling.
Args:
batch_size: Number of images
steps: Sampling steps (for DDIM: can be << timesteps)
ddim: Use DDIM sampling (faster)
eta: DDIM stochasticity (0 = deterministic)
progress: Show progress bar
Returns:
Generated latents [B, C, H, W]
"""
device = next(self.parameters()).device
latent_size = self.image_size // 8
# Start from pure noise
x = torch.randn(batch_size, self.in_channels, latent_size, latent_size, device=device)
if ddim:
return self._ddim_sample(x, steps, eta, progress)
else:
return self._ddpm_sample(x, progress)
@torch.no_grad()
def _ddpm_sample(self, x, progress=True):
"""DDPM sampling (full 1000 steps)."""
device = x.device
iterator = tqdm(
reversed(range(0, self.timesteps)),
desc='DDPM Sampling',
total=self.timesteps,
disable=not progress,
)
for t_idx in iterator:
t = torch.full((x.shape[0],), t_idx, device=device, dtype=torch.long)
noise_pred = self.forward(x, t)
alpha = self.alphas[t_idx]
alpha_bar = self.alphas_cumprod[t_idx]
alpha_bar_prev = self.alphas_cumprod_prev[t_idx]
beta = self.betas[t_idx]
if t_idx > 0:
noise = torch.randn_like(x)
else:
noise = 0
# DDPM posterior
x = (1 / torch.sqrt(alpha)) * (
x - (beta / torch.sqrt(1 - alpha_bar)) * noise_pred
) + torch.sqrt(beta) * noise
return x
@torch.no_grad()
def _ddim_sample(self, x, steps=50, eta=0.0, progress=True):
"""
DDIM sampling with fewer steps.
DDIM can produce good samples in 20-50 steps
instead of 1000 DDPM steps.
"""
device = x.device
# Timestep spacing
skip = self.timesteps // steps
seq = list(range(0, self.timesteps, skip))
seq_next = [-1] + seq[:-1]
iterator = tqdm(
zip(reversed(seq), reversed(seq_next)),
desc='DDIM Sampling',
total=len(seq),
disable=not progress,
)
for i, j in iterator:
t = torch.full((x.shape[0],), i, device=device, dtype=torch.long)
noise_pred = self.forward(x, t)
alpha_bar_i = self.alphas_cumprod[i]
alpha_bar_j = self.alphas_cumprod[j] if j >= 0 else torch.tensor(1.0, device=device)
# Predicted x0
x0_pred = (x - torch.sqrt(1 - alpha_bar_i) * noise_pred) / torch.sqrt(alpha_bar_i)
x0_pred = torch.clamp(x0_pred, -1, 1) # Prevent outliers
# Direction pointing to x_t
dir_xt = torch.sqrt(1 - alpha_bar_j - eta * eta * (
(1 - alpha_bar_j) / (1 - alpha_bar_i)
)) * noise_pred
# Random noise
if eta > 0:
noise = torch.randn_like(x)
sigma = eta * torch.sqrt((1 - alpha_bar_j) / (1 - alpha_bar_i) * (1 - alpha_bar_i / alpha_bar_j))
x = torch.sqrt(alpha_bar_j) * x0_pred + dir_xt + sigma * noise
else:
noise = 0
x = torch.sqrt(alpha_bar_j) * x0_pred + dir_xt
return x
def count_parameters(self):
"""Count trainable parameters."""
return sum(p.numel() for p in self.parameters() if p.requires_grad)
def create_liquidflow(
variant='small',
image_size=128,
**kwargs,
):
"""
Create a LiquidFlow model with preset configurations.
Variants:
- 'tiny': ~2M params, 2 stages, 2 blocks each, hidden_dim=128
- 'small': ~8M params, 4 stages, 4 blocks each, hidden_dim=256
- 'base': ~30M params, 6 stages, 6 blocks each, hidden_dim=384
All designed to run on T4 (15GB) with batch_size >= 16 at 128×128.
"""
configs = {
'tiny': {
'hidden_dim': 128,
'num_stages': 2,
'blocks_per_stage': 2,
},
'small': {
'hidden_dim': 256,
'num_stages': 4,
'blocks_per_stage': 4,
},
'base': {
'hidden_dim': 384,
'num_stages': 6,
'blocks_per_stage': 6,
},
}
config = configs.get(variant, configs['small'])
config.update(kwargs)
model = LiquidFlowGenerator(
in_channels=4, # VAE latent channels
image_size=image_size,
**config,
)
return model