| """ |
| LDF Model for Hugging Face Hub |
| |
| Usage: |
| from transformers import AutoModel |
| |
| model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True) |
| motion = model("a person walking forward", length=60) |
| """ |
|
|
| import torch |
| from transformers import PretrainedConfig, PreTrainedModel |
| from typing import Union, List, Optional |
| import os |
| import sys |
|
|
|
|
class LDFConfig(PretrainedConfig):
    """Configuration for LDF Motion Generation Model.

    Args:
        input_dim: Value stored on the config as ``input_dim`` (default 4).
        output_dim: Value stored on the config as ``output_dim`` (default 263).
        **kwargs: Passed through unchanged to ``PretrainedConfig.__init__``.
    """

    # Identifier used when registering this config with the Auto* classes.
    model_type = "ldf_motion"

    def __init__(self, input_dim: int = 4, output_dim: int = 263, **kwargs):
        # Let the base class consume the generic options first, then record
        # the two model-specific dimensions.
        super().__init__(**kwargs)
        self.input_dim, self.output_dim = input_dim, output_dim
|
|
|
|
class LDFModel(PreTrainedModel):
    """
    LDF Motion Generation Model

    This model generates motion sequences from text descriptions using Latent Diffusion Forcing.
    The underlying LDF and VAE networks are loaded lazily from the model directory
    (see ``_load_models``) rather than from this wrapper's own weights.

    Example:
        >>> from transformers import AutoModel
        >>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
        >>> motion = model("a person walking forward", length=60)
        >>> print(motion.shape)  # (~240, 263)
    """

    config_class = LDFConfig

    def __init__(self, config):
        """Create an empty wrapper; the real networks are attached by ``_load_models``."""
        super().__init__(config)
        self.config = config

        # Filled in lazily by _load_models().
        self.ldf_model = None
        self.vae = None
        self.model_dir = None

    def _load_models(self):
        """Load the actual LDF and VAE models.

        Does nothing if the models are already loaded. Otherwise the directory in
        ``self.name_or_path`` is put on ``sys.path``, the ``generate_ldf`` module found
        there is imported, and its ``load_model_from_config()`` is called.

        Raises:
            RuntimeError: If ``self.name_or_path`` is unset or is not an existing path.
        """
        if self.ldf_model is not None:
            return

        model_dir = getattr(self, "name_or_path", None)
        # A missing/None/empty path is reported with the same error as a
        # non-existent one instead of failing inside os.path.exists().
        if not model_dir or not os.path.exists(model_dir):
            raise RuntimeError(
                "Model directory not found. Please use from_pretrained() to load the model."
            )

        self.model_dir = model_dir

        # The model repository ships its own Python sources; make them importable.
        if model_dir not in sys.path:
            sys.path.insert(0, model_dir)

        import importlib

        generate_ldf = importlib.import_module('generate_ldf')
        load_model_from_config = generate_ldf.load_model_from_config

        # load_model_from_config() takes no arguments here; the config file is
        # handed over through a temporary sys.argv -- presumably it parses the
        # command line itself (verify against generate_ldf).
        config_path = os.path.join(model_dir, "ldf.yaml")
        old_argv = sys.argv
        sys.argv = ['model', '--config', config_path]

        try:
            self.vae, self.ldf_model = load_model_from_config()

            # NOTE(review): this lookup runs after the loaded networks were
            # attached to self, so it normally returns *their* device and the
            # CUDA/CPU fallback only applies when no parameters exist at all.
            first_param = next(self.parameters(), None)
            if first_param is not None:
                device = first_param.device
            else:
                device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.ldf_model = self.ldf_model.to(device)
            self.vae = self.vae.to(device)
        finally:
            # Always restore the caller's command line.
            sys.argv = old_argv

    def _load_motion_process(self):
        """Load ``ldf_utils/motion_process.py`` from the model directory as a module."""
        import importlib.util

        utils_spec = importlib.util.spec_from_file_location(
            "motion_process",
            os.path.join(self.model_dir, "ldf_utils", "motion_process.py")
        )
        motion_process_module = importlib.util.module_from_spec(utils_spec)
        utils_spec.loader.exec_module(motion_process_module)
        return motion_process_module

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load pretrained model

        Args:
            pretrained_model_name_or_path: Local directory, or a Hub repo id that is
                downloaded with ``huggingface_hub.snapshot_download``.
            trust_remote_code: Must be True to load this custom model
            **kwargs: Additional arguments. ``revision``, ``cache_dir``, ``token`` and
                ``local_files_only`` are forwarded to ``snapshot_download`` when given;
                everything else is ignored.

        Returns:
            LDFModel instance with the LDF and VAE networks already loaded.

        Raises:
            ValueError: If ``trust_remote_code`` is not truthy.
        """
        if not kwargs.get('trust_remote_code', False):
            raise ValueError(
                "Loading this model requires trust_remote_code=True. "
                "Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
            )

        if os.path.exists(pretrained_model_name_or_path):
            model_path = pretrained_model_name_or_path
        else:
            from huggingface_hub import snapshot_download

            # Honour explicitly supplied hub options (e.g. a pinned revision)
            # instead of silently downloading the default snapshot.
            hub_kwargs = {
                key: kwargs[key]
                for key in ("revision", "cache_dir", "token", "local_files_only")
                if kwargs.get(key) is not None
            }
            model_path = snapshot_download(
                repo_id=pretrained_model_name_or_path, **hub_kwargs
            )

        config = LDFConfig.from_pretrained(model_path)

        model = cls(config)
        # _load_models() locates the repository sources through name_or_path.
        model.name_or_path = model_path
        model._load_models()

        return model

    def forward(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        **kwargs
    ):
        """
        Generate motion from text

        Args:
            text: Text description(s)
            length: Number of latent tokens (output frames ≈ length × 4)
            text_end: Transition points for multi-text
            num_denoise_steps: Number of denoising steps
            **kwargs: ``output_joints`` and ``smoothing_alpha`` are passed on to
                ``__call__``; any other keyword is ignored.

        Returns:
            Generated motion sequence(s)
        """
        return self.__call__(
            text,
            length,
            text_end,
            num_denoise_steps,
            output_joints=kwargs.get("output_joints", False),
            smoothing_alpha=kwargs.get("smoothing_alpha", 1.0),
        )

    @torch.no_grad()
    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        output_joints: bool = False,
        smoothing_alpha: float = 1.0
    ):
        """
        Generate motion sequences

        Batch mode is selected by ``length``: when ``length`` is a list, ``text`` (and
        ``text_end``) are taken as per-sample lists; otherwise the call is treated as a
        single sample, even if ``text`` itself is a list.

        Args:
            text: Text description
                - Single sample (``length`` is an int): a string such as "walk", or a
                  list of strings such as ["walk", "turn"] for multi-text
                - Batch (``length`` is a list): one entry per sample, e.g.
                  ["walk", "run"] or [["walk", "turn"], ["run", "jump"]]
            length: Number of latent tokens (frames ≈ length × 4)
            text_end: Token positions for text switching
            num_denoise_steps: Number of denoising steps
            output_joints: If True, output 22×3 joint coordinates; if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions (0.0-1.0, default=1.0 no smoothing)
                - Only used when output_joints=True
                - Recommended: 0.5 for smoother animations

        Returns:
            numpy.ndarray or list of arrays (a single array for a single sample, a list
            in batch mode; an entry is None when the generator returned no tensor for it)
                - If output_joints=False: shape (frames, 263)
                - If output_joints=True: shape (frames, 22, 3)

        Raises:
            ValueError: If a sample's text segments and text_end endpoints differ in
                count, or if batched ``text`` and ``length`` differ in size.
        """
        self._load_models()

        # Normalise everything to batch form; remember whether to unwrap at the end.
        is_single = not isinstance(length, list)
        if is_single:
            text_batch = [text]
            length_batch = [length]
            text_end_batch = None if text_end is None else [text_end]
        else:
            text_batch = text
            length_batch = length
            text_end_batch = text_end
            if isinstance(text_batch, list) and len(text_batch) != len(length_batch):
                raise ValueError(
                    f"text has {len(text_batch)} entries but length has "
                    f"{len(length_batch)}; batched inputs need one length per text."
                )

        # Each multi-text sample needs exactly one end position per text segment.
        if text_end_batch is not None:
            for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
                if isinstance(txt, list) and te is not None and len(txt) != len(te):
                    raise ValueError(
                        f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
                        f"They must match! text={txt}, text_end={te}"
                    )

        x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
        if text_end_batch is not None:
            x["feature_text_end"] = text_end_batch

        output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps)
        generated_batch = output["generated"]

        # The joint-recovery code lives in the model repository and is only
        # needed when joints were requested.
        if output_joints:
            import numpy as np

            motion_process_module = self._load_motion_process()

        results = []
        for generated in generated_batch:
            if generated is None or not torch.is_tensor(generated):
                # Keep positions aligned with the input batch.
                results.append(None)
                continue

            # Decode one sample at a time: add a batch axis, then strip it again.
            decoded_np = self.vae.decode(generated[None, :])[0].cpu().numpy()

            if not output_joints:
                results.append(decoded_np)
                continue

            # A fresh recovery object per sample, fed frame by frame.
            recovery = motion_process_module.StreamJointRecovery263(
                joints_num=22, smoothing_alpha=smoothing_alpha
            )
            results.append(
                np.array([recovery.process_frame(frame) for frame in decoded_np])
            )

        return results[0] if is_single else results

    def generate(self, *args, **kwargs):
        """Alias for __call__ to match transformers API"""
        return self.__call__(*args, **kwargs)
|
|
|
|
| |
# Alias so the model is also available under a pipeline-style name.
LDFPipeline = LDFModel


# Register the config and model with the transformers Auto* classes under the
# "ldf_motion" model type.
try:
    from transformers import AutoModel, AutoConfig

    AutoConfig.register("ldf_motion", LDFConfig)
    AutoModel.register(LDFConfig, LDFModel)
except Exception:
    # Registration is best-effort: a failure here must not prevent the module
    # from importing. Only ordinary exceptions are suppressed, so
    # KeyboardInterrupt and SystemExit still propagate.
    pass
|
|