| from navsim.agents.abstract_agent import AbstractAgent |
| from omegaconf import DictConfig |
| import hydra |
| from hydra.utils import instantiate |
| import argparse |
| import os |
| from hugsim.visualize import to_video, save_frame |
| from hugsim.dataparser import parse_raw |
| import torch |
| from hugsim_client import HugsimClient |
|
|
| CONFIG_PATH = "navsim/planning/script/config/HUGSIM" |
| CONFIG_NAME = "transfuser" |
|
|
| hugsim_client = HugsimClient() |
| hugsim_client.reset_env() |
|
|
| def get_opts(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--output', type=str, required=True) |
| return parser.parse_args() |
|
|
|
|
| @hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME, version_base=None) |
| def main(cfg: DictConfig) -> None: |
| cfg.agent.config.latent = True |
| cfg.agent.checkpoint_path = "./ckpts/ltf_seed_0.ckpt" |
| print(cfg) |
| agent: AbstractAgent = instantiate(cfg.agent) |
| agent.initialize() |
| agent = agent.cuda() |
| |
| print('Ready for recieving') |
| cnt = 0 |
| output_folder = os.path.join(cfg.output, 'ltf') |
| os.makedirs(output_folder, exist_ok=True) |
| |
| env_state = hugsim_client.get_current_state() |
| while True: |
| if env_state.done: |
| return |
|
|
| data = parse_raw((env_state.state.obs, env_state.state.info)) |
| del data['raw_imgs'] |
| |
| try: |
| with torch.no_grad(): |
| traj = agent.compute_trajectory(data['input']) |
| except RuntimeError as e: |
| traj = None |
| print(e) |
|
|
| if traj is None: |
| print('Waiting for visualize tasks...') |
| return |
| |
| |
| imu_way_points = traj.poses[:, :3] |
| way_points = imu_way_points[:, [1, 0, 2]] |
| way_points[:, 0] *= -1 |
|
|
| env_state = hugsim_client.execute_action(way_points[:, :2]) |
| print('sent') |
| cnt += 1 |
| if env_state.cur_scene_done: |
| print('Scene done') |
|
|
|
|
| if __name__ == '__main__': |
| |
| main() |