| from __future__ import annotations |
|
|
| import atexit |
| import logging |
| import os |
| import signal |
| from contextlib import suppress |
| from typing import Any |
|
|
| import gymnasium as gym |
| import numpy as np |
| import torch |
|
|
| from .errors import IsaacLabArenaError |
|
|
|
|
def cleanup_isaaclab(env, simulation_app) -> None:
    """Close the IsaacLab env and the simulation app on a best-effort basis.

    SIGINT and SIGTERM are ignored while the two ``close()`` calls run and the
    previously installed handlers are put back afterwards.

    Args:
        env: Object exposing ``close()``, or ``None`` to skip it.
        simulation_app: Object whose ``.app`` attribute exposes ``close()``,
            or ``None`` to skip it.

    Errors raised by either ``close()`` call are logged at debug level and
    otherwise ignored, so a failure to close the env never prevents the
    attempt to close the simulation app.
    """
    saved_handlers = {}
    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            current = signal.getsignal(signum)
            if current is None:
                # The handler was not installed from Python, so it could not
                # be restored afterwards; leave this signal untouched.
                continue
            signal.signal(signum, signal.SIG_IGN)
        except ValueError:
            # signal.signal() is only allowed in the main thread. Skip the
            # masking instead of aborting the cleanup itself.
            continue
        saved_handlers[signum] = current

    try:
        if env is not None:
            try:
                env.close()
            except Exception:
                logging.debug("Ignoring error while closing IsaacLab env", exc_info=True)
        if simulation_app is not None:
            try:
                simulation_app.app.close()
            except Exception:
                logging.debug("Ignoring error while closing simulation app", exc_info=True)
    finally:
        # Only signals that were actually masked above are restored.
        for signum, handler in saved_handlers.items():
            signal.signal(signum, handler)
|
|
|
|
class IsaacLabEnvWrapper(gym.vector.AsyncVectorEnv):
    """Adapt an IsaacLab batched GPU env to the ``AsyncVectorEnv`` interface.

    IsaacLab handles vectorization internally on the GPU. This class inherits
    from ``AsyncVectorEnv`` only for compatibility with LeRobot; the parent
    ``__init__`` is not called by this class.

    Creating an instance registers an ``atexit`` hook and, when allowed,
    SIGINT/SIGTERM handlers that close the wrapped env. The handlers that were
    active before are restored once the wrapper has been cleaned up.
    """

    metadata = {"render_modes": ["rgb_array"], "render_fps": 30}
    # Class-wide guard against re-entrant signal handling during a cleanup.
    _cleanup_in_progress = False

    def __init__(
        self,
        env,
        episode_length: int = 500,
        task: str | None = None,
        render_mode: str | None = "rgb_array",
        simulation_app=None,
    ) -> None:
        """Wrap ``env`` and install the shutdown hooks.

        Args:
            env: Batched env exposing ``num_envs``, ``observation_space``,
                ``action_space``, ``reset`` and ``step``.
            episode_length: Value reported as ``_max_episode_steps``.
            task: Task description reported through ``call("task")``.
            render_mode: Only ``"rgb_array"`` produces frames in ``render``.
            simulation_app: Optional object whose ``.app.close()`` is called
                on cleanup.
        """
        self._env = env
        self._num_envs = env.num_envs
        self._episode_length = episode_length
        self._closed = False
        self.render_mode = render_mode
        self._simulation_app = simulation_app

        # The batched spaces are exposed both as the vector and single spaces.
        self.observation_space = env.observation_space
        self.action_space = env.action_space
        self.single_observation_space = env.observation_space
        self.single_action_space = env.action_space
        self.task = task

        if hasattr(env, "metadata") and env.metadata:
            self.metadata = {**self.metadata, **env.metadata}

        # Handlers active before this wrapper took over, keyed by signal
        # number, so that they can be put back after cleanup.
        self._previous_handlers: dict[int, Any] = {}
        atexit.register(self._cleanup)
        for signum in (signal.SIGINT, signal.SIGTERM):
            # signal.signal() raises ValueError outside the main thread; the
            # wrapper is still usable there, just without signal handling.
            with suppress(ValueError):
                self._previous_handlers[signum] = signal.signal(signum, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Clean up on SIGINT/SIGTERM and terminate the process."""
        if IsaacLabEnvWrapper._cleanup_in_progress:
            return
        IsaacLabEnvWrapper._cleanup_in_progress = True
        logging.info(f"Received signal {signum}, cleaning up...")
        self._cleanup()
        # Hard exit: os._exit() skips remaining atexit hooks and buffer flushes.
        os._exit(0)

    def _restore_signal_handlers(self) -> None:
        """Put back the handlers that were active before this wrapper."""
        for signum, previous in getattr(self, "_previous_handlers", {}).items():
            if previous is None:
                # Not installed from Python; signal.signal() cannot restore it.
                continue
            with suppress(ValueError):
                # Only undo our own installation, never someone else's handler.
                if signal.getsignal(signum) == self._signal_handler:
                    signal.signal(signum, previous)
        self._previous_handlers = {}

    def _check_closed(self):
        """Raise ``IsaacLabArenaError`` if the wrapper was already closed."""
        if self._closed:
            raise IsaacLabArenaError()

    @staticmethod
    def _to_numpy(value, dtype) -> np.ndarray:
        """Convert a torch tensor or array-like to a numpy array of ``dtype``."""
        if isinstance(value, torch.Tensor):
            value = value.detach().cpu().numpy()
        return np.asarray(value).astype(dtype)

    @property
    def unwrapped(self):
        return self

    @property
    def num_envs(self) -> int:
        return self._num_envs

    @property
    def _max_episode_steps(self) -> int:
        return self._episode_length

    @property
    def device(self) -> str:
        """Device of the wrapped env, or ``"cpu"`` when it does not expose one."""
        return getattr(self._env, "device", "cpu")

    def reset(
        self,
        *,
        seed: int | list[int] | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Reset the wrapped env and return ``(obs, info)``.

        A sequence of seeds is reduced to its first element (``None`` when
        empty). ``info["final_info"]`` is filled with an all-False
        ``is_success`` array when the wrapped env did not provide one.

        Raises:
            IsaacLabArenaError: If the wrapper has been closed.
        """
        self._check_closed()
        if isinstance(seed, (list, tuple, range)):
            seed = seed[0] if seed else None

        obs, info = self._env.reset(seed=seed, options=options)

        if "final_info" not in info:
            info["final_info"] = {"is_success": np.zeros(self._num_envs, dtype=bool)}

        return obs, info

    def step(
        self, actions: np.ndarray | torch.Tensor
    ) -> tuple[dict, np.ndarray, np.ndarray, np.ndarray, dict]:
        """Step the wrapped env with a batch of actions.

        Numpy actions are moved to the env device as a torch tensor. Reward,
        terminated and truncated are returned as numpy arrays (float32, bool,
        bool) and ``info["final_info"]["is_success"]`` is set from
        ``_get_success``.

        Raises:
            IsaacLabArenaError: If the wrapper has been closed.
        """
        self._check_closed()
        if isinstance(actions, np.ndarray):
            # Use the property so an env without ``device`` falls back to CPU.
            actions = torch.from_numpy(actions).to(self.device)

        obs, reward, terminated, truncated, info = self._env.step(actions)

        reward = self._to_numpy(reward, np.float32)
        terminated = self._to_numpy(terminated, bool)
        truncated = self._to_numpy(truncated, bool)

        info["final_info"] = {"is_success": self._get_success(terminated, truncated)}

        return obs, reward, terminated, truncated, info

    def _get_success(self, terminated: np.ndarray, truncated: np.ndarray) -> np.ndarray:
        """Return a bool array that is True where an episode ended successfully.

        The "success" term is read from ``env.termination_manager.get_term``
        when available; otherwise no env is reported as successful. Success is
        only reported for envs that are terminated or truncated.
        """
        is_success = np.zeros(self._num_envs, dtype=bool)

        term_manager = getattr(self._env, "termination_manager", None)
        get_term = getattr(term_manager, "get_term", None)
        if callable(get_term):
            try:
                success_tensor = get_term("success")
            except (KeyError, ValueError):
                # NOTE(review): assumes an unknown term name surfaces as
                # KeyError/ValueError -- confirm against the IsaacLab version.
                success_tensor = None
            if success_tensor is not None:
                is_success = self._to_numpy(success_tensor, bool)

        return is_success & (terminated | truncated)

    def call(self, method_name: str, *args, **kwargs) -> list[Any]:
        """Return one value per env for ``method_name``.

        ``"_max_episode_steps"``, ``"task"`` and ``"render"`` are answered by
        the wrapper itself. Any other name is looked up on the wrapped env,
        called if callable, and its result replicated ``num_envs`` times
        unless it already is a list.

        Raises:
            AttributeError: If the wrapped env has no such attribute.
        """
        if method_name == "_max_episode_steps":
            return [self._episode_length] * self._num_envs
        if method_name == "task":
            return [self.task] * self._num_envs
        if method_name == "render":
            return self.render_all()

        if hasattr(self._env, method_name):
            attr = getattr(self._env, method_name)
            result = attr(*args, **kwargs) if callable(attr) else attr
            if isinstance(result, list):
                return result
            return [result] * self._num_envs

        raise AttributeError(f"IsaacLab-Arena has no method/attribute '{method_name}'")

    def render_all(self) -> list[np.ndarray]:
        """Return a list of ``num_envs`` frames.

        Every entry is the same frame returned by ``render``; when no frame is
        available a black 480x640 RGB placeholder is used instead.
        """
        self._check_closed()
        frames = self.render()
        if frames is None:
            frames = np.zeros((480, 640, 3), dtype=np.uint8)
        return [frames] * self._num_envs

    def render(self) -> np.ndarray | None:
        """Return a single frame from the wrapped env, or ``None``.

        ``None`` is returned when ``render_mode`` is not ``"rgb_array"`` or
        the wrapped env produces no frame. A 4-D render output is reduced to
        its first entry along the leading axis (presumably the first
        environment -- TODO confirm against the wrapped env).
        """
        self._check_closed()
        if self.render_mode != "rgb_array":
            return None

        frames = self._env.render() if hasattr(self._env, "render") else None
        if frames is None:
            return None

        if isinstance(frames, torch.Tensor):
            frames = frames.detach().cpu().numpy()
        else:
            # Accept any array-like so ``ndim`` is always available below.
            frames = np.asarray(frames)

        return frames[0] if frames.ndim == 4 else frames

    def _cleanup(self) -> None:
        """Close the wrapped env and simulation app exactly once."""
        # ``_closed`` is missing when __init__ failed early; treat as closed.
        if getattr(self, "_closed", True):
            return
        self._closed = True
        IsaacLabEnvWrapper._cleanup_in_progress = True
        logging.info("Cleaning up IsaacLab Arena environment...")
        try:
            cleanup_isaaclab(self._env, self._simulation_app)
        finally:
            # Without this, later SIGINT/SIGTERM would be swallowed forever by
            # the still-installed handler seeing the stale guard flag.
            self._restore_signal_handlers()
            IsaacLabEnvWrapper._cleanup_in_progress = False

    def close(self) -> None:
        self._cleanup()

    @property
    def envs(self) -> list[IsaacLabEnvWrapper]:
        return [self] * self._num_envs

    def __del__(self):
        # Finalizers must never raise, e.g. during interpreter shutdown.
        with suppress(Exception):
            self._cleanup()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._cleanup()
        return False