| |
| |
|
|
| from typing import Optional |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
|
|
def modulate(x, shift, scale):
    """Apply a per-sample affine modulation: ``x * (1 + scale) + shift``.

    ``shift`` and ``scale`` each get a new axis inserted at dim 1 so that they
    broadcast against ``x`` along that axis.

    Args:
        x: Tensor to modulate. Presumably (B, L, D) -- confirm against callers.
        shift: Additive term. Presumably (B, D) -- confirm against callers.
        scale: Multiplicative term, applied as ``1 + scale`` so that a zero
            scale leaves ``x`` unscaled. Presumably (B, D).

    Returns:
        The modulated tensor, with the broadcast shape of the operands.
    """
    gain = 1 + scale.unsqueeze(1)
    offset = shift.unsqueeze(1)
    return x * gain + offset
|
|
|
|
def pool_tokens(x: torch.Tensor, mask: torch.Tensor, *, keepdim=False) -> torch.Tensor:
    """
    Average the non-padding tokens of each sequence (masked mean pooling).

    NOTE: We assume x does not require gradients.

    Args:
        x: (B, L, D) tensor of tokens.
        mask: (B, L) boolean tensor indicating which tokens are not padding.
        keepdim: If True, keep the reduced sequence axis as a size-1 dim.

    Returns:
        pooled: (B, D) tensor of pooled tokens, or (B, 1, D) when keepdim=True.
            A sequence whose mask is entirely False pools to zeros.
    """
    assert x.shape[1] == mask.shape[1]
    assert x.shape[0] == mask.shape[0]

    # (B, L) -> (B, L, 1), cast so it can be multiplied with x.
    weights = mask.unsqueeze(2).to(dtype=x.dtype)

    # Number of valid tokens per sequence; clamped to 1 so that a fully
    # masked sequence divides by 1 instead of 0.
    valid_count = weights.sum(dim=1, keepdim=True).clamp(min=1)
    weights = weights / valid_count

    # Padding tokens have weight 0 and drop out of the sum.
    return (x * weights).sum(dim=1, keepdim=keepdim)
|
|
|
|
class AttentionPool(nn.Module):
    """Pool a padded token sequence into a single vector via attention.

    The masked mean of the input tokens is prepended to the sequence and used
    as the only attention query. Keys and values are computed from the mean
    token plus all input tokens, with padding positions masked out.
    """

    def __init__(
        self,
        embed_dim: int,
        num_heads: int,
        output_dim: Optional[int] = None,
        device: Optional[torch.device] = None,
        dtype=None,
        operations=None,
    ):
        """
        Args:
            embed_dim (int): Dimensionality of input tokens.
            num_heads (int): Number of attention heads. Must divide embed_dim.
            output_dim (int): Dimensionality of output tokens. Defaults to embed_dim.
            device: Forwarded to the Linear layer constructors.
            dtype: Forwarded to the Linear layer constructors.
            operations: Namespace providing a ``Linear`` layer class that
                accepts ``(in_features, out_features, device=, dtype=)``.
                Defaults to ``torch.nn``.

        Raises:
            ValueError: If num_heads is not positive or does not divide embed_dim.
        """
        super().__init__()

        # The declared default used to crash on `None.Linear`; fall back to
        # the stock torch.nn layers instead.
        if operations is None:
            operations = nn

        # forward() splits embed_dim evenly across heads; fail early with a
        # clear message rather than with an opaque unflatten error later.
        if num_heads <= 0 or embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )

        self.num_heads = num_heads
        self.to_kv = operations.Linear(embed_dim, 2 * embed_dim, device=device, dtype=dtype)
        self.to_q = operations.Linear(embed_dim, embed_dim, device=device, dtype=dtype)
        self.to_out = operations.Linear(embed_dim, output_dim or embed_dim, device=device, dtype=dtype)

    def forward(self, x, mask):
        """
        Args:
            x (torch.Tensor): (B, L, D) tensor of input tokens.
            mask (torch.Tensor): (B, L) boolean tensor indicating which tokens are not padding.

        NOTE: We assume x does not require gradients.

        Returns:
            x (torch.Tensor): (B, output_dim) tensor of pooled tokens
                (output_dim defaults to D).
        """
        D = x.size(2)

        # Build the attention mask: (B, L) -> (B, 1, 1, L), then prepend one
        # always-visible slot for the pooled token -> (B, 1, 1, L + 1).
        # Because that slot is never masked, every query has at least one
        # key it may attend to, even when a sequence is entirely padding.
        attn_mask = mask[:, None, None, :].bool()
        attn_mask = F.pad(attn_mask, (1, 0), value=True)

        # Masked average of the tokens: (B, 1, D).
        x_pool = pool_tokens(x, mask, keepdim=True)

        # Prepend the pooled token to the sequence: (B, L + 1, D).
        x = torch.cat([x_pool, x], dim=1)

        # Keys/values come from every position; the query comes from the
        # pooled token only.
        kv = self.to_kv(x)  # (B, L + 1, 2 * D)
        q = self.to_q(x[:, 0])  # (B, D)

        # Split into heads.
        head_dim = D // self.num_heads
        kv = kv.unflatten(2, (2, self.num_heads, head_dim))  # (B, L + 1, 2, H, head_dim)
        kv = kv.transpose(1, 3)  # (B, H, 2, L + 1, head_dim)
        k, v = kv.unbind(2)  # each (B, H, L + 1, head_dim)
        q = q.unflatten(1, (self.num_heads, head_dim))  # (B, H, head_dim)
        q = q.unsqueeze(2)  # (B, H, 1, head_dim)

        # Single-query attention over the masked sequence: (B, H, 1, head_dim).
        x = F.scaled_dot_product_attention(
            q, k, v, attn_mask=attn_mask, dropout_p=0.0
        )

        # Merge heads back into one vector per sample and project out.
        x = x.squeeze(2).flatten(1, 2)  # (B, D)
        x = self.to_out(x)
        return x
|
|