| import torch |
| from typing import Optional, Tuple, Union, Any |
| from diffusers import UNet2DConditionModel |
| from diffusers.models.attention_processor import Attention |
| from diffusers.models.unets.unet_2d_condition import UNet2DConditionOutput |
|
|
|
|
def construct_pix2pix_attention(hidden_states_dim, norm_type="none"):
    """Build the normalisation layer and attention module used for crosspond attention.

    Args:
        hidden_states_dim: Feature width; used as the LayerNorm size and as the
            attention ``query_dim``. The per-head width is this value floor-divided
            by the fixed head count of 8.
        norm_type: ``"layernorm"`` selects ``torch.nn.LayerNorm``; every other
            value yields ``torch.nn.Identity``.

    Returns:
        A ``(norm, attention)`` tuple.
    """
    num_heads = 8
    use_layernorm = norm_type == "layernorm"
    norm = torch.nn.LayerNorm(hidden_states_dim) if use_layernorm else torch.nn.Identity()

    attention = Attention(
        query_dim=hidden_states_dim,
        heads=num_heads,
        dim_head=hidden_states_dim // num_heads,
        bias=True,
    )
    # Marker attribute only; nothing in this file reads it. Presumably consumed
    # by code that decides whether to enable xformers -- TODO confirm.
    attention.xformers_not_supported = True
    return norm, attention
|
|
|
|
def switch_extra_processor(model, enable_filter=lambda x:True):
    """Re-evaluate the ``enabled`` flag of every ``ExtraAttnProc`` below ``model``.

    Descendants are visited children-first. Each one is identified by its
    dotted path built from ``named_children()`` names, starting at the name of
    the direct child of ``model``. ``model`` itself is never inspected.

    Args:
        model: Module whose descendants are scanned.
        enable_filter: Callable receiving the dotted path of an ``ExtraAttnProc``;
            its return value is stored as that processor's ``enabled`` attribute.
    """

    def iter_post_order(path: str, node: torch.nn.Module):
        # Yield all descendants before the node itself, mirroring a
        # recurse-then-handle traversal.
        for child_name, child in node.named_children():
            yield from iter_post_order(f"{path}.{child_name}", child)
        yield path, node

    for top_name, top_module in model.named_children():
        for path, node in iter_post_order(top_name, top_module):
            if isinstance(node, ExtraAttnProc):
                node.enabled = enable_filter(path)
|
|
|
|
def add_extra_processor(model: torch.nn.Module, enable_filter=lambda x:True, **kwargs):
    """Wrap the processor of every ``Attention`` module below ``model`` in an ``ExtraAttnProc``.

    Args:
        model: Module whose descendants are scanned; ``model`` itself is not
            inspected.
        enable_filter: Callable receiving ``"<dotted path>.processor"``; its
            return value becomes the new processor's ``enabled`` argument.
        **kwargs: Forwarded to ``ExtraAttnProc``. ``proj_in_dim`` is removed from
            the forwarded set and handled here: when truthy it is used as is,
            otherwise the attention module's ``cross_attention_dim`` is used.

    Returns:
        A ``torch.nn.ModuleDict`` mapping each processor name, with ``"."``
        replaced by ``"__"``, to the newly installed ``ExtraAttnProc``.
    """
    proj_in_dim = kwargs.pop('proj_in_dim', False)
    installed = torch.nn.ModuleDict()

    def wrap_attention_layers(path: str, node: torch.nn.Module):
        for child_name, child in node.named_children():
            # Do not descend when the child name joined directly onto the
            # current path (no separator) contains "ref_unet".
            if "ref_unet" in (child_name + path):
                continue
            wrap_attention_layers(f"{path}.{child_name}", child)

        if not isinstance(node, Attention):
            return

        proc_name = f"{path}.processor"
        wrapped = ExtraAttnProc(
            chained_proc=node.get_processor(),
            enabled=enable_filter(proc_name),
            name=proc_name,
            proj_in_dim=proj_in_dim or node.cross_attention_dim,
            target_dim=node.cross_attention_dim,
            **kwargs
        )
        node.set_processor(wrapped)
        # ModuleDict keys may not contain ".", hence the "__" substitution.
        installed[proc_name.replace(".", "__")] = wrapped

    for top_name, top_module in model.named_children():
        wrap_attention_layers(top_name, top_module)
    return installed
|
|
|
|
|
|
class ExtraAttnProc(torch.nn.Module):
    """Attention-processor wrapper that records or consumes reference hidden states.

    The wrapper delegates the actual attention computation to ``chained_proc``.
    While disabled it is a pure pass-through. While enabled it requires a
    ``ref_dict`` and works in one of two modes:

    * ``'extract'``: stores hidden states in ``ref_dict`` under ``self.name``.
    * ``'inject'``: pops that entry from ``ref_dict`` and combines it with the
      current call, either through the pixel-wise "crosspond" attention or by
      appending it to ``encoder_hidden_states``.
    """

    def __init__(
        self,
        chained_proc,
        enabled=False,
        name=None,
        mode='extract',
        with_proj_in=False,
        proj_in_dim=768,
        target_dim=None,
        pixel_wise_crosspond=False,
        norm_type="none",
        crosspond_effect_on="all",
        crosspond_chain_pos="parralle",
        simple_3d=False,
        views=4,
    ) -> None:
        """Store the configuration and create the optional sub-layers.

        Args:
            chained_proc: Processor that performs the real attention call.
            enabled: When falsy, ``__call__`` only forwards to ``chained_proc``.
                The optional layers below are created only if this is truthy
                at construction time.
            name: Key used for this processor's entry in ``ref_dict``.
            mode: Default mode (``'extract'`` or ``'inject'``); a ``mode``
                passed to ``__call__`` takes precedence when truthy.
            with_proj_in: Whether the reference state is passed through
                ``in_linear`` in inject mode.
            proj_in_dim: Input width of ``in_linear``.
            target_dim: Output width of ``in_linear``; falls back to
                ``proj_in_dim`` when falsy.
            pixel_wise_crosspond: Use the pixel-wise crosspond attention.
            norm_type: Forwarded to ``construct_pix2pix_attention``.
            crosspond_effect_on: ``"all"`` or ``"first"``.
            crosspond_chain_pos: ``"before"``, ``"parralle"`` or ``"after"``;
                any other value skips the crosspond term.
            simple_3d: Use the multi-view concatenation branch in inject mode.
            views: Number of views assumed when batch sizes differ.
        """
        super().__init__()
        self.enabled = enabled
        self.chained_proc = chained_proc
        self.name = name
        self.mode = mode
        self.with_proj_in = with_proj_in
        self.proj_in_dim = proj_in_dim
        self.target_dim = target_dim or proj_in_dim
        self.hidden_states_dim = self.target_dim
        self.pixel_wise_crosspond = pixel_wise_crosspond
        self.crosspond_effect_on = crosspond_effect_on
        self.crosspond_chain_pos = crosspond_chain_pos
        self.views = views
        self.simple_3d = simple_3d

        needs_projection = self.with_proj_in and self.enabled
        if not needs_projection:
            self.in_linear = None
        else:
            projection = torch.nn.Linear(self.proj_in_dim, self.target_dim, bias=False)
            if self.target_dim == self.proj_in_dim:
                # Square projection starts out as the identity mapping.
                projection.weight.data = torch.eye(proj_in_dim)
            self.in_linear = projection

        if self.pixel_wise_crosspond and self.enabled:
            norm, attention = construct_pix2pix_attention(self.hidden_states_dim, norm_type=norm_type)
            self.crosspond_norm = norm
            self.crosspond_attention = attention

    def do_crosspond_attention(self, hidden_states: torch.FloatTensor, other_states: torch.FloatTensor):
        """Attend each token of ``hidden_states`` to the token at the same index in ``other_states``.

        Both inputs must have the same 3-D shape. Every token is reshaped into
        its own length-1 sequence, so the attention call pairs position ``i``
        of one tensor only with position ``i`` of the other.

        Returns:
            A tensor with the same shape as the inputs.
        """
        hidden_states = self.crosspond_norm(hidden_states)

        batch, L, D = hidden_states.shape
        assert hidden_states.shape == other_states.shape, f"got {hidden_states.shape} and {other_states.shape}"

        per_token_shape = (batch * L, 1, D)
        attended = self.crosspond_attention(
            hidden_states.reshape(per_token_shape),
            encoder_hidden_states=other_states.reshape(per_token_shape),
        )
        return attended.reshape(batch, L, D)

    def __call__(
        self, attn: Attention, hidden_states, encoder_hidden_states=None, attention_mask=None,
        ref_dict: dict = None, mode=None, **kwargs
    ) -> Any:
        """Run the chained processor, extracting or injecting reference states.

        Args:
            attn: Attention module, handed through to ``chained_proc``.
            hidden_states: Query-side input of the attention layer.
            encoder_hidden_states: Context input; replaced by ``hidden_states``
                when ``None`` and the processor is enabled.
            attention_mask: Handed through to ``chained_proc``.
            ref_dict: Shared store of reference states; required when enabled.
            mode: Per-call override of ``self.mode`` when truthy.
            **kwargs: Handed through to ``chained_proc``.

        Raises:
            NotImplementedError: If the effective mode is neither ``'extract'``
                nor ``'inject'``.
        """
        if not self.enabled:
            return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
        if encoder_hidden_states is None:
            encoder_hidden_states = hidden_states
        assert ref_dict is not None

        def run_chained(states, context):
            return self.chained_proc(attn, states, context, attention_mask, **kwargs)

        active_mode = mode or self.mode

        if active_mode == 'extract':
            ref_dict[self.name] = hidden_states
            chained_out = run_chained(hidden_states, encoder_hidden_states)
            if self.pixel_wise_crosspond and self.crosspond_chain_pos == "after":
                # With the "after" placement the stored entry is the chained
                # processor's output instead of its input.
                ref_dict[self.name] = chained_out
            return chained_out

        if active_mode != 'inject':
            raise NotImplementedError("mode or self.mode is required to be 'extract' or 'inject'")

        ref_state = ref_dict.pop(self.name)
        if self.with_proj_in:
            ref_state = self.in_linear(ref_state)

        ref_batch, ref_len, ref_dim = ref_state.shape
        if hidden_states.shape[0] == ref_batch:
            modalities, views = 1, 1
        else:
            views = self.views
            modalities = hidden_states.shape[0] // ref_batch // views

        if self.pixel_wise_crosspond:
            if self.crosspond_effect_on == "all":
                copies = modalities * views
                crosspond = self.do_crosspond_attention
            else:
                assert self.crosspond_effect_on == "first"
                copies = modalities

                def crosspond(states, reference):
                    # Apply crosspond attention to view 0 of each group only;
                    # the other views receive a zero contribution.
                    seq_len, width = states.shape[1], states.shape[2]
                    first_view = states.view(-1, views, seq_len, width)[:, 0]
                    attended = self.do_crosspond_attention(first_view, reference)
                    padded = torch.zeros_like(states).reshape(-1, views, seq_len, width)
                    padded[:, 0] = attended
                    return padded.reshape(-1, seq_len, width)

            # Repeat every reference entry `copies` times along the batch axis.
            ref_state = ref_state[:, None].expand(-1, copies, -1, -1).reshape(-1, ref_len, ref_dim)

            position = self.crosspond_chain_pos
            if position == "before":
                hidden_states = hidden_states + crosspond(hidden_states, ref_state)
            chained_out = run_chained(hidden_states, encoder_hidden_states)
            if position == "parralle":
                chained_out = chained_out + crosspond(hidden_states, ref_state)
            if position == "after":
                chained_out = chained_out + crosspond(chained_out, ref_state)
            return chained_out

        if self.simple_3d:
            batch, length, channels = encoder_hidden_states.shape
            mv = self.views
            groups = batch // mv
            # Group the views, append the reference as one more "view", flatten
            # all of them into a single context sequence, then hand that same
            # sequence to each of the mv views.
            context = encoder_hidden_states.reshape(groups, mv, length, channels)
            context = torch.cat([context, ref_state[:, None]], dim=1)
            context = context.reshape(groups, 1, (mv + 1) * length, channels)
            context = context.repeat(1, mv, 1, 1).reshape(-1, (mv + 1) * length, channels)
            return run_chained(hidden_states, context)

        # Default: append the repeated reference tokens to the context sequence.
        ref_state = ref_state[:, None].expand(-1, modalities * views, -1, -1).reshape(-1, ref_len, ref_dim)
        context = torch.cat([encoder_hidden_states, ref_state], dim=1)
        return run_chained(hidden_states, context)
|
|
|
|
class UnifieldWrappedUNet(UNet2DConditionModel):
    """``UNet2DConditionModel`` paired with a second, reference UNet.

    On each call the reference UNet is run first on ``condition_latens`` with
    ``ExtraAttnProc`` processors in ``'extract'`` mode; the states it records
    are then handed to this model's own processors in ``'inject'`` mode via
    ``cross_attention_kwargs``.
    """

    def __init__(
        self,
        sample_size: Optional[int] = None,
        in_channels: int = 4,
        out_channels: int = 4,
        center_input_sample: bool = False,
        flip_sin_to_cos: bool = True,
        freq_shift: int = 0,
        down_block_types: Tuple[str] = (
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "DownBlock2D",
        ),
        mid_block_type: Optional[str] = "UNetMidBlock2DCrossAttn",
        up_block_types: Tuple[str] = ("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D"),
        only_cross_attention: Union[bool, Tuple[bool]] = False,
        block_out_channels: Tuple[int] = (320, 640, 1280, 1280),
        layers_per_block: Union[int, Tuple[int]] = 2,
        downsample_padding: int = 1,
        mid_block_scale_factor: float = 1,
        dropout: float = 0.0,
        act_fn: str = "silu",
        norm_num_groups: Optional[int] = 32,
        norm_eps: float = 1e-5,
        cross_attention_dim: Union[int, Tuple[int]] = 1280,
        transformer_layers_per_block: Union[int, Tuple[int], Tuple[Tuple]] = 1,
        reverse_transformer_layers_per_block: Optional[Tuple[Tuple[int]]] = None,
        encoder_hid_dim: Optional[int] = None,
        encoder_hid_dim_type: Optional[str] = None,
        attention_head_dim: Union[int, Tuple[int]] = 8,
        num_attention_heads: Optional[Union[int, Tuple[int]]] = None,
        dual_cross_attention: bool = False,
        use_linear_projection: bool = False,
        class_embed_type: Optional[str] = None,
        addition_embed_type: Optional[str] = None,
        addition_time_embed_dim: Optional[int] = None,
        num_class_embeds: Optional[int] = None,
        upcast_attention: bool = False,
        resnet_time_scale_shift: str = "default",
        resnet_skip_time_act: bool = False,
        resnet_out_scale_factor: float = 1.0,
        time_embedding_type: str = "positional",
        time_embedding_dim: Optional[int] = None,
        time_embedding_act_fn: Optional[str] = None,
        timestep_post_act: Optional[str] = None,
        time_cond_proj_dim: Optional[int] = None,
        conv_in_kernel: int = 3,
        conv_out_kernel: int = 3,
        projection_class_embeddings_input_dim: Optional[int] = None,
        attention_type: str = "default",
        class_embeddings_concat: bool = False,
        mid_block_only_cross_attention: Optional[bool] = None,
        cross_attention_norm: Optional[str] = None,
        addition_embed_type_num_heads: int = 64,

        init_self_attn_ref: bool = False,
        self_attn_ref_other_model_name: str = 'lambdalabs/sd-image-variations-diffusers',
        self_attn_ref_position: str = "attn1",
        self_attn_ref_pixel_wise_crosspond: bool = False,
        self_attn_ref_effect_on: str = "all",
        self_attn_ref_chain_pos: str = "parralle",
        use_simple3d_attn: bool = False,
        **kwargs
    ):
        """Build the base UNet, load the reference UNet and install the processors.

        All parameters above ``init_self_attn_ref`` are forwarded unchanged to
        ``UNet2DConditionModel.__init__``. The remaining ones configure the
        reference mechanism:

        Args:
            init_self_attn_ref: Accepted but not read by this constructor.
            self_attn_ref_other_model_name: Model id or path whose ``unet``
                subfolder is loaded as the reference UNet.
            self_attn_ref_position: Processors whose name ends with
                ``"<position>.processor"`` are enabled.
            self_attn_ref_pixel_wise_crosspond: ``pixel_wise_crosspond`` of the
                inject-side processors.
            self_attn_ref_effect_on: ``crosspond_effect_on`` of those processors.
            self_attn_ref_chain_pos: ``crosspond_chain_pos`` of those processors.
            use_simple3d_attn: ``simple_3d`` of those processors.
            **kwargs: Accepted and ignored.
        """
        # NOTE: locals() has to be captured before any other local name is
        # bound, otherwise that name would leak into the parent constructor.
        parent_kwargs = dict(locals())
        for wrapper_only in (
            "self", "kwargs", "__class__",
            "init_self_attn_ref", "self_attn_ref_other_model_name", "self_attn_ref_position",
            "self_attn_ref_pixel_wise_crosspond", "self_attn_ref_effect_on", "self_attn_ref_chain_pos",
            "use_simple3d_attn",
        ):
            parent_kwargs.pop(wrapper_only, None)
        super().__init__(**parent_kwargs)

        def targets_ref_position(name):
            return name.endswith(f"{self_attn_ref_position}.processor")

        self.ref_unet: UNet2DConditionModel = UNet2DConditionModel.from_pretrained(
            self_attn_ref_other_model_name, subfolder="unet", torch_dtype=self.dtype
        )

        # Reference UNet: its processors record hidden states.
        add_extra_processor(
            model=self.ref_unet,
            enable_filter=targets_ref_position,
            mode='extract',
            with_proj_in=False,
            pixel_wise_crosspond=False,
        )
        # This UNet: its processors consume the recorded hidden states.
        add_extra_processor(
            model=self,
            enable_filter=targets_ref_position,
            mode='inject',
            with_proj_in=False,
            pixel_wise_crosspond=self_attn_ref_pixel_wise_crosspond,
            crosspond_effect_on=self_attn_ref_effect_on,
            crosspond_chain_pos=self_attn_ref_chain_pos,
            simple_3d=use_simple3d_attn,
        )
        switch_extra_processor(self, enable_filter=targets_ref_position)

    def __call__(
        self,
        sample: torch.Tensor,
        timestep: Union[torch.Tensor, float, int],
        encoder_hidden_states: torch.Tensor,
        condition_latens: torch.Tensor = None,
        class_labels: Optional[torch.Tensor] = None,
    ) -> Union[UNet2DConditionOutput, Tuple]:
        """Run the reference UNet, then this UNet with the recorded states injected.

        Args:
            sample: Input of this UNet.
            timestep: Timestep given to both UNets.
            encoder_hidden_states: Conditioning given to both UNets.
            condition_latens: Input of the reference UNet. It is passed on
                as is, so the ``None`` default is forwarded unchanged.
            class_labels: Forwarded to this UNet's ``forward``.

        Returns:
            Whatever ``self.forward`` returns.
        """
        # One dict shared by both passes: filled by the reference pass, drained
        # by the inject pass.
        shared_refs = {}
        self.ref_unet(
            condition_latens,
            timestep,
            encoder_hidden_states,
            cross_attention_kwargs={"ref_dict": shared_refs},
        )
        # Calls forward() directly rather than going through Module.__call__.
        return self.forward(
            sample,
            timestep,
            encoder_hidden_states,
            class_labels=class_labels,
            cross_attention_kwargs={"ref_dict": shared_refs, "mode": 'inject'},
        )
| |