# iris-image-gen / iris/blocks.py
# asdf98's picture
# Fix conv2d bf16 crash on T4: iris/blocks.py
# dd9c2aa verified
"""
Core building blocks for IRIS: attention, FFN, cross-attention, embeddings.
Design principles:
- MQA (Multi-Query Attention) everywhere — shared K,V across heads
- UIB-FFN (Universal Inverted Bottleneck) — depthwise separable, expansion=2
- QK-RMSNorm for training stability (from SANA-Sprint)
- 2D RoPE for spatial position encoding
- Timestep addition (not AdaLN) — saves params (from HTH)
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned gain.

    The statistics and the scaling are computed in float32; the result is cast
    back to the dtype of the input.

    Args:
        dim: Size of the normalised (last) dimension; one gain per element.
        eps: Constant added to the mean square before taking the square root.
    """

    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        # Gain starts at one, so a fresh module only rescales by 1 / rms.
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Return ``x / rms(x) * weight`` in the dtype of ``x``."""
        x32 = x.float()
        mean_square = x32.pow(2).mean(-1, keepdim=True)
        rms = torch.sqrt(mean_square + self.eps)
        scaled = x32 / rms * self.weight.float()
        return scaled.to(x.dtype)
class RotaryEmbedding2D(nn.Module):
    """Rotary position embedding for tokens laid out on an ``H x W`` grid.

    ``dim // 4`` inverse frequencies are shared by the row axis and the column
    axis. For every token the angle vector is ``[row angles, column angles]``,
    i.e. ``2 * (dim // 4)`` values. The first two feature slices of that width
    are rotated against each other; any remaining features pass through
    unchanged.

    Args:
        dim: Feature size of the vectors that will be rotated.
        max_size: Accepted for interface compatibility; not used here.
    """

    def __init__(self, dim: int, max_size: int = 64):
        super().__init__()
        self.dim = dim
        n_freq = dim // 4
        exponents = torch.arange(0, n_freq, dtype=torch.float32) / n_freq
        # Non-persistent: the buffer follows .to(device) but is not saved in the state dict.
        self.register_buffer("inv_freq", 1.0 / (10000.0 ** exponents), persistent=False)

    def _build_cache(self, H, W, device, dtype):
        """Return ``(cos, sin)`` tables of shape ``(H * W, 2 * n_freq)``.

        Angles are computed in float32 and only the final tables are cast to
        ``dtype``. Rows are ordered row-major (token ``h * W + w``).
        """
        inv_freq = self.inv_freq.to(device)
        rows = torch.arange(H, device=device, dtype=torch.float32)
        cols = torch.arange(W, device=device, dtype=torch.float32)
        row_angles = torch.outer(rows, inv_freq).unsqueeze(1).expand(H, W, -1)
        col_angles = torch.outer(cols, inv_freq).unsqueeze(0).expand(H, W, -1)
        angles = torch.cat((row_angles, col_angles), dim=-1).reshape(H * W, -1)
        return angles.cos().to(dtype), angles.sin().to(dtype)

    def forward(self, x, H, W):
        """Rotate ``x`` according to each token's grid position.

        Args:
            x: Tensor whose last two dimensions are ``(H * W, features)``.
                A 4-D input gets two leading broadcast axes on the tables,
                anything else gets one.
            H: Grid height.
            W: Grid width.

        Returns:
            Tensor of the same shape as ``x``.
        """
        cos, sin = self._build_cache(H, W, x.device, x.dtype)
        # The tables already hold exactly H * W rows; only broadcast axes are added.
        if x.dim() == 4:
            cos, sin = cos[None, None], sin[None, None]
        else:
            cos, sin = cos[None], sin[None]

        width = cos.shape[-1]
        first = x[..., :width]
        second = x[..., width:2 * width]
        untouched = x[..., 2 * width:]

        rotated_first = first * cos - second * sin
        rotated_second = first * sin + second * cos
        return torch.cat([rotated_first, rotated_second, untouched], dim=-1)
class MultiQueryCrossAttention(nn.Module):
def __init__(self, dim, num_heads=4, qk_norm=True):
super().__init__()
assert dim % num_heads == 0
self.num_heads = num_heads
self.head_dim = dim // num_heads
self.q_proj = nn.Linear(dim, dim, bias=False)
self.k_proj = nn.Linear(dim, self.head_dim, bias=False)
self.v_proj = nn.Linear(dim, self.head_dim, bias=False)
self.out_proj = nn.Linear(dim, dim, bias=False)
self.q_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity()
self.k_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity()
self.norm = nn.LayerNorm(dim)
def forward(self, x, context):
B, N, D = x.shape
S = context.shape[1]
residual = x
x = self.norm(x)
q = self.q_proj(x).view(B, N, self.num_heads, self.head_dim).transpose(1, 2)
k = self.k_proj(context).view(B, S, 1, self.head_dim).transpose(1, 2)
v = self.v_proj(context).view(B, S, 1, self.head_dim).transpose(1, 2)
q, k = self.q_norm(q), self.k_norm(k)
k = k.expand(-1, self.num_heads, -1, -1)
v = v.expand(-1, self.num_heads, -1, -1)
attn = F.scaled_dot_product_attention(q, k, v, scale=1.0/math.sqrt(self.head_dim))
return residual + self.out_proj(attn.transpose(1, 2).reshape(B, N, D))
class MultiQuerySelfAttention(nn.Module):
    """Pre-norm multi-query self-attention with 2-D rotary positions and a residual.

    Queries use ``num_heads`` heads; keys and values are projected to a single
    head shared by every query head. Queries and keys are optionally
    RMS-normalised per head and then rotated by ``RotaryEmbedding2D``.

    Args:
        dim: Token feature size; must be divisible by ``num_heads``.
        num_heads: Number of query heads.
        qk_norm: If true, apply ``RMSNorm`` per head to queries and keys;
            otherwise they pass through unchanged.
    """

    def __init__(self, dim, num_heads=4, qk_norm=True):
        super().__init__()
        assert dim % num_heads == 0
        head_dim = dim // num_heads
        self.num_heads = num_heads
        self.head_dim = head_dim

        self.q_proj = nn.Linear(dim, dim, bias=False)
        # A single key/value head: the projections only produce head_dim features.
        self.k_proj = nn.Linear(dim, head_dim, bias=False)
        self.v_proj = nn.Linear(dim, head_dim, bias=False)
        self.out_proj = nn.Linear(dim, dim, bias=False)

        self.q_norm = RMSNorm(head_dim) if qk_norm else nn.Identity()
        self.k_norm = RMSNorm(head_dim) if qk_norm else nn.Identity()
        self.norm = nn.LayerNorm(dim)
        self.rope = RotaryEmbedding2D(head_dim)

    def forward(self, x, H, W):
        """Attend over the tokens of ``x``.

        Args:
            x: Tokens of shape ``(B, N, dim)``.
            H: Grid height passed to the rotary embedding.
            W: Grid width passed to the rotary embedding.

        Returns:
            ``x`` plus the projected attention output, shape ``(B, N, dim)``.
        """
        batch, n_tokens, width = x.shape
        heads, head_dim = self.num_heads, self.head_dim

        normed = self.norm(x)
        queries = self.q_proj(normed).view(batch, n_tokens, heads, head_dim).transpose(1, 2)
        keys = self.k_proj(normed).view(batch, n_tokens, 1, head_dim).transpose(1, 2)
        values = self.v_proj(normed).view(batch, n_tokens, 1, head_dim).transpose(1, 2)

        # Normalise first, then rotate: positions are applied to the normalised vectors.
        queries = self.rope(self.q_norm(queries), H, W)
        keys = self.rope(self.k_norm(keys), H, W)

        # Broadcast the single key/value head across all query heads (no copy).
        keys = keys.expand(-1, heads, -1, -1)
        values = values.expand(-1, heads, -1, -1)

        attended = F.scaled_dot_product_attention(
            queries, keys, values, scale=1.0 / math.sqrt(head_dim)
        )
        merged = attended.transpose(1, 2).reshape(batch, n_tokens, width)
        return x + self.out_proj(merged)
class UIBFFN(nn.Module):
    """Gated feed-forward block with a depthwise 3x3 convolution and a residual.

    Computes ``x + pw_down(dw_conv(pw_up(n)) * silu(gate(n)))`` with
    ``n = norm(x)``. The depthwise convolution sees the tokens reshaped to an
    ``H x W`` grid.

    Args:
        dim: Token feature size.
        expansion: The hidden width is ``dim * expansion``.
        spatial_size: Accepted for interface compatibility; not used here.
    """

    def __init__(self, dim, expansion=2, spatial_size=4):
        super().__init__()
        hidden = dim * expansion
        self.norm = nn.LayerNorm(dim)
        self.pw_up = nn.Linear(dim, hidden, bias=False)
        self.gate = nn.Linear(dim, hidden, bias=False)
        self.dw_conv = nn.Conv2d(hidden, hidden, 3, padding=1, groups=hidden, bias=True)
        self.pw_down = nn.Linear(hidden, dim, bias=False)

    def forward(self, x, H, W):
        """Apply the block.

        Args:
            x: Tokens of shape ``(B, N, dim)`` with ``N == H * W``.
            H: Grid height used to lay the tokens out for the convolution.
            W: Grid width used to lay the tokens out for the convolution.

        Returns:
            Tensor of shape ``(B, N, dim)``.
        """
        B, N, _ = x.shape
        residual = x
        x = self.norm(x)
        h = self.pw_up(x)
        g = F.silu(self.gate(x))
        h_2d = h.view(B, H, W, -1).permute(0, 3, 1, 2)

        # Run depthwise conv in float32 — grouped convs lack bf16 cuDNN kernels on T4.
        # The weight and bias are cast as well as the input, so this also works when
        # the module itself was converted with .half() / .to(torch.bfloat16); calling
        # self.dw_conv on a float32 input would then fail with a dtype mismatch.
        conv = self.dw_conv
        bias = None if conv.bias is None else conv.bias.float()
        # Disable autocast for the device the data actually lives on. Other backends
        # keep the previous behaviour ('cuda'), which is always accepted.
        device_type = x.device.type if x.device.type in ("cuda", "cpu") else "cuda"
        with torch.amp.autocast(device_type=device_type, enabled=False):
            h_2d = F.conv2d(
                h_2d.float(),
                conv.weight.float(),
                bias,
                stride=conv.stride,
                padding=conv.padding,
                dilation=conv.dilation,
                groups=conv.groups,
            )

        # Back to (B, N, hidden) and to the dtype of the gate branch.
        h = h_2d.permute(0, 2, 3, 1).reshape(B, N, -1).to(g.dtype)
        return residual + self.pw_down(h * g)
class TimestepEmbedding(nn.Module):
    """Sinusoidal timestep features followed by a two-layer MLP.

    Args:
        dim: Size of the sinusoidal feature vector and of the output.
        max_period: Controls the lowest frequency of the sinusoids.
    """

    def __init__(self, dim, max_period=10000):
        super().__init__()
        self.dim = dim
        self.max_period = max_period
        self.mlp = nn.Sequential(nn.Linear(dim, dim * 4), nn.SiLU(), nn.Linear(dim * 4, dim))

    def forward(self, t):
        """Embed a batch of timesteps.

        Args:
            t: 1-D tensor of timesteps, shape ``(B,)``. Both floating-point and
                integer tensors are accepted.

        Returns:
            Tensor of shape ``(B, dim)``.
        """
        half = self.dim // 2
        # Frequencies and angles are always computed in float32.
        exponents = torch.arange(half, device=t.device, dtype=torch.float32) / half
        freqs = torch.exp(-math.log(self.max_period) * exponents)
        args = t[:, None].float() * freqs[None, :]
        emb = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if self.dim % 2:
            # cos/sin give 2 * half features; pad one zero column for odd dim.
            emb = F.pad(emb, (0, 1))

        # Floating-point timesteps keep their dtype, as before. Integer timesteps
        # must not be used as the target dtype: that would truncate the sinusoids
        # to whole numbers and the Linear layer cannot consume integer input.
        if t.is_floating_point():
            target_dtype = t.dtype
        else:
            target_dtype = self.mlp[0].weight.dtype
        return self.mlp(emb.to(target_dtype))
class IterationEmbedding(nn.Module):
    """Learned embedding of an iteration index, repeated for every batch element.

    Args:
        dim: Size of each embedding vector.
        max_iterations: Number of rows in the embedding table.
    """

    def __init__(self, dim, max_iterations=8):
        super().__init__()
        self.embed = nn.Embedding(max_iterations, dim)

    def forward(self, iter_idx, batch_size, device):
        """Look up the embedding of ``iter_idx`` for a whole batch.

        Args:
            iter_idx: Index into the embedding table.
            batch_size: Number of identical rows to return.
            device: Device on which the index tensor is created.

        Returns:
            Tensor of shape ``(batch_size, dim)``.
        """
        indices = torch.full((batch_size,), iter_idx, dtype=torch.long, device=device)
        return self.embed(indices)