| from __future__ import annotations |
| import logging |
| from typing import Any |
|
|
| import importlib |
| import importlib.util |
| import logging |
| import os |
| import sys |
|
|
|
|
| |
# Hugging Face Hub repository that the fallback loader in this file downloads
# its helper modules from.
HUB_REPO_ID = "LightwheelAI/lw_benchhub_env"
|
|
|
|
def _download_and_import(filename: str):
    """Download a Python source file from the Hub and import it as a module.

    The file is fetched from ``HUB_REPO_ID`` with ``hf_hub_download``. Its
    package-relative imports of ``errors`` and ``isaaclab_env_wrapper`` are
    rewritten to absolute imports, and the patched source is executed inside
    a fresh module object that is registered in ``sys.modules`` under the
    file's name without the ``.py`` suffix.

    Args:
        filename: Name of the ``.py`` file inside the Hub repository.

    Returns:
        The module object created from the downloaded source.

    Raises:
        ImportError: If no import spec can be created for the downloaded file.
    """
    from huggingface_hub import hf_hub_download

    local_path = hf_hub_download(repo_id=HUB_REPO_ID, filename=filename)
    module_dir = os.path.dirname(local_path)

    # Put the download directory on sys.path so the absolute imports produced
    # by the rewrite below can be resolved against files stored next to it.
    if module_dir not in sys.path:
        sys.path.insert(0, module_dir)

    # Strip only a trailing ".py"; a blanket str.replace would also remove
    # ".py" occurring in the middle of the name.
    module_name = filename[: -len(".py")] if filename.endswith(".py") else filename

    # Python source is UTF-8 by default, so do not depend on the locale encoding.
    with open(local_path, encoding="utf-8") as source_file:
        content = source_file.read()

    # The downloaded files are loaded as top-level modules, not as members of
    # a package, so their relative imports have to become absolute ones.
    content = content.replace("from .errors import", "from errors import")
    content = content.replace("from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import")

    spec = importlib.util.spec_from_file_location(module_name, local_path)
    if spec is None:
        raise ImportError(f"Cannot create an import spec for {local_path!r}")
    module = importlib.util.module_from_spec(spec)

    # Register before executing so later imports of this name get this module.
    sys.modules[module_name] = module

    # NOTE(security): this executes code downloaded from the Hub, so it is only
    # as trustworthy as HUB_REPO_ID. The patched text is compiled directly
    # (instead of spec.loader.exec_module) because the loader would re-read
    # the unpatched file from disk.
    try:
        code = compile(content, local_path, "exec")
        exec(code, module.__dict__)
    except BaseException:
        # Do not leave a half-initialised module behind for later imports.
        if sys.modules.get(module_name) is module:
            del sys.modules[module_name]
        raise

    return module
|
|
|
|
# Prefer the package-relative modules. On any ImportError (for example when
# this file is loaded without a parent package) the same names are taken from
# modules downloaded from the Hub instead.
try:
    from .errors import (
        IsaacLabArenaCameraKeyError,
        IsaacLabArenaStateKeyError,
    )
    from .isaaclab_env_wrapper import (
        IsaacLabEnvWrapper,
        cleanup_isaaclab,
    )
except ImportError:
    # errors.py is loaded before the wrapper -- presumably because the wrapper
    # imports from it; keep this order.
    _errors = _download_and_import("errors.py")
    _isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py")
    IsaacLabEnvWrapper, cleanup_isaaclab = (
        _isaaclab_wrapper.IsaacLabEnvWrapper,
        _isaaclab_wrapper.cleanup_isaaclab,
    )
    IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError = (
        _errors.IsaacLabArenaCameraKeyError,
        _errors.IsaacLabArenaStateKeyError,
    )
|
|
|
|
def make_env(n_envs: int = 1, use_async_envs: bool = False, cfg: Any = None) -> dict[Any, dict[int, Any]]:
    """Create the wrapped IsaacLab environment described by ``cfg.config_path``.

    ``n_envs`` and ``use_async_envs`` are accepted but not used: the
    environment is built solely from the config file, and a warning saying so
    is logged on every call.

    Args:
        n_envs: Ignored.
        use_async_envs: Ignored.
        cfg: Environment configuration object; it must expose a
            ``config_path`` attribute.

    Returns:
        A nested mapping ``{environment: {0: wrapped_env}}``, where
        ``environment`` is the value returned by ``export_env_for_envhub`` and
        ``wrapped_env`` is the ``IsaacLabEnvWrapper`` built around the raw
        environment.

    Raises:
        ValueError: If ``cfg`` is ``None`` or has no ``config_path`` attribute.
    """
    if cfg is None or not hasattr(cfg, "config_path"):
        raise ValueError(
            "Please set config_path via --env.kwargs='{\"config_path\": \"...\"}'"
        )
    config_path = cfg.config_path
    logging.warning(
        "Ignore n_envs and use_async_envs in parameter, we use the config path to create the environment"
    )

    # Imported here, after validation, so an invalid cfg fails before
    # lw_benchhub is imported.
    from lw_benchhub.utils.envhub_utils import export_env_for_envhub

    raw_env, environment, task, render_mode, episode_length, app_launcher = export_env_for_envhub(
        config_path=config_path
    )

    wrapped_env = IsaacLabEnvWrapper(
        raw_env,
        episode_length=episode_length,
        task=task,
        render_mode=render_mode,
        simulation_app=app_launcher,
    )
    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logging.info("Created: %s with %s envs", environment, wrapped_env.num_envs)

    return {environment: {0: wrapped_env}}
|
|