import json
import logging
import os
import random
from typing import Dict

import cv2
import numpy as np
import torch
import transformers
from pycocotools.coco import COCO
from torch.utils.data import Dataset
from torchvision import transforms

from .constants import COCO_KEYPOINT_NAME, KeypointLocationDescription, KeypointLocationQuestion
from .constants import COCO_KEYPOINT_NAME_TOKEN

# NOTE(review): every marker token below is an empty string in this file. If
# angle-bracket style tokens were intended, they may have been stripped at some
# point -- confirm against the upstream source before relying on them.
DEFAULT_IMAGE_PATCH_TOKEN = ""
PREFIX_IMAGE = "Image: "
PREFIX_NO_IMAGE = "Image: N/A"
BEGIN_DESCRIPTION = ""
END_DESCRIPTION = ""
IGNORE_INDEX = -100
DEFAULT_EOS_TOKEN = ""
BEGIN_OPTIONS = ""
END_OPTIONS = ""
BEGIN_LOC = ""
END_LOC = ""
BEGIN_QUESTION = ""
END_QUESTION = ""


class PoseHICODetDataset(Dataset):
    """Dataset for supervised fine-tuning.

    Each item is a dict with two entries:

    * ``'image'``: the person crop, warped to a square of side
      ``multimodal_cfg['image_size']`` and converted to RGB.
    * ``'meta'``: the sample's annotation record, extended with the matching
      ``'hoi_id'`` looked up in the HICO-DET instance-level annotations.
    """

    def __init__(self, data_path: str,
                 multimodal_cfg: dict,
                 annotation_path: str = './outputs/merged_labels.json',
                 max_samples: int = 0,
                 ):
        """Load the sample list and the HICO-DET instance-level annotations.

        Args:
            data_path: Root directory that contains the ``Annotation``
                folder with the HICO-DET instance-level training JSON.
            multimodal_cfg: Configuration dict; ``'image_size'`` is read here
                and ``'image_folder'`` is read when an image is loaded.
            annotation_path: JSON file holding the list of sample records.
            max_samples: When greater than zero, keep only the first
                ``max_samples`` records.
        """
        super().__init__()
        logging.warning("Loading data...")
        self.multimodal_cfg = multimodal_cfg
        self.mllm_image_size = multimodal_cfg['image_size']
        self.aspect_ratio = 1.0
        # Box sizes are divided by this value to obtain the "scale" that
        # get_affine_transform later multiplies back by 200.
        self.pixel_std = 200
        self.num_joints = 17
        self.num_joints_full_body = 136

        self.list_data_dict = self._load_json(annotation_path)
        if max_samples > 0:
            self.list_data_dict = self.list_data_dict[:max_samples]

        json_path = os.path.join(
            data_path,
            "Annotation/hico-det-instance-level/hico-det-training-set-instance-level.json",
        )
        self.hoi_data = self._load_json(json_path)

    def _load_json(self, data_path):
        """Return the parsed content of the UTF-8 JSON file at ``data_path``."""
        with open(data_path, 'r', encoding="utf-8") as f:
            return json.load(f)

    def __len__(self):
        """Return the number of sample records."""
        return len(self.list_data_dict)

    def __getitem__(self, i):
        """Return ``{'image': ..., 'meta': ...}`` for sample ``i``.

        The stored record is updated in place with its ``'hoi_id'``.

        Raises:
            ValueError: If no HOI annotation matches the sample's action
                labels.
        """
        sources = self.list_data_dict[i]
        image = self._get_image_item(sources)

        hoi_id = self._find_hoi_id(sources)
        if hoi_id == -1:
            # Explicit raise instead of `assert`, which is removed under -O.
            raise ValueError(
                f"No HOI annotation matches the action labels of sample {i} "
                f"({sources.get('file_name')!r})"
            )
        sources['hoi_id'] = hoi_id

        return {'image': image, 'meta': sources}

    def _get_image_item(self, sources):
        """Load the sample's image and warp the person box to a square crop.

        Args:
            sources: Sample record providing ``'file_name'`` and
                ``'human_bbox'`` as ``(x1, y1, x2, y2)``.

        Returns:
            The RGB crop of size ``mllm_image_size`` x ``mllm_image_size``.

        Raises:
            OSError: If the image file cannot be read or decoded.
        """
        image_file = os.path.join(
            self.multimodal_cfg['image_folder'], sources['file_name']
        )
        image = cv2.imread(
            image_file, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION
        )
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Failed to read image: {image_file}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        x1, y1, x2, y2 = sources['human_bbox']
        center, scale = self._xywh2cs(x1, y1, x2 - x1, y2 - y1)

        side = int(self.mllm_image_size)
        trans = get_affine_transform(center, scale, 0, (side, side))
        return cv2.warpAffine(image, trans, (side, side), flags=cv2.INTER_LINEAR)

    def _xywh2cs(self, x, y, w, h):
        """Convert a top-left/size box into a ``(center, scale)`` pair.

        The box is first grown along its shorter side to reach
        ``self.aspect_ratio``; the scale is the resulting width and height
        divided by ``self.pixel_std``. No extra padding factor is applied.

        Returns:
            ``(center, scale)`` as two float32 arrays of length 2.
        """
        center = np.zeros((2), dtype=np.float32)
        center[0] = x + w * 0.5
        center[1] = y + h * 0.5

        if w > self.aspect_ratio * h:
            h = w * 1.0 / self.aspect_ratio
        elif w < self.aspect_ratio * h:
            w = h * self.aspect_ratio

        scale = np.array(
            [w * 1.0 / self.pixel_std, h * 1.0 / self.pixel_std],
            dtype=np.float32)
        return center, scale

    def _match_action_labels(self, src_action_labels, action_labels):
        """Tell whether two action-label lists describe the same part states.

        The lists match when they have the same length and every entry of
        ``src_action_labels`` has an entry in ``action_labels`` with equal
        ``'human_part'`` and ``'partstate'`` values.
        """
        if len(src_action_labels) != len(action_labels):
            return False
        return all(
            any(
                candidate.get("human_part") == wanted["human_part"]
                and candidate.get("partstate") == wanted["partstate"]
                for candidate in action_labels
            )
            for wanted in src_action_labels
        )

    def _find_hoi_id(self, sources):
        """Return the ``hoi_id`` of the HOI annotation matching the sample.

        The candidates are the ``'labels'`` entries stored for the sample's
        ``'file_name'``; the first one whose action labels match wins.

        Returns:
            The matching ``hoi_id``, or ``-1`` when no candidate matches.
        """
        hoi_labels = self.hoi_data[sources['file_name']]['labels']
        src_action_labels = sources['action_labels']
        for candidate in hoi_labels:
            if self._match_action_labels(
                src_action_labels=src_action_labels,
                action_labels=candidate['action_labels'],
            ):
                return candidate['hoi_id']
        # Report "not found" explicitly instead of leaking the last
        # candidate's id.
        return -1


def fliplr_joints(joints, joints_vis, width, matched_parts):
    """Mirror joint coordinates horizontally and swap paired rows.

    Both arrays are modified in place.

    Args:
        joints: 2-D array of joint coordinates; column 0 is mirrored.
        joints_vis: 2-D array multiplied element-wise with ``joints``.
        width: Width used for mirroring (``x -> width - x - 1``).
        matched_parts: Iterable of index pairs whose rows are exchanged.

    Returns:
        ``(joints * joints_vis, joints_vis)``.
    """
    # Flip horizontal
    joints[:, 0] = width - joints[:, 0] - 1

    # Change left-right parts; the copy keeps the first row's old values
    # alive while it is being overwritten.
    for pair in matched_parts:
        joints[pair[0], :], joints[pair[1], :] = \
            joints[pair[1], :], joints[pair[0], :].copy()
        joints_vis[pair[0], :], joints_vis[pair[1], :] = \
            joints_vis[pair[1], :], joints_vis[pair[0], :].copy()

    return joints * joints_vis, joints_vis


def transform_preds(coords, center, scale, output_size):
    """Map points from the warped output frame back to the source frame.

    Applies the inverse of ``get_affine_transform(center, scale, 0,
    output_size)`` to the first two columns of every row of ``coords``.

    Returns:
        A new array shaped like ``coords``; columns beyond the first two
        stay zero.
    """
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    for p in range(coords.shape[0]):
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans)
    return target_coords


def get_affine_transform(
        center, scale, rot, output_size,
        shift=np.array([0, 0], dtype=np.float32), inv=0
):
    """Build the 2x3 affine matrix between a source box and the output frame.

    Args:
        center: Box center in source coordinates (length-2 array).
        scale: Box size divided by 200; a scalar is used for both axes, and
            arrays or lists are used as given.
        rot: Rotation in degrees.
        output_size: ``(width, height)`` of the destination frame.
        shift: Offset of the box center, as a fraction of the box size.
        inv: When truthy, return the destination-to-source transform.

    Returns:
        The matrix produced by ``cv2.getAffineTransform``.
    """
    if not isinstance(scale, (np.ndarray, list)):
        scale = np.array([scale, scale])

    # np.asarray lets list inputs through the multiplication below; a plain
    # list times a float would raise TypeError.
    scale_tmp = np.asarray(scale) * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    # Three point pairs define the transform: the center, a point offset
    # from it by half the width, and a third point perpendicular to those.
    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans


def affine_transform(pt, t):
    """Apply the affine matrix ``t`` to the 2-D point ``pt``.

    Returns:
        The first two components of ``t @ [pt[0], pt[1], 1]``.
    """
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_3rd_point(a, b):
    """Return ``b`` plus the vector ``a - b`` rotated by 90 degrees."""
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)


def get_dir(src_point, rot_rad):
    """Rotate the 2-D point ``src_point`` by ``rot_rad`` radians.

    Returns:
        The rotated point as a two-element list.
    """
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)

    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs

    return src_result