| """Utils for evaluating robot policies in various environments."""
|
|
|
| import os
|
| import random
|
| import time
|
| from typing import Any, Dict, List, Optional, Union
|
|
|
| import numpy as np
|
| import torch
|
|
|
| from experiments.robot.openvla_utils import (
|
| get_vla,
|
| get_vla_action,
|
| )

# Important constants
ACTION_DIM = 7
DATE = time.strftime("%Y_%m_%d")
DATE_TIME = time.strftime("%Y_%m_%d-%H_%M_%S")
DEVICE = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Print NumPy floats with three decimal places
np.set_printoptions(formatter={"float": lambda x: "{0:0.3f}".format(x)})

# System prompt used by OpenVLA v0.1 models
OPENVLA_V01_SYSTEM_PROMPT = (
    "A chat between a curious user and an artificial intelligence assistant. "
    "The assistant gives helpful, detailed, and polite answers to the user's questions."
)

# Input image size (square side length, in pixels) expected by each model family
MODEL_IMAGE_SIZES = {
    "openvla": 224,
}


def set_seed_everywhere(seed: int) -> None:
    """
    Set random seed for all random number generators for reproducibility.

    Args:
        seed: The random seed to use
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ["PYTHONHASHSEED"] = str(seed)
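
# Illustrative usage: call once at the start of an eval run. Note that setting
# PYTHONHASHSEED at runtime generally only affects child processes; the current
# interpreter's hash seed is fixed at startup.
#
#   set_seed_everywhere(42)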


def get_model(cfg: Any, wrap_diffusion_policy_for_droid: bool = False) -> torch.nn.Module:
    """
    Load and initialize model for evaluation based on configuration.

    Args:
        cfg: Configuration object with model parameters
        wrap_diffusion_policy_for_droid: Whether to wrap diffusion policy for DROID

    Returns:
        torch.nn.Module: The loaded model

    Raises:
        ValueError: If model family is not supported
    """
    if cfg.model_family == "openvla":
        model = get_vla(cfg)
    else:
        raise ValueError(f"Unsupported model family: {cfg.model_family}")

    print(f"Loaded model: {type(model)}")
    return model
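
# Illustrative usage, with `cfg` standing in for whatever config object the
# calling experiment script constructs (only `cfg.model_family` is read here):
#
#   model = get_model(cfg)  # cfg.model_family == "openvla"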


def get_image_resize_size(cfg: Any) -> Union[int, tuple]:
    """
    Get image resize dimensions for a specific model.

    If the returned value is an int, the resized image will be a square.
    If the returned value is a tuple, the resized image will be a rectangle.

    Args:
        cfg: Configuration object with model parameters

    Returns:
        Union[int, tuple]: Image resize dimensions

    Raises:
        ValueError: If model family is not supported
    """
    if cfg.model_family not in MODEL_IMAGE_SIZES:
        raise ValueError(f"Unsupported model family: {cfg.model_family}")

    return MODEL_IMAGE_SIZES[cfg.model_family]
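
# Illustrative usage: per MODEL_IMAGE_SIZES above, OpenVLA expects square
# 224x224 input images.
#
#   resize_size = get_image_resize_size(cfg)  # -> 224 when cfg.model_family == "openvla"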


def get_action(
    cfg: Any,
    model: torch.nn.Module,
    obs: Dict[str, Any],
    task_label: str,
    processor: Optional[Any] = None,
    action_head: Optional[torch.nn.Module] = None,
    proprio_projector: Optional[torch.nn.Module] = None,
    noisy_action_projector: Optional[torch.nn.Module] = None,
    use_film: bool = False,
) -> Union[List[np.ndarray], np.ndarray]:
    """
    Query the model to get action predictions.

    Args:
        cfg: Configuration object with model parameters
        model: The loaded model
        obs: Observation dictionary
        task_label: Text description of the task
        processor: Model processor for inputs
        action_head: Optional action head for continuous actions
        proprio_projector: Optional proprioception projector
        noisy_action_projector: Optional noisy action projector for diffusion
        use_film: Whether to use FiLM

    Returns:
        Union[List[np.ndarray], np.ndarray]: Predicted actions

    Raises:
        ValueError: If model family is not supported
    """
    with torch.no_grad():
        if cfg.model_family == "openvla":
            action = get_vla_action(
                cfg=cfg,
                vla=model,
                processor=processor,
                obs=obs,
                task_label=task_label,
                action_head=action_head,
                proprio_projector=proprio_projector,
                noisy_action_projector=noisy_action_projector,
                use_film=use_film,
            )
        else:
            raise ValueError(f"Unsupported model family: {cfg.model_family}")

    return action
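
# Illustrative call (names are placeholders; the expected keys of `obs` and the
# optional modules depend on the checkpoint and environment wrapper):
#
#   actions = get_action(cfg, model, obs, task_label="pick up the can", processor=processor)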


def normalize_gripper_action(action: np.ndarray, binarize: bool = True) -> np.ndarray:
    """
    Normalize gripper action from [0,1] to [-1,+1] range.

    This is necessary for some environments because the dataset wrapper
    standardizes gripper actions to [0,1]. Note that unlike the other action
    dimensions, the gripper action is not normalized to [-1,+1] by default.

    Normalization formula: y = 2 * (x - orig_low) / (orig_high - orig_low) - 1

    Args:
        action: Action array with gripper action in the last dimension
        binarize: Whether to binarize gripper action to -1 or +1

    Returns:
        np.ndarray: Action array with normalized gripper action
    """
    # Copy to avoid mutating the caller's array
    normalized_action = action.copy()

    # Map [0,1] to [-1,+1]
    orig_low, orig_high = 0.0, 1.0
    normalized_action[..., -1] = 2 * (normalized_action[..., -1] - orig_low) / (orig_high - orig_low) - 1

    if binarize:
        # Snap the gripper command to exactly -1 or +1
        normalized_action[..., -1] = np.sign(normalized_action[..., -1])

    return normalized_action
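
# Worked example (hypothetical 7-DoF action): a gripper value of 0.8 maps to
# 2 * (0.8 - 0) / (1 - 0) - 1 = 0.6, which binarization snaps to +1.0:
#
#   action = np.array([0.1, -0.2, 0.05, 0.0, 0.0, 0.0, 0.8])
#   normalize_gripper_action(action, binarize=False)[-1]  # -> 0.6
#   normalize_gripper_action(action)[-1]                  # -> 1.0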


def invert_gripper_action(action: np.ndarray) -> np.ndarray:
    """
    Flip the sign of the gripper action (last dimension of the action vector).

    This is necessary for environments where -1 = open, +1 = close, since
    the RLDS dataloader aligns gripper actions such that 0 = close, 1 = open.

    Args:
        action: Action array with gripper action in the last dimension

    Returns:
        np.ndarray: Action array with inverted gripper action
    """
    # Copy to avoid mutating the caller's array
    inverted_action = action.copy()

    # Flip the sign of the gripper dimension
    inverted_action[..., -1] *= -1.0

    return inverted_action
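
# Worked example (hypothetical values): an open command (+1 after normalization)
# becomes -1 for environments that use -1 = open, +1 = close:
#
#   invert_gripper_action(np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]))[-1]  # -> -1.0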