| """ |
| Core building blocks for IRIS: attention, FFN, cross-attention, embeddings. |
| |
| Design principles: |
| - MQA (Multi-Query Attention) everywhere — shared K,V across heads |
| - UIB-FFN (Universal Inverted Bottleneck) — depthwise separable, expansion=2 |
| - QK-RMSNorm for training stability (from SANA-Sprint) |
| - 2D RoPE for spatial position encoding |
| - Timestep addition (not AdaLN) — saves params (from HTH) |
| """ |
|
|
import math
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
|
|
|
|
class RMSNorm(nn.Module):
    """Root-mean-square LayerNorm: no mean subtraction, no bias, computed in fp32."""

    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))
|
|
| def forward(self, x): |
| rms = torch.sqrt(x.float().pow(2).mean(-1, keepdim=True) + self.eps) |
| return (x.float() / rms * self.weight.float()).to(x.dtype) |
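
# Usage sketch (shapes are illustrative, not from the original module):
#   norm = RMSNorm(256)
#   y = norm(torch.randn(2, 64, 256, dtype=torch.bfloat16))  # normalized in fp32, returned as bf16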
|
|
|
|
class RotaryEmbedding2D(nn.Module):
    """2D rotary position embedding: the head dim is split into two rotated halves;
    within each half, the first dim // 4 channels carry row (H) frequencies and the
    next dim // 4 carry column (W) frequencies. Leftover channels pass through unrotated."""

    def __init__(self, dim: int, max_size: int = 64):
        super().__init__()
        # NOTE: max_size is currently unused; cos/sin tables are rebuilt per call in _build_cache.
        self.dim = dim
        half_dim = dim // 4
        inv_freq = 1.0 / (10000.0 ** (torch.arange(0, half_dim, dtype=torch.float32) / half_dim))
        self.register_buffer("inv_freq", inv_freq, persistent=False)
|
|
| def _build_cache(self, H, W, device, dtype): |
| h_pos = torch.arange(H, device=device, dtype=torch.float32) |
| w_pos = torch.arange(W, device=device, dtype=torch.float32) |
| inv = self.inv_freq.to(device) |
| h_freqs = torch.outer(h_pos, inv)[:, None, :].expand(H, W, -1) |
| w_freqs = torch.outer(w_pos, inv)[None, :, :].expand(H, W, -1) |
| freqs = torch.cat([h_freqs, w_freqs], dim=-1).reshape(H * W, -1) |
| return freqs.cos().to(dtype), freqs.sin().to(dtype) |
|
|
| def forward(self, x, H, W): |
| N = H * W |
| cos_c, sin_c = self._build_cache(H, W, x.device, x.dtype) |
| if x.dim() == 4: |
| cos_c = cos_c[None, None, :N, :] |
| sin_c = sin_c[None, None, :N, :] |
| else: |
| cos_c = cos_c[None, :N, :] |
| sin_c = sin_c[None, :N, :] |
        d = cos_c.shape[-1]
        # Rotate the first 2*d channels as (x1, x2) pairs; pass any remainder through unchanged.
        x1, x2, xr = x[..., :d], x[..., d:2*d], x[..., 2*d:]
        return torch.cat([x1*cos_c - x2*sin_c, x1*sin_c + x2*cos_c, xr], dim=-1)
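
# Usage sketch (hypothetical sizes; with dim=64 there are 16 row + 16 column frequencies,
# so channels [0:32] are rotated against channels [32:64]):
#   rope = RotaryEmbedding2D(dim=64)
#   q = torch.randn(2, 4, 16 * 16, 64)           # (B, heads, H*W, head_dim)
#   q_rot = rope(q, H=16, W=16)                  # same shape, position-encoded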
|
|
|
|
class MultiQueryCrossAttention(nn.Module):
    """Pre-norm multi-query cross-attention: per-head queries attend to a single
    shared K/V head projected from `context`; the residual is added inside forward."""

    def __init__(self, dim, num_heads=4, qk_norm=True):
        super().__init__()
        assert dim % num_heads == 0, "dim must be divisible by num_heads"
| self.num_heads = num_heads |
| self.head_dim = dim // num_heads |
| self.q_proj = nn.Linear(dim, dim, bias=False) |
| self.k_proj = nn.Linear(dim, self.head_dim, bias=False) |
| self.v_proj = nn.Linear(dim, self.head_dim, bias=False) |
| self.out_proj = nn.Linear(dim, dim, bias=False) |
| self.q_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity() |
| self.k_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity() |
| self.norm = nn.LayerNorm(dim) |
|
|
| def forward(self, x, context): |
| B, N, D = x.shape |
| S = context.shape[1] |
| residual = x |
| x = self.norm(x) |
| q = self.q_proj(x).view(B, N, self.num_heads, self.head_dim).transpose(1, 2) |
| k = self.k_proj(context).view(B, S, 1, self.head_dim).transpose(1, 2) |
| v = self.v_proj(context).view(B, S, 1, self.head_dim).transpose(1, 2) |
| q, k = self.q_norm(q), self.k_norm(k) |
        # Broadcast the single shared K/V head across all query heads (multi-query attention).
        k = k.expand(-1, self.num_heads, -1, -1)
        v = v.expand(-1, self.num_heads, -1, -1)
| attn = F.scaled_dot_product_attention(q, k, v, scale=1.0/math.sqrt(self.head_dim)) |
| return residual + self.out_proj(attn.transpose(1, 2).reshape(B, N, D)) |
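
# Usage sketch (hypothetical sizes; `context` would typically hold conditioning tokens):
#   xattn = MultiQueryCrossAttention(dim=256, num_heads=4)
#   x = torch.randn(2, 16 * 16, 256)             # image tokens (B, N, D)
#   ctx = torch.randn(2, 77, 256)                # conditioning tokens (B, S, D)
#   y = xattn(x, ctx)                            # (2, 256, 256), residual already added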
|
|
|
|
class MultiQuerySelfAttention(nn.Module):
    """Pre-norm multi-query self-attention with QK-RMSNorm and 2D RoPE: per-head
    queries, a single shared K/V head; the residual is added inside forward."""

    def __init__(self, dim, num_heads=4, qk_norm=True):
        super().__init__()
        assert dim % num_heads == 0, "dim must be divisible by num_heads"
| self.num_heads = num_heads |
| self.head_dim = dim // num_heads |
| self.q_proj = nn.Linear(dim, dim, bias=False) |
| self.k_proj = nn.Linear(dim, self.head_dim, bias=False) |
| self.v_proj = nn.Linear(dim, self.head_dim, bias=False) |
| self.out_proj = nn.Linear(dim, dim, bias=False) |
| self.q_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity() |
| self.k_norm = RMSNorm(self.head_dim) if qk_norm else nn.Identity() |
| self.norm = nn.LayerNorm(dim) |
| self.rope = RotaryEmbedding2D(self.head_dim) |
|
|
| def forward(self, x, H, W): |
| B, N, D = x.shape |
| residual = x |
| x = self.norm(x) |
| q = self.q_proj(x).view(B, N, self.num_heads, self.head_dim).transpose(1, 2) |
| k = self.k_proj(x).view(B, N, 1, self.head_dim).transpose(1, 2) |
| v = self.v_proj(x).view(B, N, 1, self.head_dim).transpose(1, 2) |
| q, k = self.q_norm(q), self.k_norm(k) |
| q, k = self.rope(q, H, W), self.rope(k, H, W) |
        # Broadcast the single shared K/V head across all query heads (multi-query attention).
        k = k.expand(-1, self.num_heads, -1, -1)
        v = v.expand(-1, self.num_heads, -1, -1)
| attn = F.scaled_dot_product_attention(q, k, v, scale=1.0/math.sqrt(self.head_dim)) |
| return residual + self.out_proj(attn.transpose(1, 2).reshape(B, N, D)) |
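
# Usage sketch (hypothetical sizes; N must equal H * W for the 2D RoPE):
#   sattn = MultiQuerySelfAttention(dim=256, num_heads=4)
#   x = torch.randn(2, 16 * 16, 256)
#   y = sattn(x, H=16, W=16)                     # (2, 256, 256), residual already added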
|
|
|
|
class UIBFFN(nn.Module):
    """Universal-Inverted-Bottleneck FFN: pointwise expansion with a SiLU gate branch,
    a 3x3 depthwise conv over the (H, W) token grid on the expanded features, and a
    gated pointwise projection back to `dim` (pre-norm, residual)."""

    def __init__(self, dim, expansion=2, spatial_size=4):
        super().__init__()
        # NOTE: spatial_size is currently unused; the grid shape comes from (H, W) at call time.
        hidden = dim * expansion
| self.norm = nn.LayerNorm(dim) |
| self.pw_up = nn.Linear(dim, hidden, bias=False) |
| self.gate = nn.Linear(dim, hidden, bias=False) |
| self.dw_conv = nn.Conv2d(hidden, hidden, 3, padding=1, groups=hidden, bias=True) |
| self.pw_down = nn.Linear(hidden, dim, bias=False) |
|
|
| def forward(self, x, H, W): |
| B, N, D = x.shape |
| residual = x |
| x = self.norm(x) |
| h = self.pw_up(x) |
| g = F.silu(self.gate(x)) |
| h_2d = h.view(B, H, W, -1).permute(0, 3, 1, 2) |
| h = self.dw_conv(h_2d).permute(0, 2, 3, 1).reshape(B, N, -1) |
| return residual + self.pw_down(h * g) |
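
# Usage sketch (hypothetical sizes; tokens are reshaped to an H x W grid for the depthwise conv):
#   ffn = UIBFFN(dim=256, expansion=2)
#   x = torch.randn(2, 16 * 16, 256)
#   y = ffn(x, H=16, W=16)                       # (2, 256, 256), residual already added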
|
|
|
|
class TimestepEmbedding(nn.Module):
    """Sinusoidal timestep features followed by a SiLU MLP."""

    def __init__(self, dim, max_period=10000):
        super().__init__()
        self.dim = dim
        self.max_period = max_period
        self.mlp = nn.Sequential(nn.Linear(dim, dim*4), nn.SiLU(), nn.Linear(dim*4, dim))
|
|
    def forward(self, t):
        half = self.dim // 2
        freqs = torch.exp(-math.log(self.max_period) * torch.arange(half, device=t.device, dtype=torch.float32) / half)
        args = t[:, None].float() * freqs[None, :]
        emb = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if self.dim % 2:
            emb = F.pad(emb, (0, 1))
        # Cast to the MLP's parameter dtype rather than t's dtype: t may be an integer
        # tensor, and casting the sinusoidal features to an int dtype would destroy them.
        return self.mlp(emb.to(self.mlp[0].weight.dtype))
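
# Usage sketch (hypothetical values; per the module docstring, the result is meant to be
# *added* to token features rather than fed through AdaLN):
#   temb = TimestepEmbedding(dim=256)
#   t = torch.rand(2)                            # e.g. continuous timesteps in [0, 1]
#   e = temb(t)                                  # (2, 256)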
|
|
|
|
class IterationEmbedding(nn.Module):
    """Learned embedding for an iteration index, broadcast over the batch."""

    def __init__(self, dim, max_iterations=8):
        super().__init__()
        self.embed = nn.Embedding(max_iterations, dim)
|
|
| def forward(self, iter_idx, batch_size, device): |
| return self.embed(torch.full((batch_size,), iter_idx, device=device, dtype=torch.long)) |
|
|
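
# Minimal smoke-test sketch (assumed sizes, not part of the original module): wires a
# self-attention block, a cross-attention block, and the UIB-FFN together, and adds the
# timestep / iteration embeddings to the token features as the module docstring describes.
if __name__ == "__main__":
    B, H, W, D = 2, 16, 16, 256
    x = torch.randn(B, H * W, D)                      # image tokens
    ctx = torch.randn(B, 77, D)                       # conditioning tokens (hypothetical length)

    t_emb = TimestepEmbedding(D)(torch.rand(B))       # (B, D)
    i_emb = IterationEmbedding(D)(iter_idx=3, batch_size=B, device=x.device)  # (B, D)
    x = x + (t_emb + i_emb)[:, None, :]               # additive conditioning, broadcast over tokens

    x = MultiQuerySelfAttention(D)(x, H, W)
    x = MultiQueryCrossAttention(D)(x, ctx)
    x = UIBFFN(D)(x, H, W)
    print(x.shape)                                    # torch.Size([2, 256, 256])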