| """ |
| MotionCLIP - Motion-Text CLIP Model |
| Load and use the MotionCLIP model for motion-text retrieval and similarity computation. |
| |
| Usage: |
| from motion_clip_hf import MotionCLIP |
| |
| # Load from HuggingFace Hub |
| model = MotionCLIP.from_pretrained("khania/motion-clip") |
| |
| # Encode text and motion |
| text_emb = model.encode_text(["a person walks forward"]) |
| motion_emb = model.encode_motion(motion_array) # (T, 272) numpy array |
| |
| # Compute similarity |
| similarity = model.compute_similarity(motion_array, ["walking", "running"]) |
| """ |
|
|
| import os |
| import json |
| import math |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from typing import List, Optional, Union |
| from pathlib import Path |
|
|
| try: |
| from transformers import CLIPTextModel, CLIPTokenizer |
| TRANSFORMERS_AVAILABLE = True |
| except ImportError: |
| TRANSFORMERS_AVAILABLE = False |
|
|
| try: |
| from huggingface_hub import hf_hub_download |
| HF_HUB_AVAILABLE = True |
| except ImportError: |
| HF_HUB_AVAILABLE = False |
|
|
|
|
def sinusoidal_positional_encoding(seq_len: int, dim: int, device: torch.device) -> torch.Tensor:
    """Generate a fixed (non-learnable) sinusoidal positional encoding.

    Matches the original training code:
    pe[pos, 2i] = sin(pos / 10000^(2i/dim)), pe[pos, 2i+1] = cos(pos / 10000^(2i/dim)).

    Args:
        seq_len: Number of positions (rows).
        dim: Encoding dimensionality (columns). Odd values are supported;
            the original implementation crashed on odd `dim` because the
            cosine slice has one fewer column than the sine slice.
        device: Device on which to allocate the encoding.

    Returns:
        Tensor of shape (seq_len, dim).
    """
    pe = torch.zeros(seq_len, dim, device=device)
    position = torch.arange(0, seq_len, dtype=torch.float32, device=device).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, dim, 2, dtype=torch.float32, device=device) * (-math.log(10000.0) / dim)
    )
    angles = position * div_term  # (seq_len, ceil(dim / 2))
    pe[:, 0::2] = torch.sin(angles)
    # Slice is a no-op for even dim; for odd dim it drops the extra sine column.
    pe[:, 1::2] = torch.cos(angles[:, : dim // 2])
    return pe
|
|
|
|
class MotionTransformerEncoder(nn.Module):
    """Transformer encoder that maps a motion sequence to one embedding vector.

    Architecture matches original training code exactly:
    - Sinusoidal positional encoding (not learnable)
    - Masked mean pooling (no cls token)
    - Simple Linear output projection
    - Pre-LayerNorm architecture (norm_first=True to match _SDPATransformerEncoderLayer)
    """

    def __init__(
        self,
        input_dim: int = 272,
        hidden_dim: int = 768,
        embed_dim: int = 512,
        num_heads: int = 12,
        num_layers: int = 8,
        max_seq_len: int = 1024,
        dropout: float = 0.1
    ):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # Lift raw motion features into the transformer width.
        self.input_proj = nn.Linear(input_dim, hidden_dim)

        layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=4 * hidden_dim,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True,
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.output_proj = nn.Linear(hidden_dim, embed_dim)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Encode a batch of motion sequences.

        Args:
            x: Motion features of shape (B, T, input_dim).
            mask: Optional (B, T) bool tensor, True at valid (non-padded) steps.

        Returns:
            (B, embed_dim) sequence embeddings.
        """
        seq_len = x.shape[1]
        h = self.input_proj(x)
        h = h + sinusoidal_positional_encoding(seq_len, self.hidden_dim, h.device).unsqueeze(0)

        # nn.TransformerEncoder expects True at PADDED positions, so invert.
        padding = None if mask is None else ~mask
        h = self.transformer(h, src_key_padding_mask=padding)

        if mask is None:
            pooled = h.mean(dim=1)
        else:
            # Mean over valid timesteps only; clamp avoids divide-by-zero for
            # a fully-masked row.
            valid = mask.unsqueeze(-1).float()
            pooled = (h * valid).sum(dim=1) / valid.sum(dim=1).clamp(min=1e-6)

        return self.output_proj(pooled)
|
|
|
|
class TextEncoderCLIP(nn.Module):
    """HuggingFace CLIP text encoder - matches original training code."""

    def __init__(self, model_name: str = "openai/clip-vit-base-patch32", output_dim: int = 512):
        super().__init__()
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers required: pip install transformers")

        self.tokenizer = CLIPTokenizer.from_pretrained(model_name)
        self.model = CLIPTextModel.from_pretrained(model_name)
        self.hidden_size = self.model.config.hidden_size
        self.output_dim = output_dim

        # Only add a projection when CLIP's hidden size differs from the target dim.
        self.proj = (
            nn.Identity()
            if self.hidden_size == output_dim
            else nn.Linear(self.hidden_size, output_dim)
        )

    def forward(self, texts: List[str], device: torch.device) -> torch.Tensor:
        """Tokenize *texts* and return pooled (B, output_dim) embeddings."""
        tokenized = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=self.tokenizer.model_max_length,
            return_tensors="pt",
        )
        outputs = self.model(**{name: t.to(device) for name, t in tokenized.items()})

        pooled = getattr(outputs, "pooler_output", None)
        if pooled is None:
            # Fall back to the first token's hidden state when no pooler exists.
            pooled = outputs.last_hidden_state[:, 0]

        return self.proj(pooled)
|
|
|
|
class MotionCLIP(nn.Module):
    """Motion-Text CLIP Model with fine-tuned text encoder.

    Pairs a transformer motion encoder with a HuggingFace CLIP text encoder
    and projects both into a shared embedding space for retrieval and
    similarity computation.
    """

    DEFAULT_CONFIG = {
        "motion_input_dim": 272,
        "motion_hidden_dim": 768,
        "embed_dim": 512,
        "motion_num_heads": 12,
        "motion_num_layers": 8,
        "motion_max_seq_len": 1024,
        "motion_dropout": 0.1,
        "text_encoder_name": "openai/clip-vit-base-patch32"
    }

    def __init__(self, config: dict = None):
        """Build the model; keys in *config* override DEFAULT_CONFIG."""
        super().__init__()
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

        self.motion_encoder = MotionTransformerEncoder(
            input_dim=self.config["motion_input_dim"],
            hidden_dim=self.config["motion_hidden_dim"],
            embed_dim=self.config["embed_dim"],
            num_heads=self.config["motion_num_heads"],
            num_layers=self.config["motion_num_layers"],
            max_seq_len=self.config["motion_max_seq_len"],
            dropout=self.config["motion_dropout"]
        )

        self.text_encoder = TextEncoderCLIP(
            model_name=self.config["text_encoder_name"],
            output_dim=self.config["embed_dim"]
        )

        # CLIP-style learnable temperature, initialized to log(1/0.07).
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        # Per-feature input normalization stats; identity (0 mean / 1 std)
        # until real stats are loaded from a checkpoint.
        self.register_buffer("mean", torch.zeros(self.config["motion_input_dim"]))
        self.register_buffer("std", torch.ones(self.config["motion_input_dim"]))

    def encode_text(self, texts: List[str], normalize: bool = True) -> torch.Tensor:
        """Encode *texts* to (B, embed_dim) embeddings (L2-normalized if *normalize*)."""
        device = next(self.parameters()).device
        text_embeds = self.text_encoder(texts, device)
        if normalize:
            text_embeds = F.normalize(text_embeds, dim=-1)
        return text_embeds

    def encode_motion(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        normalize: bool = True,
        mask: Optional[torch.Tensor] = None,
        apply_motion_norm: bool = True
    ) -> torch.Tensor:
        """Encode motion sequences to embeddings.

        Args:
            motion: Motion input as numpy array, torch tensor, or list of arrays.
                Shape: (T, 272) for single motion or (B, T, 272) for batch.
                A list of variable-length (T_i, 272) arrays is zero-padded
                automatically; in that case any user-supplied *mask* is
                replaced by the generated padding mask.
            normalize: Whether to L2-normalize the output embeddings.
            mask: Optional boolean mask for padded sequences. Shape: (B, T),
                True at valid (non-padded) timesteps.
            apply_motion_norm: Whether to apply mean/std normalization to input.
                Set to False if input is already normalized.

        Returns:
            Motion embeddings of shape (B, embed_dim). A single (T, 272)
            input is treated as a batch of one and yields shape (1, embed_dim).
        """
        device = next(self.parameters()).device

        if isinstance(motion, list):
            # Pad variable-length sequences to a common length and build a mask.
            max_len = max(m.shape[0] for m in motion)
            batch = torch.zeros(len(motion), max_len, motion[0].shape[-1])
            mask = torch.zeros(len(motion), max_len, dtype=torch.bool)
            for i, m in enumerate(motion):
                if isinstance(m, np.ndarray):
                    m = torch.from_numpy(m)
                batch[i, :m.shape[0]] = m
                mask[i, :m.shape[0]] = True
            motion = batch
            mask = mask.to(device)
        elif isinstance(motion, np.ndarray):
            motion = torch.from_numpy(motion)

        if motion.dim() == 2:
            # Promote a single (T, D) sequence to a batch of one.
            motion = motion.unsqueeze(0)

        motion = motion.float().to(device)

        if apply_motion_norm:
            # Epsilon guards against zero-variance features.
            motion = (motion - self.mean) / (self.std + 1e-8)

        motion_embeds = self.motion_encoder(motion, mask=mask)

        if normalize:
            motion_embeds = F.normalize(motion_embeds, dim=-1)
        return motion_embeds

    def compute_similarity(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        texts: List[str]
    ) -> torch.Tensor:
        """Return scaled cosine-similarity logits of shape (B_motion, B_text)."""
        motion_embeds = self.encode_motion(motion, normalize=True)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        similarity = logit_scale * motion_embeds @ text_embeds.T
        return similarity

    def forward(
        self,
        motion: torch.Tensor,
        texts: List[str],
        motion_mask: Optional[torch.Tensor] = None
    ) -> dict:
        """Compute CLIP-style paired logits for a motion batch and a text batch."""
        motion_embeds = self.encode_motion(motion, normalize=True, mask=motion_mask)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        logits_per_motion = logit_scale * motion_embeds @ text_embeds.T
        logits_per_text = logits_per_motion.T
        return {"logits_per_motion": logits_per_motion, "logits_per_text": logits_per_text}

    @classmethod
    def from_pretrained(cls, path_or_repo: str, device: str = None, **kwargs):
        """Load config + weights from a local directory or a HuggingFace Hub repo.

        Args:
            path_or_repo: Local directory containing config.json /
                pytorch_model.bin, or a Hub repo id.
            device: Target device; defaults to CUDA when available.
            **kwargs: Forwarded to `hf_hub_download` (e.g. revision, token).
        """
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        path = Path(path_or_repo)
        if path.exists():
            config_file = path / "config.json"
            weights_file = path / "pytorch_model.bin"
        else:
            if not HF_HUB_AVAILABLE:
                raise ImportError("huggingface_hub required: pip install huggingface_hub")
            config_file = hf_hub_download(path_or_repo, "config.json", **kwargs)
            weights_file = hf_hub_download(path_or_repo, "pytorch_model.bin", **kwargs)
            config_file = Path(config_file)
            weights_file = Path(weights_file)

        with open(config_file, 'r') as f:
            config = json.load(f)

        model = cls(config)

        print(f"Loading weights from: {weights_file.name}")
        try:
            # weights_only avoids arbitrary code execution when unpickling
            # checkpoints downloaded from the Hub.
            state_dict = torch.load(weights_file, map_location="cpu", weights_only=True)
        except TypeError:
            # torch < 1.13 has no weights_only argument.
            state_dict = torch.load(weights_file, map_location="cpu")

        # strict=False: text-encoder weights may come from the pretrained
        # CLIP model rather than the checkpoint.
        missing, unexpected = model.load_state_dict(state_dict, strict=False)
        if missing:
            print(f"Missing keys: {len(missing)}")
        if unexpected:
            print(f"Unexpected keys: {len(unexpected)}")

        model = model.to(device)
        model.eval()

        print(f"Loaded MotionCLIP (embed_dim={config.get('embed_dim', 512)}) on {device}")
        return model

    def save_pretrained(self, save_dir: str):
        """Write config.json and pytorch_model.bin to *save_dir* (created if needed)."""
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)

        with open(save_dir / "config.json", 'w') as f:
            json.dump(self.config, f, indent=2)

        torch.save(self.state_dict(), save_dir / "pytorch_model.bin")
        print(f"Saved MotionCLIP to {save_dir}")
|
|
|
|
| if __name__ == "__main__": |
| model = MotionCLIP() |
| print(f"MotionCLIP created with {sum(p.numel() for p in model.parameters()):,} parameters") |
| |
| dummy_motion = torch.randn(2, 64, 272) |
| motion_emb = model.encode_motion(dummy_motion) |
| print(f"Motion embedding shape: {motion_emb.shape}") |
|
|