"""
MotionCLIP - Motion-Text CLIP Model

Load and use the MotionCLIP model for motion-text retrieval and similarity
computation.

Usage:
    from motion_clip_hf import MotionCLIP

    # Load from HuggingFace Hub
    model = MotionCLIP.from_pretrained("khania/motion-clip")

    # Encode text and motion
    text_emb = model.encode_text(["a person walks forward"])
    motion_emb = model.encode_motion(motion_array)  # (T, 272) numpy array

    # Compute similarity
    similarity = model.compute_similarity(motion_array, ["walking", "running"])
"""

import os
import json
import math
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# Optional dependencies: the module stays importable without them and the
# failure is reported only when the corresponding feature is actually used.
try:
    from transformers import CLIPTextModel, CLIPTokenizer
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

try:
    from huggingface_hub import hf_hub_download
    HF_HUB_AVAILABLE = True
except ImportError:
    HF_HUB_AVAILABLE = False


def sinusoidal_positional_encoding(seq_len: int, dim: int, device: torch.device) -> torch.Tensor:
    """Generate sinusoidal positional encoding (matches original training code).

    Args:
        seq_len: Number of positions to encode.
        dim: Encoding width. Even values reproduce the original behavior
            exactly; odd values are also supported (the last column is a
            sine term without a matching cosine).
        device: Device on which the encoding is created.

    Returns:
        Float32 tensor of shape (seq_len, dim) with sines in the even columns
        and cosines in the odd columns.
    """
    pe = torch.zeros(seq_len, dim, device=device)
    position = torch.arange(0, seq_len, dtype=torch.float32, device=device).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, dim, 2, dtype=torch.float32, device=device) * (-math.log(10000.0) / dim)
    )
    pe[:, 0::2] = torch.sin(position * div_term)
    # For odd `dim` there is one fewer odd column than even columns, so the
    # frequency vector is trimmed to fit; for even `dim` the slice is a no-op.
    pe[:, 1::2] = torch.cos(position * div_term[: dim // 2])
    return pe


class MotionTransformerEncoder(nn.Module):
    """Transformer encoder for motion sequences.

    Architecture matches original training code exactly:
    - Sinusoidal positional encoding (not learnable)
    - Masked mean pooling (no cls token)
    - Simple Linear output projection
    - Pre-LayerNorm architecture (norm_first=True to match _SDPATransformerEncoderLayer)

    Args:
        input_dim: Feature size of each motion frame.
        hidden_dim: Transformer model width.
        embed_dim: Size of the pooled output embedding.
        num_heads: Number of attention heads per layer.
        num_layers: Number of stacked encoder layers.
        max_seq_len: Stored on the module; forward() does not enforce it.
        dropout: Dropout probability used inside the encoder layers.
    """

    def __init__(
        self,
        input_dim: int = 272,
        hidden_dim: int = 768,
        embed_dim: int = 512,
        num_heads: int = 12,
        num_layers: int = 8,
        max_seq_len: int = 1024,
        dropout: float = 0.1
    ):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        self.input_proj = nn.Linear(input_dim, hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True  # Pre-LayerNorm to match _SDPATransformerEncoderLayer
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_proj = nn.Linear(hidden_dim, embed_dim)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Encode a batch of motion sequences into pooled embeddings.

        Args:
            x: Motion features of shape (B, T, input_dim).
            mask: Optional (B, T) validity mask; truthy entries mark real
                frames, falsy entries mark padding. Non-bool masks are
                converted to bool.

        Returns:
            Tensor of shape (B, embed_dim).
        """
        _, T, _ = x.shape

        x = self.input_proj(x)
        pe = sinusoidal_positional_encoding(T, self.hidden_dim, x.device)
        x = x + pe.unsqueeze(0)

        if mask is not None:
            # `~` is only a logical NOT on bool tensors; coerce so that 0/1
            # integer or float masks behave the same as boolean ones.
            mask = mask.to(device=x.device, dtype=torch.bool)
            # The transformer expects True at positions to IGNORE.
            key_padding_mask = ~mask
        else:
            key_padding_mask = None

        x = self.transformer(x, src_key_padding_mask=key_padding_mask)

        if mask is not None:
            # Mean over valid frames only; the clamp guards against division
            # by zero when a row has no valid frame.
            mask_expanded = mask.unsqueeze(-1).float()
            x = (x * mask_expanded).sum(dim=1) / mask_expanded.sum(dim=1).clamp(min=1e-6)
        else:
            x = x.mean(dim=1)

        output = self.output_proj(x)
        return output


class TextEncoderCLIP(nn.Module):
    """HuggingFace CLIP text encoder - matches original training code.

    Wraps a CLIP tokenizer and text model and, when the text model's hidden
    size differs from `output_dim`, adds a linear projection to `output_dim`.

    Raises:
        ImportError: If the `transformers` package is not installed.
    """

    def __init__(self, model_name: str = "openai/clip-vit-base-patch32", output_dim: int = 512):
        super().__init__()
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers required: pip install transformers")

        self.tokenizer = CLIPTokenizer.from_pretrained(model_name)
        self.model = CLIPTextModel.from_pretrained(model_name)
        self.hidden_size = self.model.config.hidden_size
        self.output_dim = output_dim

        if self.hidden_size != output_dim:
            self.proj = nn.Linear(self.hidden_size, output_dim)
        else:
            self.proj = nn.Identity()

    def forward(self, texts: List[str], device: torch.device) -> torch.Tensor:
        """Tokenize `texts`, run the text model and project the pooled feature.

        Args:
            texts: Strings to encode.
            device: Device the tokenized inputs are moved to before the
                forward pass.

        Returns:
            Tensor with one row per text and `output_dim` columns.
        """
        inputs = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=self.tokenizer.model_max_length,
            return_tensors="pt"
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}
        out = self.model(**inputs)

        # Prefer the model's pooled output; otherwise fall back to the hidden
        # state at the first token position.
        if hasattr(out, "pooler_output") and out.pooler_output is not None:
            feat = out.pooler_output
        else:
            feat = out.last_hidden_state[:, 0]

        return self.proj(feat)


class MotionCLIP(nn.Module):
    """Motion-Text CLIP Model with fine-tuned text encoder.

    Combines a `MotionTransformerEncoder` and a `TextEncoderCLIP` that map
    into a shared embedding space, plus a learnable logit scale and `mean` /
    `std` buffers used to normalize raw motion features.
    """

    DEFAULT_CONFIG = {
        "motion_input_dim": 272,
        "motion_hidden_dim": 768,
        "embed_dim": 512,
        "motion_num_heads": 12,
        "motion_num_layers": 8,
        "motion_max_seq_len": 1024,
        "motion_dropout": 0.1,
        "text_encoder_name": "openai/clip-vit-base-patch32"
    }

    def __init__(self, config: Optional[dict] = None):
        """Build the model.

        Args:
            config: Optional overrides for `DEFAULT_CONFIG`; missing keys fall
                back to the defaults.
        """
        super().__init__()
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

        self.motion_encoder = MotionTransformerEncoder(
            input_dim=self.config["motion_input_dim"],
            hidden_dim=self.config["motion_hidden_dim"],
            embed_dim=self.config["embed_dim"],
            num_heads=self.config["motion_num_heads"],
            num_layers=self.config["motion_num_layers"],
            max_seq_len=self.config["motion_max_seq_len"],
            dropout=self.config["motion_dropout"]
        )
        self.text_encoder = TextEncoderCLIP(
            model_name=self.config["text_encoder_name"],
            output_dim=self.config["embed_dim"]
        )

        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        # Identity normalization until a checkpoint overwrites these buffers.
        self.register_buffer("mean", torch.zeros(self.config["motion_input_dim"]))
        self.register_buffer("std", torch.ones(self.config["motion_input_dim"]))

    def encode_text(self, texts: List[str], normalize: bool = True) -> torch.Tensor:
        """Encode texts to embeddings.

        Args:
            texts: Strings to encode.
            normalize: Whether to L2-normalize the output embeddings.

        Returns:
            Text embeddings with one row per text.
        """
        device = next(self.parameters()).device
        text_embeds = self.text_encoder(texts, device)
        if normalize:
            text_embeds = F.normalize(text_embeds, dim=-1)
        return text_embeds

    def encode_motion(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        normalize: bool = True,
        mask: Optional[torch.Tensor] = None,
        apply_motion_norm: bool = True
    ) -> torch.Tensor:
        """Encode motion sequences to embeddings.

        Args:
            motion: Motion input as numpy array, torch tensor, or list of arrays.
                Shape: (T, D) for single motion or (B, T, D) for batch, where D
                is the configured motion input dimension. A list of
                variable-length (T_i, D) sequences is zero-padded to the longest
                one and a matching mask is built automatically (any `mask`
                argument is then ignored).
            normalize: Whether to L2-normalize the output embeddings.
            mask: Optional mask for padded sequences, truthy at valid frames.
                Shape: (B, T), or (T,) for a single motion.
            apply_motion_norm: Whether to apply mean/std normalization to input.
                Set to False if input is already normalized.

        Returns:
            Motion embeddings of shape (B, embed_dim). A single (T, D) input is
            treated as a batch of one, so the result is (1, embed_dim).

        Raises:
            ValueError: If `motion` is an empty list.
        """
        device = next(self.parameters()).device

        if isinstance(motion, list):
            if not motion:
                raise ValueError("encode_motion() received an empty list of motions")
            sequences = [torch.as_tensor(m) for m in motion]
            max_len = max(seq.shape[0] for seq in sequences)
            batch = torch.zeros(len(sequences), max_len, sequences[0].shape[-1])
            mask = torch.zeros(len(sequences), max_len, dtype=torch.bool)
            for i, seq in enumerate(sequences):
                batch[i, :seq.shape[0]] = seq
                mask[i, :seq.shape[0]] = True
            motion = batch
        elif isinstance(motion, np.ndarray):
            motion = torch.from_numpy(motion)

        if motion.dim() == 2:
            motion = motion.unsqueeze(0)

        motion = motion.float().to(device)

        if mask is not None:
            # Bring every mask (auto-built or caller-supplied) to the model
            # device as a boolean tensor, and batch a 1-D mask the same way a
            # single 2-D motion is batched above.
            mask = torch.as_tensor(mask, dtype=torch.bool, device=device)
            if mask.dim() == 1:
                mask = mask.unsqueeze(0)

        if apply_motion_norm:
            motion = (motion - self.mean) / (self.std + 1e-8)

        motion_embeds = self.motion_encoder(motion, mask=mask)

        if normalize:
            motion_embeds = F.normalize(motion_embeds, dim=-1)

        return motion_embeds

    def compute_similarity(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        texts: List[str]
    ) -> torch.Tensor:
        """Return scaled cosine similarities between motions and texts.

        Args:
            motion: Motion input accepted by `encode_motion`.
            texts: Strings to compare against.

        Returns:
            Tensor of shape (num_motions, num_texts) holding the normalized
            embedding dot products multiplied by exp(logit_scale).
        """
        motion_embeds = self.encode_motion(motion, normalize=True)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        similarity = logit_scale * motion_embeds @ text_embeds.T
        return similarity

    def forward(
        self,
        motion: torch.Tensor,
        texts: List[str],
        motion_mask: Optional[torch.Tensor] = None
    ) -> dict:
        """Compute motion-to-text and text-to-motion logits.

        Args:
            motion: Motion input accepted by `encode_motion`.
            texts: Strings paired with the motions.
            motion_mask: Optional validity mask forwarded to `encode_motion`.

        Returns:
            Dict with "logits_per_motion" (motions x texts) and its transpose
            "logits_per_text".
        """
        motion_embeds = self.encode_motion(motion, normalize=True, mask=motion_mask)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        logits_per_motion = logit_scale * motion_embeds @ text_embeds.T
        logits_per_text = logits_per_motion.T
        return {"logits_per_motion": logits_per_motion, "logits_per_text": logits_per_text}

    @classmethod
    def from_pretrained(cls, path_or_repo: str, device: Optional[str] = None, **kwargs):
        """Load a model from a local directory or a HuggingFace Hub repo.

        Args:
            path_or_repo: Existing local directory containing `config.json`
                and `pytorch_model.bin`; anything that is not an existing
                path is treated as a Hub repo id.
            device: Target device; defaults to "cuda" when available, else "cpu".
            **kwargs: Forwarded to `hf_hub_download` (Hub case only).

        Returns:
            The loaded model, moved to `device` and set to eval mode.

        Raises:
            ImportError: If a Hub download is needed but `huggingface_hub`
                is not installed.
        """
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        path = Path(path_or_repo)

        if path.exists():
            config_file = path / "config.json"
            weights_file = path / "pytorch_model.bin"
        else:
            if not HF_HUB_AVAILABLE:
                raise ImportError("huggingface_hub required: pip install huggingface_hub")
            config_file = hf_hub_download(path_or_repo, "config.json", **kwargs)
            weights_file = hf_hub_download(path_or_repo, "pytorch_model.bin", **kwargs)

        config_file = Path(config_file)
        weights_file = Path(weights_file)

        with open(config_file, 'r', encoding="utf-8") as f:
            config = json.load(f)

        model = cls(config)

        print(f"Loading weights from: {weights_file.name}")
        # NOTE(review): torch.load unpickles the checkpoint. Only load weights
        # from trusted sources, or pass weights_only=True on PyTorch versions
        # that support it.
        state_dict = torch.load(weights_file, map_location="cpu")

        # strict=False tolerates key mismatches; they are reported, not raised.
        missing, unexpected = model.load_state_dict(state_dict, strict=False)
        if missing:
            print(f"Missing keys: {len(missing)}")
        if unexpected:
            print(f"Unexpected keys: {len(unexpected)}")

        model = model.to(device)
        model.eval()

        print(f"Loaded MotionCLIP (embed_dim={config.get('embed_dim', 512)}) on {device}")
        return model

    def save_pretrained(self, save_dir: str):
        """Write `config.json` and `pytorch_model.bin` into `save_dir`.

        Args:
            save_dir: Output directory; created (with parents) if missing.
        """
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)

        with open(save_dir / "config.json", 'w') as f:
            json.dump(self.config, f, indent=2)

        torch.save(self.state_dict(), save_dir / "pytorch_model.bin")
        print(f"Saved MotionCLIP to {save_dir}")


if __name__ == "__main__":
    model = MotionCLIP()
    print(f"MotionCLIP created with {sum(p.numel() for p in model.parameters()):,} parameters")

    dummy_motion = torch.randn(2, 64, 272)
    motion_emb = model.encode_motion(dummy_motion)
    print(f"Motion embedding shape: {motion_emb.shape}")