| import math |
| from typing import Any |
|
|
| import torch |
| from torch import nn |
| from torch.nn.functional import scaled_dot_product_attention |
| from torch.nn.attention import SDPBackend, sdpa_kernel |
| from .model_config import CoDAConfig |
|
|
|
|
|
|
| def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: |
| """ |
| This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, |
| num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) |
| """ |
| batch, num_key_value_heads, slen, head_dim = hidden_states.shape |
| if n_rep == 1: |
| return hidden_states |
| hidden_states = hidden_states[:, :, None, :, :].expand( |
| batch, num_key_value_heads, n_rep, slen, head_dim |
| ) |
| return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) |
|
|
|
|
| class AttentionModule(nn.Module): |
| def __init__(self, config: CoDAConfig, kernel_config: dict[str, Any] | None = None): |
| super().__init__() |
| self.config = config |
| self.kernel_config = kernel_config |
| self.partition_spec = None |
|
|
| def forward( |
| self, |
| query_states: torch.Tensor, |
| key_states: torch.Tensor, |
| value_states: torch.Tensor, |
| attention_mask: torch.Tensor | None = None, |
| ): |
| """GPU-optimized PyTorch implementation""" |
|
|
| if self.config.attention_kernel != "splash_attention": |
| num_key_value_groups = ( |
| self.config.num_attention_heads // self.config.num_key_value_heads |
| ) |
| key_states = repeat_kv(key_states, num_key_value_groups) |
| value_states = repeat_kv(value_states, num_key_value_groups) |
|
|
| bsz, num_heads, q_len, head_dim = query_states.size() |
| head_dim = value_states.shape[-1] |
| kv_seq_len = key_states.shape[-2] |
|
|
| |
| match self.config.attention_kernel: |
| case "splash_attention": |
| raise NotImplementedError( |
| "Splash Attention is not supported in GPU environment" |
| ) |
|
|
| case "flash_attention": |
| |
| with sdpa_kernel(SDPBackend.FLASH_ATTENTION): |
| attn_output = scaled_dot_product_attention( |
| query_states, |
| key_states, |
| value_states, |
| dropout_p=( |
| self.config.attention_dropout if self.training else 0.0 |
| ), |
| is_causal=False, |
| ) |
| case _: |
| |
| with sdpa_kernel(SDPBackend.MATH): |
| attn_output = scaled_dot_product_attention( |
| query_states, |
| key_states, |
| value_states, |
| dropout_p=( |
| self.config.attention_dropout if self.training else 0.0 |
| ), |
| is_causal=False, |
| ) |
|
|
| if attn_output.size() != (bsz, num_heads, q_len, head_dim): |
| raise ValueError( |
| f"`attn_output` should be of size {(bsz, num_heads, q_len, head_dim)}, but is" |
| f" {attn_output.size()}" |
| ) |
| return attn_output |
|
|