| |
| |
|
|
| |
| |
|
|
| from typing import Optional |
|
|
| import torch |
| from torch import nn, Tensor |
|
|
| from sam2.modeling.sam.transformer import RoPEAttention |
|
|
| from sam2.modeling.sam2_utils import get_activation_fn, get_clones |
|
|
|
|
| class MemoryAttentionLayer(nn.Module): |
|
|
| def __init__( |
| self, |
| activation: str, |
| cross_attention: nn.Module, |
| d_model: int, |
| dim_feedforward: int, |
| dropout: float, |
| pos_enc_at_attn: bool, |
| pos_enc_at_cross_attn_keys: bool, |
| pos_enc_at_cross_attn_queries: bool, |
| self_attention: nn.Module, |
| ): |
| super().__init__() |
| self.d_model = d_model |
| self.dim_feedforward = dim_feedforward |
| self.dropout_value = dropout |
| self.self_attn = self_attention |
| self.cross_attn_image = cross_attention |
|
|
| |
| self.linear1 = nn.Linear(d_model, dim_feedforward) |
| self.dropout = nn.Dropout(dropout) |
| self.linear2 = nn.Linear(dim_feedforward, d_model) |
|
|
| self.norm1 = nn.LayerNorm(d_model) |
| self.norm2 = nn.LayerNorm(d_model) |
| self.norm3 = nn.LayerNorm(d_model) |
| self.dropout1 = nn.Dropout(dropout) |
| self.dropout2 = nn.Dropout(dropout) |
| self.dropout3 = nn.Dropout(dropout) |
|
|
| self.activation_str = activation |
| self.activation = get_activation_fn(activation) |
|
|
| |
| self.pos_enc_at_attn = pos_enc_at_attn |
| self.pos_enc_at_cross_attn_queries = pos_enc_at_cross_attn_queries |
| self.pos_enc_at_cross_attn_keys = pos_enc_at_cross_attn_keys |
|
|
| def _forward_sa(self, tgt, query_pos): |
| |
| tgt = tgt.to(dtype = torch.bfloat16) |
| tgt2 = self.norm1(tgt) |
| q = k = tgt2 + query_pos if self.pos_enc_at_attn else tgt2 |
| tgt2 = self.self_attn(q, k, v=tgt2) |
| tgt = tgt + self.dropout1(tgt2) |
| return tgt |
|
|
| def _forward_ca(self, tgt, memory, query_pos, pos, num_k_exclude_rope=0): |
| kwds = {} |
| if num_k_exclude_rope > 0: |
| assert isinstance(self.cross_attn_image, RoPEAttention) |
| kwds = {"num_k_exclude_rope": num_k_exclude_rope} |
|
|
| |
| tgt2 = self.norm2(tgt) |
| tgt2 = self.cross_attn_image( |
| q=tgt2 + query_pos if self.pos_enc_at_cross_attn_queries else tgt2, |
| k=(memory + pos if self.pos_enc_at_cross_attn_keys else memory).to(dtype = torch.bfloat16), |
| v=memory.to(dtype = torch.bfloat16), |
| **kwds, |
| ) |
| tgt = tgt + self.dropout2(tgt2) |
| return tgt |
|
|
| def forward( |
| self, |
| tgt, |
| memory, |
| pos: Optional[Tensor] = None, |
| query_pos: Optional[Tensor] = None, |
| num_k_exclude_rope: int = 0, |
| ) -> torch.Tensor: |
|
|
| |
| tgt = self._forward_sa(tgt, query_pos) |
| tgt = self._forward_ca(tgt, memory, query_pos, pos, num_k_exclude_rope) |
| |
| tgt2 = self.norm3(tgt) |
| tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt2)))) |
| tgt = tgt + self.dropout3(tgt2) |
| return tgt |
|
|
|
|
| class MemoryAttention(nn.Module): |
| def __init__( |
| self, |
| d_model: int, |
| pos_enc_at_input: bool, |
| layer: nn.Module, |
| num_layers: int, |
| batch_first: bool = True, |
| ): |
| super().__init__() |
| self.d_model = d_model |
| self.layers = get_clones(layer, num_layers) |
| self.num_layers = num_layers |
| self.norm = nn.LayerNorm(d_model) |
| self.pos_enc_at_input = pos_enc_at_input |
| self.batch_first = batch_first |
|
|
| def forward( |
| self, |
| curr: torch.Tensor, |
| memory: torch.Tensor, |
| curr_pos: Optional[Tensor] = None, |
| memory_pos: Optional[Tensor] = None, |
| num_obj_ptr_tokens: int = 0, |
| ): |
| if isinstance(curr, list): |
| assert isinstance(curr_pos, list) |
| assert len(curr) == len(curr_pos) == 1 |
| curr, curr_pos = ( |
| curr[0], |
| curr_pos[0], |
| ) |
|
|
| assert ( |
| curr.shape[1] == memory.shape[1] |
| ), "Batch size must be the same for curr and memory" |
|
|
| output = curr |
| if self.pos_enc_at_input and curr_pos is not None: |
| output = output + 0.1 * curr_pos |
|
|
| if self.batch_first: |
| |
| output = output.transpose(0, 1) |
| curr_pos = curr_pos.transpose(0, 1) |
| memory = memory.transpose(0, 1) |
| memory_pos = memory_pos.transpose(0, 1) |
|
|
| for layer in self.layers: |
| kwds = {} |
| if isinstance(layer.cross_attn_image, RoPEAttention): |
| kwds = {"num_k_exclude_rope": num_obj_ptr_tokens} |
|
|
| output = layer( |
| tgt=output, |
| memory=memory, |
| pos=memory_pos, |
| query_pos=curr_pos, |
| **kwds, |
| ) |
| normed_output = self.norm(output) |
|
|
| if self.batch_first: |
| |
| normed_output = normed_output.transpose(0, 1) |
| curr_pos = curr_pos.transpose(0, 1) |
|
|
| return normed_output |
|
|