| import os |
| import fnmatch |
| import json |
|
|
| import h5py |
| import yaml |
| import cv2 |
| import numpy as np |
|
|
| from configs.state_vec import STATE_VEC_IDX_MAPPING |
# Names of the 20 tabletop end-effector dimensions, in vector order.
# Per arm (left first, then right): XYZ position, 6D rotation
# representation (angle_0..angle_5), then gripper openness.
TABLETOP_6D_INDICES_NAMES = [
    'left_eef_pos_x', 'left_eef_pos_y', 'left_eef_pos_z',
    'left_eef_angle_0', 'left_eef_angle_1', 'left_eef_angle_2',
    'left_eef_angle_3', 'left_eef_angle_4', 'left_eef_angle_5',
    'left_gripper_open',
    'right_eef_pos_x', 'right_eef_pos_y', 'right_eef_pos_z',
    'right_eef_angle_0', 'right_eef_angle_1', 'right_eef_angle_2',
    'right_eef_angle_3', 'right_eef_angle_4', 'right_eef_angle_5',
    'right_gripper_open',
]
# Positions of those dimensions inside the unified state vector.
TABLETOP_6D_INDICES = [
    STATE_VEC_IDX_MAPPING[name] for name in TABLETOP_6D_INDICES_NAMES
]
|
|
class TabletopHDF5VLADataset:
    """
    Sample episodes from the tabletop embodiment dataset stored in HDF5.

    Each HDF5 file holds one episode with:
      - observations/states/ee_6d_pos : (T, 20) end-effector states
      - actions/ee_6d_pos             : (T, 20) end-effector actions
      - observations/images/{back, wrist_left, wrist_right} : camera frames

    The 20 raw dimensions are scattered into the unified STATE_DIM-dim
    vector via TABLETOP_6D_INDICES.
    """

    # Per-dimension scale divided out of raw states/actions. Currently the
    # identity; kept as the single place to plug in real normalization later.
    STATE_SCALE = np.array(
        [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
    )

    def __init__(self, task_name) -> None:
        """Index all episode files of `task_name` and load the common config.

        Args:
            task_name (str): dataset/task directory name under the HDF5 root.

        Raises:
            FileNotFoundError: if no .hdf5 files exist for the task.
            ValueError: if every episode is empty, so no sampling weights
                can be computed.
        """
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/tabletop/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Recursively collect every episode file under the dataset root.
        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                self.file_paths.append(os.path.join(root, filename))
        if not self.file_paths:
            raise FileNotFoundError(
                f"No .hdf5 episode files found under {HDF5_DIR}")

        # Load the shared training configuration.
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Weight episode sampling by trajectory length so each timestep is
        # (approximately) equally likely to be drawn across episodes.
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            episode_lens.append(res['state'].shape[0] if valid else 0)
        total_len = np.sum(episode_lens)
        if total_len == 0:
            # Guard against a silent divide-by-zero producing NaN weights.
            raise ValueError(
                f"All episodes under {HDF5_DIR} are empty or invalid.")
        self.episode_sample_weights = np.array(episode_lens) / total_len

    def __len__(self):
        """Return the number of episode files."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset (task) name."""
        return self.DATASET_NAME

    def _fill_in_state(self, values):
        """Scatter 20-dim tabletop values into the unified STATE_DIM vector.

        Args:
            values (ndarray): (..., 20) array in TABLETOP_6D_INDICES_NAMES order.

        Returns:
            ndarray: (..., STATE_DIM) array, zero everywhere except the
                tabletop dimensions.
        """
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def get_item(self, index: int = None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected.
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
        """
        while True:
            if index is None:
                file_path = np.random.choice(
                    self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = self.parse_hdf5_file(file_path) \
                if not state_only else self.parse_hdf5_file_state_only(file_path)
            if valid:
                return sample
            # Invalid episode: retry with a freshly sampled one.
            index = np.random.randint(0, len(self.file_paths))

    def parse_hdf5_file(self, file_path):
        """Parse a hdf5 file to generate a training sample at
        a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode.
                        "step_id": int,         # the sampled timestep t.
                        "instruction": str      # the language instruction (or path
                                                # to a precomputed embedding).
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator": ndarray, # validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_high_mask": ndarray,   # per-timestep validness, (IMG_HISORY_SIZE,) bool.
                    "cam_left_wrist": ndarray,  # left wrist camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observations']['states']['ee_6d_pos'][:]
            actions = f['actions']['ee_6d_pos'][:]
            num_steps = states.shape[0]

            # Drop episodes that are too short to learn from.
            if num_steps < 20:
                return False, None

            # Randomly sample a timestep t.
            step_id = np.random.randint(0, num_steps)

            # Instruction: raw text stored in the file for the one dataset
            # that carries it; otherwise a path to a precomputed embedding.
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            # Rescale (identity for now) and take the action chunk at t.
            states = states / self.STATE_SCALE
            actions = actions[step_id:step_id + self.CHUNK_SIZE] / self.STATE_SCALE

            # Current state and whole-trajectory statistics.
            state = states[step_id:step_id + 1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states ** 2, axis=0))

            # Pad a short action chunk by repeating the last action.
            if actions.shape[0] < self.CHUNK_SIZE:
                actions = np.concatenate([
                    actions,
                    np.tile(actions[-1:], (self.CHUNK_SIZE - actions.shape[0], 1))
                ], axis=0)

            # Map everything into the unified state vector.
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            actions = self._fill_in_state(actions)

            def parse_img(key):
                # Gather up to IMG_HISORY_SIZE frames ending at step_id,
                # left-padding with the earliest frame near episode start.
                imgs = []
                for i in range(max(step_id - self.IMG_HISORY_SIZE + 1, 0), step_id + 1):
                    imgs.append(f['observations']['images'][key][i])
                imgs = np.stack(imgs)
                if imgs.shape[0] < self.IMG_HISORY_SIZE:
                    imgs = np.concatenate([
                        np.tile(imgs[:1], (self.IMG_HISORY_SIZE - imgs.shape[0], 1, 1, 1)),
                        imgs
                    ], axis=0)
                return imgs

            cam_high = parse_img('back')
            # Padded (repeated) history frames are marked invalid.
            valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
            cam_high_mask = np.array(
                [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
            )
            cam_left_wrist = parse_img('wrist_left')
            cam_left_wrist_mask = cam_high_mask.copy()
            cam_right_wrist = parse_img('wrist_right')
            cam_right_wrist_mask = cam_high_mask.copy()

            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask
            }

    def parse_hdf5_file_state_only(self, file_path):
        """Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,   # state[:], (T, STATE_DIM).
                    "action": ndarray,  # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observations']['states']['ee_6d_pos'][:]
            actions = f['actions']['ee_6d_pos'][:]

            # Rescale (identity for now). FIX: return the COMPLETE action
            # trajectory as documented; previously actions were sliced to
            # CHUNK_SIZE at a random step, truncating the trajectory used
            # for dataset statistics.
            states = states / self.STATE_SCALE
            actions = actions / self.STATE_SCALE

            return True, {
                "state": self._fill_in_state(states),
                "action": self._fill_in_state(actions)
            }
|
|
class AnubisHDF5VLADataset:
    """
    Sample episodes from the Anubis embodiment dataset stored in HDF5.

    Each HDF5 file holds one episode with:
      - observation/eef_pose : (T, 20) end-effector states
      - action/eef_pose      : (T, 20) end-effector actions
      - observation/{agentview_image, wrist_left_image, wrist_right_image}

    The 20 raw dimensions are scattered into the unified STATE_DIM-dim
    vector via TABLETOP_6D_INDICES.
    """

    # Per-dimension scale divided out of raw states/actions. Currently the
    # identity; kept as the single place to plug in real normalization later.
    STATE_SCALE = np.array(
        [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
    )

    def __init__(self, task_name) -> None:
        """Index all episode files of `task_name` and load the common config.

        Args:
            task_name (str): dataset/task directory name under the HDF5 root.

        Raises:
            FileNotFoundError: if no .hdf5 files exist for the task.
            ValueError: if every episode is empty, so no sampling weights
                can be computed.
        """
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/anubis_hdf5/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Recursively collect every episode file under the dataset root.
        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                self.file_paths.append(os.path.join(root, filename))
        if not self.file_paths:
            raise FileNotFoundError(
                f"No .hdf5 episode files found under {HDF5_DIR}")

        # Load the shared training configuration.
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Weight episode sampling by trajectory length so each timestep is
        # (approximately) equally likely to be drawn across episodes.
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            episode_lens.append(res['state'].shape[0] if valid else 0)
        total_len = np.sum(episode_lens)
        if total_len == 0:
            # Guard against a silent divide-by-zero producing NaN weights.
            raise ValueError(
                f"All episodes under {HDF5_DIR} are empty or invalid.")
        self.episode_sample_weights = np.array(episode_lens) / total_len

    def __len__(self):
        """Return the number of episode files."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset (task) name."""
        return self.DATASET_NAME

    def _fill_in_state(self, values):
        """Scatter 20-dim end-effector values into the unified STATE_DIM vector.

        Args:
            values (ndarray): (..., 20) array in TABLETOP_6D_INDICES_NAMES order.

        Returns:
            ndarray: (..., STATE_DIM) array, zero everywhere except the
                mapped dimensions.
        """
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def get_item(self, index: int = None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected.
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
        """
        while True:
            if index is None:
                file_path = np.random.choice(
                    self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = self.parse_hdf5_file(file_path) \
                if not state_only else self.parse_hdf5_file_state_only(file_path)
            if valid:
                return sample
            # Invalid episode: retry with a freshly sampled one.
            index = np.random.randint(0, len(self.file_paths))

    def parse_hdf5_file(self, file_path):
        """Parse a hdf5 file to generate a training sample at
        a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode.
                        "step_id": int,         # the sampled timestep t.
                        "instruction": str      # the language instruction (or path
                                                # to a precomputed embedding).
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator": ndarray, # validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_high_mask": ndarray,   # per-timestep validness, (IMG_HISORY_SIZE,) bool.
                    "cam_left_wrist": ndarray,  # left wrist camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observation']['eef_pose'][:]
            actions = f['action']['eef_pose'][:]
            num_steps = states.shape[0]

            # Drop episodes that are too short to learn from.
            if num_steps < 20:
                return False, None

            # Randomly sample a timestep t.
            step_id = np.random.randint(0, num_steps)

            # NOTE(review): this branch reads 'observations/states/...', a
            # layout the Anubis files above do not use ('observation/...').
            # It is only reachable for that one dataset name — confirm those
            # files actually carry this group before relying on it.
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            # Rescale (identity for now) and take the action chunk at t.
            states = states / self.STATE_SCALE
            actions = actions[step_id:step_id + self.CHUNK_SIZE] / self.STATE_SCALE

            # Current state and whole-trajectory statistics.
            state = states[step_id:step_id + 1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states ** 2, axis=0))

            # Pad a short action chunk by repeating the last action.
            if actions.shape[0] < self.CHUNK_SIZE:
                actions = np.concatenate([
                    actions,
                    np.tile(actions[-1:], (self.CHUNK_SIZE - actions.shape[0], 1))
                ], axis=0)

            # Map everything into the unified state vector.
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            actions = self._fill_in_state(actions)

            def parse_img(key):
                # Gather up to IMG_HISORY_SIZE frames ending at step_id,
                # left-padding with the earliest frame near episode start.
                imgs = []
                for i in range(max(step_id - self.IMG_HISORY_SIZE + 1, 0), step_id + 1):
                    imgs.append(f['observation'][key][i])
                imgs = np.stack(imgs)
                if imgs.shape[0] < self.IMG_HISORY_SIZE:
                    imgs = np.concatenate([
                        np.tile(imgs[:1], (self.IMG_HISORY_SIZE - imgs.shape[0], 1, 1, 1)),
                        imgs
                    ], axis=0)
                return imgs

            cam_high = parse_img('agentview_image')
            # Padded (repeated) history frames are marked invalid.
            valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
            cam_high_mask = np.array(
                [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
            )
            cam_left_wrist = parse_img('wrist_left_image')
            cam_left_wrist_mask = cam_high_mask.copy()
            cam_right_wrist = parse_img('wrist_right_image')
            cam_right_wrist_mask = cam_high_mask.copy()

            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask
            }

    def parse_hdf5_file_state_only(self, file_path):
        """Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,   # state[:], (T, STATE_DIM).
                    "action": ndarray,  # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observation']['eef_pose'][:]
            actions = f['action']['eef_pose'][:]

            # Rescale (identity for now). FIX: return the COMPLETE action
            # trajectory as documented; previously actions were sliced to
            # CHUNK_SIZE at a random step, truncating the trajectory used
            # for dataset statistics.
            states = states / self.STATE_SCALE
            actions = actions / self.STATE_SCALE

            return True, {
                "state": self._fill_in_state(states),
                "action": self._fill_in_state(actions)
            }
|
|
if __name__ == "__main__":
    import sys

    # Smoke-test: iterate every episode once. The previous version called
    # TabletopHDF5VLADataset() with no task_name, which always raised
    # TypeError; take the task name from the command line instead.
    if len(sys.argv) < 2:
        raise SystemExit(f"Usage: python {sys.argv[0]} <task_name>")
    ds = TabletopHDF5VLADataset(sys.argv[1])
    for i in range(len(ds)):
        print(f"Processing episode {i}/{len(ds)}...")
        ds.get_item(i)
|
|