# NOTE: The following header ("Spaces: Running on Zero") is Hugging Face Spaces
# page residue from extraction — it is not part of the source code.
| # SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| import argparse | |
| import os | |
| from typing import Any, Dict, Optional | |
| import torch | |
| from kimodo import DEFAULT_MODEL, load_model | |
| from kimodo.constraints import load_constraints_lst | |
| from kimodo.exports.motion_io import save_kimodo_npz | |
| from kimodo.meta import load_prompts_from_meta | |
| from kimodo.model.cfg import CFG_TYPES | |
| from kimodo.model.registry import get_model_info | |
| from kimodo.runtime import runtime_health_report | |
| from kimodo.tools import load_json, seed_everything | |
def parse_args():
    """Define and parse the command-line interface for kimodo motion generation.

    Returns:
        argparse.Namespace with the parsed arguments. ``cfg_type`` and
        ``cfg_weight`` use ``argparse.SUPPRESS`` so they are absent from the
        namespace unless explicitly passed on the command line.
    """
    parser = argparse.ArgumentParser(description="Cmd line API for generation motions with kimodo")
    # Each entry is (argument names, keyword options for add_argument).
    argument_specs = [
        (
            ("prompt",),
            dict(
                nargs="?",
                type=str,
                default=None,
                help="Text prompt describing the motion to generate, or several prompts separated by periods.",
            ),
        ),
        (
            ("--model",),
            dict(
                type=str,
                default=DEFAULT_MODEL,
                help="Name of the model (e.g. Kimodo-SOMA-RP-v1, etc).",
            ),
        ),
        (
            ("--duration",),
            dict(
                type=str,
                default="5.0",
                help="Duration in seconds (default: 5.0). Separate by spaces in a string for different durations per prompts",
            ),
        ),
        (
            ("--num_samples",),
            dict(
                type=int,
                default=1,
                help="Number of samples to generate (default: 1)",
            ),
        ),
        (
            ("--diffusion_steps",),
            dict(
                type=int,
                default=100,
                help="Number of diffusion steps (default: 100)",
            ),
        ),
        (
            ("--num_transition_frames",),
            dict(
                type=int,
                default=5,
                help="Number of frames to help transitioning (default: 5)",
            ),
        ),
        (
            ("--constraints",),
            dict(
                type=str,
                default=None,
                help="Saved constraint list",
            ),
        ),
        (
            ("--output",),
            dict(
                type=str,
                default="output",
                help="Output stem name: with one sample writes a single file per format (e.g. test.npz, test.csv); with multiple samples creates a folder and writes test_00.npz, test_01.npz, ... inside it. Used for NPZ, AMASS NPZ, CSV, and BVH.",
            ),
        ),
        (
            ("--bvh",),
            dict(
                action="store_true",
                help="Also export BVH (SOMA models only); uses the same stem as --output.",
            ),
        ),
        (
            ("--no-postprocess",),
            dict(
                action="store_true",
                help="Don't apply motion post-processing to reduce foot skating (ignored for G1)",
            ),
        ),
        (
            ("--seed",),
            dict(
                type=int,
                default=None,
                help="Seed for reproducible results",
            ),
        ),
        (
            ("--input_folder",),
            dict(
                type=str,
                default=None,
                help="Folder containing meta.json and optional constraints.json. If set, generation settings are loaded from meta.json.",
            ),
        ),
        (
            ("--cfg_type",),
            dict(
                type=str,
                default=argparse.SUPPRESS,
                choices=CFG_TYPES,
                help=(
                    "Classifier-free guidance mode: nocfg (no CFG), regular (single scale on cond vs uncond), "
                    "or separated (custom: separate text and constraint scales). "
                    "Use with --cfg_weight as required by the mode."
                ),
            ),
        ),
        (
            ("--cfg_weight",),
            dict(
                type=float,
                nargs="*",
                default=argparse.SUPPRESS,
                help=(
                    "CFG scale(s): one float for regular, or two floats [text_weight, constraint_weight] for separated. "
                    "Omit with --cfg_type nocfg. If omitted, two floats alone imply separated; one float alone implies regular."
                ),
            ),
        ),
    ]
    for names, options in argument_specs:
        parser.add_argument(*names, **options)
    return parser.parse_args()
def get_texts_and_num_frames_from_prompt(prompt: str, duration: str, fps: float):
    """Split *prompt* into per-sentence texts and compute the frame count of each.

    Args:
        prompt: One or several prompts separated by periods.
        duration: Either a single duration in seconds (applied to every prompt)
            or space-separated durations, one per prompt.
        fps: Frames per second used to convert seconds into frame counts.

    Returns:
        Tuple of (texts, num_frames): the list of period-terminated prompt
        strings and the matching list of integer frame counts.

    Raises:
        ValueError: if multiple durations are given but their count does not
            match the number of prompts.
    """
    # Split on periods and keep only non-empty segments, re-adding the period.
    texts = [segment.strip() for segment in prompt.split(".")]
    texts = [segment + "." for segment in texts if segment]
    nb_prompts = len(texts)

    if " " not in duration:
        # A single duration applies to every prompt.
        num_frames = [int(float(duration) * fps)] * nb_prompts
    else:
        # split() (no argument) tolerates repeated/stray whitespace, unlike
        # split(" ") which would produce empty strings and crash float("").
        durations = duration.split()
        if len(durations) != nb_prompts:
            # Raise instead of assert: assertions are stripped under `python -O`.
            raise ValueError("The number of durations should match the number of prompts")
        num_frames = [int(float(d) * fps) for d in durations]
    return texts, num_frames
| def _single_file_path(path: str, ext: str) -> str: | |
| """Return path for a single output file (no folder). | |
| Adds ext if missing; creates parent dirs if any. | |
| """ | |
| if not path.endswith(ext): | |
| path = path.rstrip(os.sep) + ext | |
| parent = os.path.dirname(path) | |
| if parent: | |
| os.makedirs(parent, exist_ok=True) | |
| return path | |
| def _output_dir_and_path(path: str, default_base: str, ext: str): | |
| """Create output folder from path and return (dir_path, path_for_file_with_suffix, base_name). | |
| If path has an extension, folder name is the path stem; else the path is the folder name. | |
| base_name is the folder basename for _00, _01, ... when n_samples > 1. | |
| """ | |
| folder = os.path.splitext(path)[0] if os.path.splitext(path)[1] else path | |
| os.makedirs(folder, exist_ok=True) | |
| base_name = os.path.basename(folder.rstrip(os.sep)) | |
| return folder, os.path.join(folder, default_base + ext), base_name | |
def resolve_cfg_kwargs(args: argparse.Namespace, meta: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Resolve cfg_type / cfg_weight for model(...).

    Precedence: explicit CLI (--cfg_type / --cfg_weight) overrides meta.json ``cfg``;
    if neither applies, returns {} so the model uses its own defaults.
    """
    namespace = vars(args)
    # Both flags use argparse.SUPPRESS, so presence in the namespace means
    # the user passed them explicitly on the command line.
    type_given = "cfg_type" in namespace
    weight_given = "cfg_weight" in namespace
    chosen_type = namespace.get("cfg_type")
    weights = namespace.get("cfg_weight")

    # --cfg_weight with no values (None or empty list) is always an error.
    if weight_given and not weights:
        raise ValueError("--cfg_weight requires one float (regular) or two floats (separated).")

    # Explicit nocfg: weights make no sense alongside it.
    if type_given and chosen_type == "nocfg":
        if weight_given:
            raise ValueError("--cfg_weight is not used with --cfg_type nocfg.")
        return {"cfg_type": "nocfg"}

    if type_given or weight_given:
        if type_given:
            mode = chosen_type
            # Validate the weight arity against the requested mode.
            if weight_given:
                if mode == "regular" and len(weights) != 1:
                    raise ValueError("--cfg_type regular requires exactly one --cfg_weight value.")
                if mode == "separated" and len(weights) != 2:
                    raise ValueError("--cfg_type separated requires exactly two --cfg_weight values.")
            elif mode == "regular":
                raise ValueError("--cfg_type regular requires --cfg_weight with one float.")
            elif mode == "separated":
                raise ValueError("--cfg_type separated requires --cfg_weight with two floats.")
        elif len(weights) == 1:
            # Weights alone: one value implies regular, two imply separated.
            mode = "regular"
        elif len(weights) == 2:
            mode = "separated"
        else:
            raise ValueError("--cfg_weight expects 1 float (regular) or 2 floats (separated).")
        if mode == "regular":
            return {"cfg_type": "regular", "cfg_weight": float(weights[0])}
        return {"cfg_type": "separated", "cfg_weight": [float(weights[0]), float(weights[1])]}

    # No CLI override: fall back to the meta.json ``cfg`` section, if any.
    meta_cfg = meta.get("cfg") if meta else None
    if isinstance(meta_cfg, dict):
        if not meta_cfg.get("enabled", True):
            return {"cfg_type": "nocfg"}
        return {
            "cfg_type": "separated",
            "cfg_weight": [
                float(meta_cfg.get("text_weight", 2.0)),
                float(meta_cfg.get("constraint_weight", 2.0)),
            ],
        }
    return {}
def get_generation_inputs(args, fps: float):
    """Get texts/num_frames and parameter overrides from either CLI or input_folder."""
    # CLI-only path: everything comes straight from the parsed arguments.
    if args.input_folder is None:
        if not args.prompt:
            raise ValueError("Either provide 'prompt' or '--input_folder'.")
        texts, num_frames = get_texts_and_num_frames_from_prompt(args.prompt, args.duration, fps)
        return {
            "texts": texts,
            "num_frames": num_frames,
            "num_samples": args.num_samples,
            "diffusion_steps": args.diffusion_steps,
            "seed": args.seed,
            "constraints_path": args.constraints,
            "meta": None,
        }

    # Folder path: meta.json supplies prompts/durations and may override
    # num_samples / diffusion_steps / seed; CLI values are fallbacks.
    meta_file = os.path.join(args.input_folder, "meta.json")
    meta = load_json(meta_file)
    texts, durations_sec = load_prompts_from_meta(meta_file)
    num_frames = [int(float(seconds) * fps) for seconds in durations_sec]

    # An explicit --constraints wins; otherwise pick up the folder's
    # constraints.json when it exists.
    constraints_file = args.constraints
    if constraints_file is None:
        candidate = os.path.join(args.input_folder, "constraints.json")
        if os.path.exists(candidate):
            constraints_file = candidate

    return {
        "texts": texts,
        "num_frames": num_frames,
        "num_samples": meta.get("num_samples", args.num_samples),
        "diffusion_steps": meta.get("diffusion_steps", args.diffusion_steps),
        "seed": meta.get("seed", args.seed),
        "constraints_path": constraints_file,
        "meta": meta,
    }
def main():
    """Entry point: load a kimodo model, generate motions from prompts, and export them.

    Pipeline: runtime health check -> model load -> gather generation inputs
    (CLI or meta.json folder) -> optional constraints/seed/CFG -> generate ->
    save as NPZ, plus AMASS NPZ / CSV / BVH depending on the resolved model.
    """
    # Device selection is driven by the KIMODO_DEVICE env var (may be unset).
    requested_device = os.environ.get("KIMODO_DEVICE")
    report = runtime_health_report(requested_device)
    device = report.selected_device
    print(
        "Runtime health: "
        f"requested={report.requested_device} "
        f"selected={report.selected_device} "
        f"backend={report.backend} "
        f"reason={report.reason}"
    )
    args = parse_args()
    # Load model (resolution of name done inside load_model)
    model, resolved_model = load_model(
        args.model,
        device=device,
        default_family="Kimodo",
        return_resolved_name=True,
    )
    info = get_model_info(resolved_model)
    # Fall back to the resolved name when no registry entry exists.
    display = info.display_name if info else resolved_model
    print(f"Loaded model: {display} ({resolved_model})")
    # Get generation inputs
    generation_inputs = get_generation_inputs(args, model.fps)
    texts = generation_inputs["texts"]
    num_frames = generation_inputs["num_frames"]
    print("Will generate motions with the following prompts")
    for text, num_frame in zip(texts, num_frames):
        print(f" '{text}' with {num_frame} frames")
    # Load constraints
    constraints_path = generation_inputs["constraints_path"]
    if constraints_path:
        constraint_lst = load_constraints_lst(constraints_path, model.skeleton)
    else:
        constraint_lst = []
    if constraint_lst:
        print(f"Using {len(constraint_lst)} set of constraints")
        for constraint in constraint_lst:
            print(f" {constraint}")
    # Seed may come from meta.json or --seed; None means nondeterministic.
    if generation_inputs["seed"] is not None:
        seed_everything(generation_inputs["seed"])
    # CLI cfg flags take precedence over meta.json; {} lets the model default.
    cfg_kwargs = resolve_cfg_kwargs(args, generation_inputs.get("meta"))
    if cfg_kwargs:
        ct = cfg_kwargs.get("cfg_type")
        cw = cfg_kwargs.get("cfg_weight")
        if cw is not None:
            print(f"Using CFG: cfg_type={ct!r}, cfg_weight={cw!r}")
        else:
            print(f"Using CFG: cfg_type={ct!r}")
    # G1: postprocessing is disabled (does not work well for this model).
    use_postprocess = False if "g1" in resolved_model else (not args.no_postprocess)
    output = model(
        texts,
        num_frames,
        constraint_lst=constraint_lst,
        num_denoising_steps=generation_inputs["diffusion_steps"],
        num_samples=generation_inputs["num_samples"],
        multi_prompt=True,
        num_transition_frames=args.num_transition_frames,
        post_processing=use_postprocess,
        return_numpy=True,
        **cfg_kwargs,
    )
    # First axis of posed_joints is the sample dimension.
    n_samples = int(output["posed_joints"].shape[0])
    # Parse the output stem once; all formats (NPZ, AMASS NPZ, CSV, BVH) use this base name.
    output_base = args.output
    if n_samples == 1:
        npz_path = _single_file_path(output_base, ".npz")
        print(f"Saving the npz output to {npz_path}")
        # Strip the sample axis from every per-sample array; keep others as-is.
        single = {
            k: (v[0] if hasattr(v, "shape") and len(v.shape) > 0 and v.shape[0] == n_samples else v)
            for k, v in output.items()
        }
        save_kimodo_npz(npz_path, single)
    else:
        # Multiple samples: write <base>_00.npz, <base>_01.npz, ... in a folder.
        out_dir, _, base_name = _output_dir_and_path(output_base, "motion", ".npz")
        print(f"Saving the npz output to {out_dir}/ ({base_name}_00.npz ...)")
        for i in range(n_samples):
            single = {
                k: (v[i] if hasattr(v, "shape") and len(v.shape) > 0 and v.shape[0] == n_samples else v)
                for k, v in output.items()
            }
            save_kimodo_npz(os.path.join(out_dir, f"{base_name}_{i:02d}.npz"), single)
    # SMPL-X model: additionally export AMASS-format NPZ (import kept local
    # so other models avoid the dependency).
    if resolved_model == "kimodo-smplx-rp":
        from kimodo.exports.smplx import AMASSConverter
        converter = AMASSConverter(skeleton=model.skeleton, fps=model.fps)
        if n_samples == 1:
            # Use distinct name so AMASS NPZ does not overwrite the main NPZ
            amass_single_path = _single_file_path(output_base + "_amass", ".npz")
            print(f"Saving the amass output to {amass_single_path}")
            converter.convert_save_npz(output, amass_single_path)
        else:
            out_dir, _, base_name = _output_dir_and_path(output_base, "amass", ".npz")
            print(f"Saving the amass output to {out_dir}/ (amass_00.npz ...)")
            converter.convert_save_npz(output, os.path.join(out_dir, "amass.npz"))
    # G1 robot model: additionally export MuJoCo qpos as CSV.
    if resolved_model == "kimodo-g1-rp":
        from kimodo.exports.mujoco import MujocoQposConverter
        converter = MujocoQposConverter(model.skeleton)
        qpos = converter.dict_to_qpos(output, device)
        if n_samples == 1:
            csv_path = _single_file_path(output_base, ".csv")
            print(f"Saving the csv output to {csv_path}")
            converter.save_csv(qpos, csv_path)
        else:
            out_dir, _, base_name = _output_dir_and_path(output_base, "qpos", ".csv")
            print(f"Saving the csv output to {out_dir}/ ({base_name}_00.csv ...)")
            converter.save_csv(qpos, os.path.join(out_dir, base_name + ".csv"))
    # Optional BVH export; only valid for SOMA-family skeletons.
    if args.bvh:
        skeleton = model.skeleton
        if "somaskel" not in skeleton.name:
            print("BVH export is only supported for SOMA skeletons. Skipping --bvh.")
        else:
            from kimodo.exports.bvh import save_motion_bvh
            from kimodo.skeleton import SOMASkeleton30, global_rots_to_local_rots
            if isinstance(skeleton, SOMASkeleton30):
                # Motion has already been converted to somaskel77 within the model for output
                skeleton = skeleton.somaskel77.to(device)
            if n_samples == 1:
                bvh_path = _single_file_path(output_base, ".bvh")
                print(f"Saving the BVH output to {bvh_path}")
                # Global joint rotations are converted to the local (per-bone)
                # rotations that BVH stores; root translation comes from joints.
                joints_pos = torch.from_numpy(output["posed_joints"][0]).to(device)
                joints_rot = torch.from_numpy(output["global_rot_mats"][0]).to(device)
                local_rot_mats = global_rots_to_local_rots(joints_rot, skeleton)
                root_positions = joints_pos[:, skeleton.root_idx, :]
                save_motion_bvh(bvh_path, local_rot_mats, root_positions, skeleton=skeleton, fps=model.fps)
            else:
                out_dir, _, base_name = _output_dir_and_path(output_base, "motion", ".bvh")
                print(f"Saving the BVH output to {out_dir}/ ({base_name}_00.bvh ...)")
                for i in range(n_samples):
                    joints_pos = torch.from_numpy(output["posed_joints"][i]).to(device)
                    joints_rot = torch.from_numpy(output["global_rot_mats"][i]).to(device)
                    local_rot_mats = global_rots_to_local_rots(joints_rot, skeleton)
                    root_positions = joints_pos[:, skeleton.root_idx, :]
                    save_motion_bvh(
                        os.path.join(out_dir, f"{base_name}_{i:02d}.bvh"),
                        local_rot_mats,
                        root_positions,
                        skeleton=skeleton,
                        fps=model.fps,
                    )
| if __name__ == "__main__": | |
| main() | |