""" Example reward-model wrapper for LIBERO environments. """ from __future__ import annotations from collections import deque from dataclasses import dataclass from typing import Any, Deque, Dict, List, Optional, Sequence, Tuple, Union import os import argparse import sys sys.path.append(os.path.join(os.path.dirname(__file__), "..", "LIBERO")) import numpy as np import gymnasium as gym import gymnasium.vector as gym_vector from robometer.evals.eval_utils import raw_dict_to_sample, extract_rewards_from_output, extract_success_probs_from_output from robometer.evals.eval_server import process_batch_helper from robometer.utils.setup_utils import setup_batch_collator from robometer.utils.tensor_utils import t2n from robometer.utils.save import load_model_from_hf class GymToGymnasiumWrapper(gym.Env): """ A wrapper to convert a classic Gym environment to a Gymnasium-like interface. It adapts `reset()` and `step()` signatures, handles info dict changes, and supports compatibility. """ def __init__(self, env, time_limit: int = None): super().__init__() # make sure Env is initialized self.env = env # Action space remains the same if hasattr(self.env, "action_space"): self.action_space = self.env.action_space if hasattr(self.env, "observation_space"): self.observation_space = self.env.observation_space self.reward_range = getattr(env, "reward_range", None) self.metadata = getattr(env, "metadata", {}) self.time_limit = time_limit self.current_step = 0 def reset(self, *, seed=None, options=None): # Reset step counter self.current_step = 0 # Gym reset sometimes does not support 'seed' or 'options' if seed is not None: try: obs = self.env.reset(seed=seed) except TypeError: self.env.seed(seed) obs = self.env.reset() else: obs = self.env.reset() info = {} if isinstance(obs, tuple) and len(obs) == 2: obs, info = obs return obs, info def step(self, action): result = self.env.step(action) self.current_step += 1 if len(result) == 4: obs, reward, done, info = result terminated = done # Gymnasium expects terminated, truncated if self.time_limit is not None and self.current_step >= self.time_limit: truncated = True else: truncated = info.get("TimeLimit.truncated", False) return obs, reward, terminated, truncated, info elif len(result) == 5: # Already modern API return result else: raise ValueError("Unexpected number of outputs from env.step") def render(self, *args, **kwargs): return self.env.render(*args, **kwargs) def close(self): return self.env.close() def __getattr__(self, name): # Forward other attributes/methods to original env return getattr(self.env, name) @dataclass class RewardModelStepOutput: # Kept for potential downstream debugging/typing; not required by wrappers. reward: float success_prob: float per_key_rewards: Dict[str, float] per_key_success_probs: Dict[str, float] class _RewardModelInferenceMixin: """ Shared reward-model inference logic with local model. """ def __init__( self, model_path: str, device: str, max_frames: Optional[int] = None, ): if model_path is not None: reward_model_config, tokenizer, processor, reward_model = load_model_from_hf( model_path=model_path, device=device, ) reward_model.eval() self.reward_model = reward_model self.reward_model_config = reward_model_config if self.reward_model is None: raise ValueError("reward_model must be provided") # Determine max_frames if max_frames is not None: self.max_frames = int(max_frames) elif self.reward_model_config is not None: self.max_frames = int(getattr(getattr(self.reward_model_config, "data", None), "max_frames", 16)) else: self.max_frames = 16 # Local model path: set up collator once self.processor = None self.tokenizer = None self.batch_collator = None self._model_device = None self._model_type = None if self.reward_model is not None: self.processor = getattr(self.reward_model, "processor", None) self.tokenizer = getattr(self.reward_model, "tokenizer", None) if self.processor is None or self.tokenizer is None: raise ValueError( "processor and tokenizer must be available on reward_model " "(reward_model.processor / reward_model.tokenizer)" ) # Ensure multi-image behavior is enabled (matches SPUR buffer) if self.reward_model_config is not None: data_cfg = getattr(self.reward_model_config, "data", None) if data_cfg is not None and hasattr(data_cfg, "use_multi_image") and not data_cfg.use_multi_image: data_cfg.use_multi_image = True # Resolve model type/device once self._model_type = getattr(getattr(self.reward_model_config, "model", None), "model_type", None) if self._model_type is None: raise ValueError("reward_model_config.model.model_type is required for local reward inference") self._model_device = getattr(self.reward_model, "device", None) if self._model_device is None: try: import torch self._model_device = next(self.reward_model.parameters()).device if isinstance(self._model_device, torch.device): self._model_device = str(self._model_device) except Exception: self._model_device = None self.batch_collator = setup_batch_collator( self.processor, self.tokenizer, self.reward_model_config, is_eval=True ) def _compute_rewards_batch( self, batch_raw: List[Dict[str, Any]] ) -> Tuple[List[float], List[float]]: """ Returns lists: (progress_rewards, success_probs). """ if len(batch_raw) == 0: return [], [] samples = [ raw_dict_to_sample(raw_data=raw, max_frames=self.max_frames, sample_type="progress") for raw in batch_raw ] is_discrete_mode = ( self.reward_model_config is not None and getattr(getattr(self.reward_model_config, "loss", None), "progress_loss_type", None) == "discrete" ) num_bins = ( getattr(getattr(self.reward_model_config, "loss", None), "progress_discrete_bins", None) if self.reward_model_config is not None else None ) outputs = process_batch_helper( model_type=self._model_type, model=self.reward_model, tokenizer=self.tokenizer, batch_collator=self.batch_collator, device=self._model_device, batch_data=[s.model_dump() for s in samples], job_id=0, is_discrete_mode=bool(is_discrete_mode), num_bins=num_bins, ) rewards = extract_rewards_from_output(outputs) success_probs = extract_success_probs_from_output(outputs) return rewards.tolist(), success_probs.tolist() class LiberoRobometerRewardWrapper(gym.Wrapper, _RewardModelInferenceMixin): """ Non-vector LIBERO wrapper that replaces rewards with reward-model predictions. """ def __init__( self, env, model_path: str, device: str, reward_relabeling_keys: Sequence[str], *, use_relative_rewards: bool = False, add_estimated_reward: bool = False, use_success_detection: bool = False, success_detection_duration: int = 2, success_detection_threshold: float = 0.65, max_frames: Optional[int] = None, ): self.env = GymToGymnasiumWrapper(env, time_limit=400) gym.Wrapper.__init__(self, self.env) _RewardModelInferenceMixin.__init__( self, model_path=model_path, device=device, max_frames=max_frames, ) self.reward_relabeling_keys = list(reward_relabeling_keys) if len(self.reward_relabeling_keys) == 0: raise ValueError("reward_relabeling_keys must be non-empty") # Action space remains the same if not hasattr(self.env, "action_space"): self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(7,), dtype=np.float32) else: self.action_space = self.env.action_space self.use_relative_rewards = bool(use_relative_rewards) self.add_estimated_reward = bool(add_estimated_reward) self.use_success_detection = bool(use_success_detection) self.success_detection_duration = int(success_detection_duration) self.success_detection_threshold = float(success_detection_threshold) self._frames: Dict[str, Deque[np.ndarray]] = {} self.language_instruction = self.env.language_instruction self.episode_id = 0 self._step_in_episode: int = 0 self._prev_reward: float = 0.0 self._success_window: Deque[float] = deque(maxlen=self.success_detection_duration) def _get_language_instruction(self, obs: Dict[str, Any], info: Dict[str, Any]) -> Optional[str]: if isinstance(info, dict) and "language_instruction" in info: return info.get("language_instruction") if isinstance(obs, dict) and isinstance(obs.get("prompt"), str): return obs.get("prompt") return self.language_instruction def reset(self, **kwargs): obs, info = self.env.reset(**kwargs) self.language_instruction = self.env.language_instruction self.episode_id += 1 self._frames = {k: [] for k in self.reward_relabeling_keys} self._step_in_episode = 0 self._prev_reward = 0.0 self._success_window = deque(maxlen=self.success_detection_duration) if isinstance(obs, dict): for k in self.reward_relabeling_keys: if k in obs: self._frames[k].append(t2n(obs[k])) return obs, info def step(self, action): obs, env_reward, terminated, truncated, info = self.env.step(action) if not isinstance(info, dict): info = {} if info is None else dict(info) # In LIBERO, done is only True when task succeeds, so success = done # But don't overwrite if already present in info if "success" not in info: info["success"] = terminated if terminated: assert env_reward == 1.0, "Reward should be 1.0 when task succeeds" env_reward -= 1 # reward is -1, 0 if isinstance(obs, dict): for k in self.reward_relabeling_keys: if k in obs: self._frames[k].append(t2n(obs[k])) # Prepare per-key inputs for this timestep per_key_rewards: Dict[str, float] = {} per_key_success: Dict[str, float] = {} for key_idx, key in enumerate(self.reward_relabeling_keys): frames = np.stack(list(self._frames[key]), axis=0) if len(self._frames[key]) > 0 else np.array([]) raw = dict( frames=frames, task=self.language_instruction, id=self.episode_id, metadata=dict( subsequence_length=len(self._frames[key]) if self._frames[key] is not None else 0, ), video_embeddings=None, text_embedding=None, ) rewards, success_probs = self._compute_rewards_batch([raw]) per_key_rewards[key] = rewards[0] per_key_success[key] = success_probs[0] pred_reward = np.mean(list(per_key_rewards.values())) if per_key_rewards else 0.0 success_prob = np.mean(list(per_key_success.values())) if per_key_success else 0.0 # Relative reward option if self.use_relative_rewards: current = pred_reward pred_reward = pred_reward - self._prev_reward self._prev_reward = current if terminated or truncated: self._prev_reward = 0.0 # Success detection option if self.use_success_detection: self._success_window.append(success_prob) if len(self._success_window) == self.success_detection_duration: votes = sum(1 for p in self._success_window if p >= self.success_detection_threshold) if votes > (self.success_detection_duration / 2): terminated = True info["success"] = True info["success_from_reward_model"] = True # Decide what reward to return if self.add_estimated_reward: out_reward = env_reward + pred_reward else: out_reward = pred_reward info["env_reward"] = env_reward info["predicted_reward"] = pred_reward info["success_prob"] = success_prob info["predicted_rewards_by_key"] = per_key_rewards info["success_probs_by_key"] = per_key_success info["step_in_episode"] = int(self._step_in_episode) self._step_in_episode += 1 # If the env auto-resets under the hood, start a fresh history when done/truncated. if terminated or truncated: self._frames = {k: [] for k in self.reward_relabeling_keys} self.language_instruction = self.env.language_instruction self._step_in_episode = 0 self._success_window = deque(maxlen=self.success_detection_duration) return obs, out_reward, terminated, truncated, info class VectorLiberoRobometerRewardWrapper(gym_vector.VectorWrapper, _RewardModelInferenceMixin): """ Vectorized LIBERO wrapper that replaces rewards with reward-model predictions per env. """ def __init__( self, env: gym_vector.VectorEnv, model_path: str, device: str, reward_relabeling_keys: Sequence[str], *, use_relative_rewards: bool = False, add_estimated_reward: bool = False, replace_reward: bool = True, use_success_detection: bool = False, success_detection_duration: int = 2, success_detection_threshold: float = 0.65, max_frames: Optional[int] = None, ): gym_vector.VectorWrapper.__init__(self, env) _RewardModelInferenceMixin.__init__( self, model_path=model_path, device=device, max_frames=max_frames, ) self.reward_relabeling_keys = list(reward_relabeling_keys) if len(self.reward_relabeling_keys) == 0: raise ValueError("reward_relabeling_keys must be non-empty") self.use_relative_rewards = bool(use_relative_rewards) self.add_estimated_reward = bool(add_estimated_reward) self.replace_reward = bool(replace_reward) self.use_success_detection = bool(use_success_detection) self.success_detection_duration = int(success_detection_duration) self.success_detection_threshold = float(success_detection_threshold) self._n = int(getattr(self.env, "num_envs", 1)) self._frames: List[Dict[str, Deque[np.ndarray]]] = [] self._language_instructions: List[Optional[str]] = [] self._episode_ids: List[int] = [] self._step_in_episode: List[int] = [] self._prev_rewards: List[float] = [] self._success_windows: List[Deque[float]] = [] self._init_state() def _init_state(self): self._n = int(getattr(self.env, "num_envs", self._n)) self._frames = [ {k: deque(maxlen=self.max_frames) for k in self.reward_relabeling_keys} for _ in range(self._n) ] self._language_instructions = [None for _ in range(self._n)] self._episode_ids = [0 for _ in range(self._n)] self._step_in_episode = [0 for _ in range(self._n)] self._prev_rewards = [0.0 for _ in range(self._n)] self._success_windows = [deque(maxlen=self.success_detection_duration) for _ in range(self._n)] def _get_language_instruction_vec(self, obs: Dict[str, Any], info: Any) -> List[Optional[str]]: getter = getattr(self.env, "get_language_instruction", None) if callable(getter): try: instr = getter() if isinstance(instr, str): return [instr] * self._n except Exception: pass # Try prompt in obs if isinstance(obs, dict) and "prompt" in obs: p = obs["prompt"] if isinstance(p, list) and len(p) == self._n: return [str(x) for x in p] if isinstance(p, np.ndarray) and p.shape[0] == self._n: return [str(x) for x in p.tolist()] # Fallback: single instruction attribute (shared across envs) shared = getattr(self.env, "language_instruction", None) return [shared] * self._n def reset(self, **kwargs): obs, info = self.env.reset(**kwargs) self._init_state() if isinstance(obs, dict): instrs = self._get_language_instruction_vec(obs, info) for i in range(self._n): self._language_instructions[i] = instrs[i] self._episode_ids[i] += 1 for k in self.reward_relabeling_keys: if k in obs: arr = t2n(obs[k]) if arr is not None and arr.shape[0] == self._n: for i in range(self._n): self._frames[i][k].append(arr[i]) return obs, info def step(self, actions): obs, env_rewards, terminateds, truncateds, info = self.env.step(actions) # Normalize arrays env_rewards_np = t2n(env_rewards) terminateds_np = t2n(terminateds).astype(bool) truncateds_np = t2n(truncateds).astype(bool) if env_rewards_np is None: env_rewards_np = np.zeros((self._n,), dtype=np.float64) # In LIBERO, done is only True when task succeeds, so success = terminated. # Mirror the non-vector wrapper's reward shift (0/1 -> -1/0). for i in range(self._n): if bool(terminateds_np[i]): assert float(env_rewards_np[i]) == 1.0, "Reward should be 1.0 when task succeeds" env_rewards_shifted = env_rewards_np.astype(np.float64) - 1.0 # Gymnasium VectorEnv may auto-reset in the same step; if so, terminal obs is in info["final_observation"] final_obs = None if isinstance(info, dict) and "final_observation" in info: final_obs = info.get("final_observation") reset_instrs = self._get_language_instruction_vec(obs, info) if isinstance(obs, dict) else [None] * self._n task_for_model: List[Optional[str]] = [ (self._language_instructions[i] if self._language_instructions[i] is not None else reset_instrs[i]) for i in range(self._n) ] # Update frame histories using the correct observation for this transition. # If SAME_STEP autoreset is enabled, use terminal obs from final_observation when available. if isinstance(obs, dict): for k in self.reward_relabeling_keys: if k not in obs: continue arr_reset = t2n(obs[k]) if arr_reset is None or arr_reset.shape[0] != self._n: continue for i in range(self._n): frame_i = arr_reset[i] if final_obs is not None and i < len(final_obs) and final_obs[i] is not None: fo_i = final_obs[i] if isinstance(fo_i, dict) and k in fo_i: frame_i = t2n(fo_i[k]) self._frames[i][k].append(frame_i) # Batch reward computation per key across envs per_env_per_key_reward: Dict[str, List[float]] = {k: [0.0] * self._n for k in self.reward_relabeling_keys} per_env_per_key_success: Dict[str, List[float]] = {k: [0.0] * self._n for k in self.reward_relabeling_keys} for key_idx, key in enumerate(self.reward_relabeling_keys): batch_raw: List[Dict[str, Any]] = [] for i in range(self._n): frames = np.stack(list(self._frames[i][key]), axis=0) if len(self._frames[i][key]) > 0 else np.array([]) batch_raw.append( dict( frames=frames, task=task_for_model[i], id=int(self._episode_ids[i]), metadata=dict(subsequence_length=len(self._frames[i][key])), video_embeddings=None, text_embedding=None, ) ) rewards_k, success_k = self._compute_rewards_batch(batch_raw) for i in range(self._n): per_env_per_key_reward[key][i] = rewards_k[i] if i < len(rewards_k) else 0.0 per_env_per_key_success[key][i] = success_k[i] if i < len(success_k) else 0.0 # Aggregate across keys pred_rewards_abs = np.zeros((self._n,), dtype=np.float64) success_probs = np.zeros((self._n,), dtype=np.float64) for i in range(self._n): r_vals = [per_env_per_key_reward[k][i] for k in self.reward_relabeling_keys] s_vals = [per_env_per_key_success[k][i] for k in self.reward_relabeling_keys] pred_rewards_abs[i] = np.mean(r_vals) if len(r_vals) else 0.0 success_probs[i] = np.mean(s_vals) if len(s_vals) else 0.0 pred_rewards_out = pred_rewards_abs.copy() if self.use_relative_rewards: for i in range(self._n): cur = float(pred_rewards_abs[i]) pred_rewards_out[i] = cur - self._prev_rewards[i] self._prev_rewards[i] = cur if terminateds_np[i] or truncateds_np[i]: self._prev_rewards[i] = 0.0 # Success detection if self.use_success_detection: for i in range(self._n): self._success_windows[i].append(float(success_probs[i])) if len(self._success_windows[i]) == self.success_detection_duration: votes = sum(1 for p in self._success_windows[i] if p >= self.success_detection_threshold) if votes > (self.success_detection_duration / 2): terminateds_np[i] = True # Determine reward output if self.add_estimated_reward: out_rewards = env_rewards_shifted + pred_rewards_out else: out_rewards = env_rewards_shifted if not self.replace_reward else pred_rewards_out # Inject info # Gymnasium vector env `info` is typically a dict of arrays; keep it dict-like. if info is None: info = {} if isinstance(info, dict): info = dict(info) if "success" not in info: info["success"] = terminateds_np.copy() info["env_reward"] = env_rewards_shifted.astype(np.float64) info["predicted_reward"] = pred_rewards_out.astype(np.float64) info["predicted_reward_abs"] = pred_rewards_abs.astype(np.float64) info["success_prob"] = success_probs.astype(np.float64) info["step_in_episode"] = np.asarray(self._step_in_episode, dtype=np.int32) # Also provide per-key arrays if isinstance(info, dict): for k in self.reward_relabeling_keys: info[f"predicted_reward/{k}"] = np.asarray(per_env_per_key_reward[k], dtype=np.float64) info[f"success_prob/{k}"] = np.asarray(per_env_per_key_success[k], dtype=np.float64) # Advance step counters and clear per-env state on episode end (to support auto-reset vector envs) for i in range(self._n): self._step_in_episode[i] += 1 if terminateds_np[i] or truncateds_np[i]: self._frames[i] = {k: deque(maxlen=self.max_frames) for k in self.reward_relabeling_keys} self._language_instructions[i] = reset_instrs[i] self._step_in_episode[i] = 0 self._success_windows[i] = deque(maxlen=self.success_detection_duration) self._episode_ids[i] += 1 # If SAME_STEP autoreset happened, seed next episode history with reset obs immediately. if isinstance(obs, dict) and final_obs is not None and i < len(final_obs) and final_obs[i] is not None: for k in self.reward_relabeling_keys: if k not in obs: continue arr_reset = t2n(obs[k]) if arr_reset is not None and arr_reset.shape[0] == self._n: self._frames[i][k].append(arr_reset[i]) return obs, out_rewards.astype(np.float64), terminateds_np, truncateds_np, info def main(): try: from libero.libero.envs import OffScreenRenderEnv, DummyVectorEnv from libero.libero import benchmark, get_libero_path except ImportError: print("LIBERO not found. Please install LIBERO.") sys.exit(1) parser = argparse.ArgumentParser( description="Run RBM inference locally: load model from HuggingFace and compute per-frame progress and success.", epilog="Outputs: .npy (rewards), _success_probs.npy, _progress_success.png", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--model-path", default="aliangdw/Robometer-4B-LIBERO", help="HuggingFace model id or local checkpoint path") parser.add_argument("--task-suite-name", default="libero_90", help="LIBERO task suite name") parser.add_argument("--task-id", default=28, type=int, help="LIBERO task id") parser.add_argument("--vectorized", action="store_true", help="Run in vectorized mode") parser.add_argument("--num-envs", default=2, type=int, help="Number of environments to run in parallel") args = parser.parse_args() if not args.vectorized: print("Testing Single LIBERO Robometer Reward Wrapper") seed = np.random.randint(0, 1000000) # Get task info benchmark_dict = benchmark.get_benchmark_dict() task_suite = benchmark_dict[args.task_suite_name]() task = task_suite.get_task(args.task_id) task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) env_args = {"bddl_file_name": task_bddl_file, "camera_heights": 256, "camera_widths": 256} base_env = OffScreenRenderEnv(**env_args) base_env.seed(seed) robometer_libero_env = LiberoRobometerRewardWrapper(base_env, model_path=args.model_path, device="cuda", reward_relabeling_keys=["agentview_image"], add_estimated_reward=True, ) obs, info = robometer_libero_env.reset() for i in range(10): action = np.random.uniform(-1, 1, 7) obs, reward, terminated, truncated, info = robometer_libero_env.step(action) print(f"Reward at step {i}: {reward}") robometer_libero_env.close() else: print("Testing Vectorized LIBERO Robometer Reward Wrapper") def make_env(): seed = np.random.randint(0, 1000000) # Get task info benchmark_dict = benchmark.get_benchmark_dict() task_suite = benchmark_dict[args.task_suite_name]() task = task_suite.get_task(args.task_id) task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) env_args = {"bddl_file_name": task_bddl_file, "camera_heights": 256, "camera_widths": 256} base_env = OffScreenRenderEnv(**env_args) base_env.seed(seed) sample_obs = base_env.reset() env = GymToGymnasiumWrapper(base_env, time_limit=400) # Action space remains the same if not hasattr(env, "action_space"): env.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(7,), dtype=np.float32) if not hasattr(env, "observation_space"): # Create observation space from sample_obs, which is a dict of arrays obs_space_dict = {} for k, v in sample_obs.items(): # Common LIBERO obs dict may include images (uint8) and sometimes text prompts. if isinstance(v, (str, bytes, bytearray)) or (isinstance(v, np.ndarray) and v.dtype.kind in {"U", "S"}): obs_space_dict[k] = gym.spaces.Text(max_length=2048) continue v_arr = np.asarray(v) dt = v_arr.dtype if np.issubdtype(dt, np.uint8): # Images: bounded [0, 255] obs_space_dict[k] = gym.spaces.Box( low=np.zeros(v_arr.shape, dtype=np.uint8), high=np.full(v_arr.shape, 255, dtype=np.uint8), shape=v_arr.shape, dtype=np.uint8, ) elif np.issubdtype(dt, np.integer): ii = np.iinfo(dt) obs_space_dict[k] = gym.spaces.Box( low=np.full(v_arr.shape, ii.min, dtype=dt), high=np.full(v_arr.shape, ii.max, dtype=dt), shape=v_arr.shape, dtype=dt, ) else: # Floats/other numeric: unbounded obs_space_dict[k] = gym.spaces.Box( low=np.full(v_arr.shape, -np.inf, dtype=np.float32), high=np.full(v_arr.shape, np.inf, dtype=np.float32), shape=v_arr.shape, dtype=np.float32, ) env.observation_space = gym.spaces.Dict(obs_space_dict) return env env_fns = [make_env for _ in range(args.num_envs)] env = gym.vector.SyncVectorEnv(env_fns) robometer_libero_env = VectorLiberoRobometerRewardWrapper(env, model_path=args.model_path, device="cuda", reward_relabeling_keys=["agentview_image"], add_estimated_reward=True, ) obs, info = robometer_libero_env.reset() for i in range(10): actions = np.random.uniform(-1, 1, (args.num_envs, 7)) obs, rewards, terminateds, truncateds, infos = robometer_libero_env.step(actions) print(f"Rewards at step {i}: {rewards}") robometer_libero_env.close() if __name__ == "__main__": main()