# motion-clip / motion_clip_hf.py
# khania's picture
# MotionCLIP model update
# 873cf13 verified
"""
MotionCLIP - Motion-Text CLIP Model
Load and use the MotionCLIP model for motion-text retrieval and similarity computation.
Usage:
from motion_clip_hf import MotionCLIP
# Load from HuggingFace Hub
model = MotionCLIP.from_pretrained("khania/motion-clip")
# Encode text and motion
text_emb = model.encode_text(["a person walks forward"])
motion_emb = model.encode_motion(motion_array) # (T, 272) numpy array
# Compute similarity
similarity = model.compute_similarity(motion_array, ["walking", "running"])
"""
import os
import json
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Optional, Union
from pathlib import Path
try:
from transformers import CLIPTextModel, CLIPTokenizer
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
try:
from huggingface_hub import hf_hub_download
HF_HUB_AVAILABLE = True
except ImportError:
HF_HUB_AVAILABLE = False
def sinusoidal_positional_encoding(seq_len: int, dim: int, device: torch.device) -> torch.Tensor:
    """Generate a (seq_len, dim) sinusoidal positional encoding.

    Matches the original training code: even columns hold sin terms, odd
    columns hold cos terms, with frequencies decaying geometrically by a
    factor of 10000 across the feature dimension.

    Args:
        seq_len: Number of positions (time steps) to encode.
        dim: Feature dimension of the encoding. Odd values are supported
            (the surplus cos frequency is simply dropped).
        device: Device on which to allocate the encoding.

    Returns:
        Float tensor of shape (seq_len, dim) on ``device``.
    """
    pe = torch.zeros(seq_len, dim, device=device)
    position = torch.arange(0, seq_len, dtype=torch.float32, device=device).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, dim, 2, dtype=torch.float32, device=device) * (-math.log(10000.0) / dim)
    )
    pe[:, 0::2] = torch.sin(position * div_term)
    # Truncate div_term for the cos columns: when dim is odd there is one
    # fewer odd column than sin frequency, and the untruncated assignment
    # raises a shape-mismatch RuntimeError.
    pe[:, 1::2] = torch.cos(position * div_term[: dim // 2])
    return pe
class MotionTransformerEncoder(nn.Module):
    """Transformer encoder mapping a motion sequence to a single embedding.

    Faithful to the original training architecture:
      - fixed sinusoidal positional encoding (not learned)
      - masked mean pooling over time (no cls token)
      - a single Linear output projection
      - pre-LayerNorm layers (norm_first=True, matching _SDPATransformerEncoderLayer)
    """

    def __init__(
        self,
        input_dim: int = 272,
        hidden_dim: int = 768,
        embed_dim: int = 512,
        num_heads: int = 12,
        num_layers: int = 8,
        max_seq_len: int = 1024,
        dropout: float = 0.1
    ):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len
        # Lift raw motion features into the transformer width.
        self.input_proj = nn.Linear(input_dim, hidden_dim)
        layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True  # Pre-LayerNorm to match _SDPATransformerEncoderLayer
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.output_proj = nn.Linear(hidden_dim, embed_dim)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Encode a batch of motion sequences.

        Args:
            x: Motion features of shape (B, T, input_dim).
            mask: Optional boolean (B, T) mask; True marks valid frames.

        Returns:
            Embeddings of shape (B, embed_dim).
        """
        seq_len = x.shape[1]
        hidden = self.input_proj(x)
        # Sinusoidal positional encoding, built inline (same formula as the
        # module-level helper used during training).
        position = torch.arange(0, seq_len, dtype=torch.float32, device=x.device).unsqueeze(1)
        freq = torch.exp(
            torch.arange(0, self.hidden_dim, 2, dtype=torch.float32, device=x.device)
            * (-math.log(10000.0) / self.hidden_dim)
        )
        pos = torch.zeros(seq_len, self.hidden_dim, device=x.device)
        pos[:, 0::2] = torch.sin(position * freq)
        pos[:, 1::2] = torch.cos(position * freq)
        hidden = hidden + pos[None]
        # nn.Transformer expects True == "ignore this position", i.e. the
        # inverse of our validity mask.
        padding = None if mask is None else ~mask
        hidden = self.transformer(hidden, src_key_padding_mask=padding)
        if mask is None:
            pooled = hidden.mean(dim=1)
        else:
            weights = mask.unsqueeze(-1).float()
            pooled = (hidden * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1e-6)
        return self.output_proj(pooled)
class TextEncoderCLIP(nn.Module):
    """Text encoder wrapping a HuggingFace CLIP text model (as in training)."""

    def __init__(self, model_name: str = "openai/clip-vit-base-patch32", output_dim: int = 512):
        super().__init__()
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers required: pip install transformers")
        self.tokenizer = CLIPTokenizer.from_pretrained(model_name)
        self.model = CLIPTextModel.from_pretrained(model_name)
        self.hidden_size = self.model.config.hidden_size
        self.output_dim = output_dim
        # Only project when the CLIP hidden size differs from the target dim.
        self.proj = (
            nn.Identity()
            if self.hidden_size == output_dim
            else nn.Linear(self.hidden_size, output_dim)
        )

    def forward(self, texts: List[str], device: torch.device) -> torch.Tensor:
        """Tokenize ``texts`` and return (len(texts), output_dim) embeddings."""
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=self.tokenizer.model_max_length,
            return_tensors="pt"
        )
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        outputs = self.model(**encoded)
        pooled = getattr(outputs, "pooler_output", None)
        if pooled is None:
            # Fall back to the first token's hidden state when the model
            # exposes no pooled output.
            pooled = outputs.last_hidden_state[:, 0]
        return self.proj(pooled)
class MotionCLIP(nn.Module):
    """Motion-Text CLIP model with a fine-tuned CLIP text encoder.

    Combines a transformer motion encoder and a HuggingFace CLIP text
    encoder in a shared embedding space, with a learnable CLIP-style
    temperature (``logit_scale``).
    """

    DEFAULT_CONFIG = {
        "motion_input_dim": 272,
        "motion_hidden_dim": 768,
        "embed_dim": 512,
        "motion_num_heads": 12,
        "motion_num_layers": 8,
        "motion_max_seq_len": 1024,
        "motion_dropout": 0.1,
        "text_encoder_name": "openai/clip-vit-base-patch32"
    }

    def __init__(self, config: Optional[dict] = None):
        """Build the model; ``config`` keys override ``DEFAULT_CONFIG``."""
        super().__init__()
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}
        self.motion_encoder = MotionTransformerEncoder(
            input_dim=self.config["motion_input_dim"],
            hidden_dim=self.config["motion_hidden_dim"],
            embed_dim=self.config["embed_dim"],
            num_heads=self.config["motion_num_heads"],
            num_layers=self.config["motion_num_layers"],
            max_seq_len=self.config["motion_max_seq_len"],
            dropout=self.config["motion_dropout"]
        )
        self.text_encoder = TextEncoderCLIP(
            model_name=self.config["text_encoder_name"],
            output_dim=self.config["embed_dim"]
        )
        # CLIP-style learnable temperature, initialized to log(1/0.07).
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        # Per-feature input-normalization stats; identity transform until a
        # checkpoint overwrites them via load_state_dict.
        self.register_buffer("mean", torch.zeros(self.config["motion_input_dim"]))
        self.register_buffer("std", torch.ones(self.config["motion_input_dim"]))

    def encode_text(self, texts: List[str], normalize: bool = True) -> torch.Tensor:
        """Encode a list of strings to (len(texts), embed_dim) embeddings.

        Args:
            texts: Text descriptions to embed.
            normalize: L2-normalize the embeddings (required for cosine
                similarity / CLIP logits).
        """
        device = next(self.parameters()).device
        text_embeds = self.text_encoder(texts, device)
        if normalize:
            text_embeds = F.normalize(text_embeds, dim=-1)
        return text_embeds

    def encode_motion(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        normalize: bool = True,
        mask: Optional[torch.Tensor] = None,
        apply_motion_norm: bool = True
    ) -> torch.Tensor:
        """Encode motion sequences to embeddings.

        Args:
            motion: Motion input as numpy array, torch tensor, or list of
                arrays/tensors. Shape: (T, motion_input_dim) for a single
                motion or (B, T, motion_input_dim) for a batch. A list of
                variable-length sequences is zero-padded into one batch.
            normalize: Whether to L2-normalize the output embeddings.
            mask: Optional boolean mask for padded sequences, shape (B, T),
                True marking valid frames. Ignored (rebuilt) when ``motion``
                is a list.
            apply_motion_norm: Whether to apply mean/std normalization to
                the input. Set to False if input is already normalized.

        Returns:
            Motion embeddings of shape (B, embed_dim); a single (T, D)
            input yields shape (1, embed_dim).
        """
        device = next(self.parameters()).device
        if isinstance(motion, list):
            # Zero-pad variable-length sequences and derive the validity mask.
            max_len = max(m.shape[0] for m in motion)
            batch = torch.zeros(len(motion), max_len, motion[0].shape[-1])
            mask = torch.zeros(len(motion), max_len, dtype=torch.bool)
            for i, m in enumerate(motion):
                if isinstance(m, np.ndarray):
                    m = torch.from_numpy(m)
                batch[i, :m.shape[0]] = m
                mask[i, :m.shape[0]] = True
            motion = batch
        elif isinstance(motion, np.ndarray):
            motion = torch.from_numpy(motion)
        if motion.dim() == 2:
            motion = motion.unsqueeze(0)
        motion = motion.float().to(device)
        if mask is not None:
            # Keep the mask on the model's device. Previously only the
            # list-built mask was moved, so a caller-supplied CPU mask
            # caused a device mismatch on a CUDA model.
            mask = mask.to(device)
        if apply_motion_norm:
            motion = (motion - self.mean) / (self.std + 1e-8)
        motion_embeds = self.motion_encoder(motion, mask=mask)
        if normalize:
            motion_embeds = F.normalize(motion_embeds, dim=-1)
        return motion_embeds

    def compute_similarity(
        self,
        motion: Union[np.ndarray, torch.Tensor, List[np.ndarray]],
        texts: List[str]
    ) -> torch.Tensor:
        """Return temperature-scaled cosine similarities, shape (B, len(texts))."""
        motion_embeds = self.encode_motion(motion, normalize=True)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        similarity = logit_scale * motion_embeds @ text_embeds.T
        return similarity

    def forward(
        self,
        motion: torch.Tensor,
        texts: List[str],
        motion_mask: Optional[torch.Tensor] = None
    ) -> dict:
        """Contrastive forward pass; returns CLIP-style logits both ways."""
        motion_embeds = self.encode_motion(motion, normalize=True, mask=motion_mask)
        text_embeds = self.encode_text(texts, normalize=True)
        logit_scale = self.logit_scale.exp()
        logits_per_motion = logit_scale * motion_embeds @ text_embeds.T
        logits_per_text = logits_per_motion.T
        return {"logits_per_motion": logits_per_motion, "logits_per_text": logits_per_text}

    @classmethod
    def from_pretrained(cls, path_or_repo: str, device: Optional[str] = None, **kwargs):
        """Load a model from a local directory or a HuggingFace Hub repo id.

        Args:
            path_or_repo: Local directory containing ``config.json`` and
                ``pytorch_model.bin``, or a Hub repo id to download from.
            device: Target device; auto-selects cuda when available.
            **kwargs: Forwarded to ``hf_hub_download`` (e.g. revision, token).

        Returns:
            The loaded model, moved to ``device`` and set to eval mode.

        Raises:
            ImportError: If a Hub download is needed but huggingface_hub
                is not installed.
        """
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        path = Path(path_or_repo)
        if path.exists():
            config_file = path / "config.json"
            weights_file = path / "pytorch_model.bin"
        else:
            if not HF_HUB_AVAILABLE:
                raise ImportError("huggingface_hub required: pip install huggingface_hub")
            config_file = hf_hub_download(path_or_repo, "config.json", **kwargs)
            weights_file = hf_hub_download(path_or_repo, "pytorch_model.bin", **kwargs)
            config_file = Path(config_file)
            weights_file = Path(weights_file)
        with open(config_file, 'r') as f:
            config = json.load(f)
        model = cls(config)
        print(f"Loading weights from: {weights_file.name}")
        # weights_only=True blocks arbitrary-code execution from a malicious
        # checkpoint; a plain state_dict of tensors loads fine with it.
        state_dict = torch.load(weights_file, map_location="cpu", weights_only=True)
        missing, unexpected = model.load_state_dict(state_dict, strict=False)
        if missing:
            print(f"Missing keys: {len(missing)}")
        if unexpected:
            print(f"Unexpected keys: {len(unexpected)}")
        model = model.to(device)
        model.eval()
        print(f"Loaded MotionCLIP (embed_dim={config.get('embed_dim', 512)}) on {device}")
        return model

    def save_pretrained(self, save_dir: str):
        """Write ``config.json`` and ``pytorch_model.bin`` into ``save_dir``."""
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)
        with open(save_dir / "config.json", 'w') as f:
            json.dump(self.config, f, indent=2)
        torch.save(self.state_dict(), save_dir / "pytorch_model.bin")
        print(f"Saved MotionCLIP to {save_dir}")
if __name__ == "__main__":
    # Smoke test: build a default model and encode one dummy batch.
    demo = MotionCLIP()
    n_params = sum(p.numel() for p in demo.parameters())
    print(f"MotionCLIP created with {n_params:,} parameters")
    fake_motion = torch.randn(2, 64, 272)
    embedding = demo.encode_motion(fake_motion)
    print(f"Motion embedding shape: {embedding.shape}")