| import math |
| from os import path as osp |
| from typing import Callable, Optional |
| import glob |
| import torch |
| from torch.utils.data.dataset import Dataset |
| import torchvision.transforms.functional as TF |
| import numpy as np |
| from PIL import Image, ImageOps |
| import pandas as pd |
| from .masking import MaskGenerator |
| from . import data_utils as utils |
|
|
|
|
class VideoAttentionTargetVideo(Dataset):
    """Clip-level gaze-target dataset built from per-sequence annotation files.

    Annotation files are discovered as ``<anno_root>/<show>/<clip>/*.txt`` and
    read as header-less CSV with the columns ``path, x_min, y_min, x_max,
    y_max, gaze_x, gaze_y``. The box columns are treated as a head bounding
    box in pixels and the gaze columns as the gaze target in pixels; a gaze of
    exactly ``(-1, -1)`` marks a target outside the frame.

    Every annotation file is cut into clips (see ``_split_into_clips``) and
    each dataset item is one clip:

    * training: clips hold up to ``max_len`` frames and a random window of
      ``seq_len`` consecutive frames is drawn on every access. The same
      augmentation decisions are shared by all frames of the window.
    * evaluation: clips hold exactly ``seq_len`` frames and are returned
      unaugmented.

    Args:
        image_root: Directory the annotation ``path`` values are relative to.
        anno_root: Root directory of the annotation text files.
        head_root: Directory of optional per-frame head masks, laid out like
            ``image_root``. A missing mask is replaced by an all-zero one.
        transform: Callable applied to each (augmented) PIL frame.
        input_size: Side length used for the head channel and head mask.
        output_size: Side length of the gaze heatmap.
        quant_labelmap: Select ``utils.draw_labelmap`` (True) or
            ``utils.draw_labelmap_no_quant`` (False) to render heatmaps.
        is_train: Enables window sampling and augmentation.
        seq_len: Number of frames per returned sequence.
        max_len: Maximum number of frames per stored training clip.
        mask_generator: Optional callable producing an image mask per frame
            (training only).
        bbox_jitter: Probability threshold for head-box jitter.
        rand_crop: Probability threshold for random cropping.
        rand_flip: Probability threshold for horizontal flipping.
        color_jitter: Probability threshold for colour jitter.
        rand_rotate: Probability threshold for random rotation (+/- 20 deg).
        rand_lsj: Probability threshold for down-scaling with zero padding.
    """

    _ANNOTATION_COLUMNS = (
        "path",
        "x_min",
        "y_min",
        "x_max",
        "y_max",
        "gaze_x",
        "gaze_y",
    )
    _BOX_COLUMNS = ("x_min", "y_min", "x_max", "y_max")

    def __init__(
        self,
        image_root: str,
        anno_root: str,
        head_root: str,
        transform: Callable,
        input_size: int,
        output_size: int,
        quant_labelmap: bool = True,
        is_train: bool = True,
        seq_len: int = 8,
        max_len: int = 32,
        *,
        mask_generator: Optional["MaskGenerator"] = None,
        bbox_jitter: float = 0.5,
        rand_crop: float = 0.5,
        rand_flip: float = 0.5,
        color_jitter: float = 0.5,
        rand_rotate: float = 0.0,
        rand_lsj: float = 0.0,
    ):
        dfs = []
        # glob order depends on the filesystem; sorting keeps item indices
        # stable across machines and runs.
        for show_dir in sorted(glob.glob(osp.join(anno_root, "*"))):
            for sequence_path in sorted(glob.glob(osp.join(show_dir, "*", "*.txt"))):
                df = pd.read_csv(
                    sequence_path,
                    header=None,
                    index_col=False,
                    names=list(self._ANNOTATION_COLUMNS),
                )
                # <show>/<clip> are the two directories above the file. Use
                # os.path instead of splitting on "/" so this also works with
                # other path separators.
                clip_dir = osp.dirname(sequence_path)
                clip = osp.basename(clip_dir)
                show_name = osp.basename(osp.dirname(clip_dir))
                df["path"] = df["path"].apply(
                    lambda name: osp.join(show_name, clip, name)
                )
                dfs.extend(self._split_into_clips(df, seq_len, max_len, is_train))

        self.dfs = dfs
        self.length = len(dfs)

        self.data_dir = image_root
        self.head_dir = head_root
        self.transform = transform
        self.draw_labelmap = (
            utils.draw_labelmap if quant_labelmap else utils.draw_labelmap_no_quant
        )
        self.is_train = is_train

        self.input_size = input_size
        self.output_size = output_size
        self.seq_len = seq_len

        # Stored unconditionally so the attributes always exist; they are only
        # consulted when ``is_train`` is true.
        self.bbox_jitter = bbox_jitter
        self.rand_crop = rand_crop
        self.rand_flip = rand_flip
        self.color_jitter = color_jitter
        self.rand_rotate = rand_rotate
        self.rand_lsj = rand_lsj
        self.mask_generator = mask_generator

    @staticmethod
    def _split_into_clips(df, seq_len, max_len, is_train):
        """Cut one annotation frame into the clips served as dataset items.

        Training: a sequence of at most ``max_len`` rows is kept whole when it
        has at least ``seq_len`` rows; a longer one is cut into consecutive
        ``max_len``-row chunks, and the trailing remainder is kept only when
        it has at least ``seq_len`` rows.

        Evaluation: the sequence is cut into consecutive ``seq_len``-row
        chunks and a shorter trailing remainder is discarded.

        Every full chunk is kept, including the last one when the length is an
        exact multiple of the chunk size.

        Returns:
            A list of new DataFrames, each re-indexed from 0 via
            ``reset_index()`` (the previous index is kept as an ``index``
            column).
        """
        total = len(df.index)
        if is_train:
            if total <= max_len:
                pieces = [df] if total >= seq_len else []
            else:
                tail = total % max_len
                pieces = [
                    df.iloc[start : start + max_len]
                    for start in range(0, total - tail, max_len)
                ]
                if tail >= seq_len:
                    pieces.append(df.iloc[total - tail :])
        elif total < seq_len:
            pieces = []
        else:
            pieces = [
                df.iloc[start : start + seq_len]
                for start in range(0, total - seq_len + 1, seq_len)
            ]
        # reset_index() returns a new frame, so the clips do not alias the
        # parent DataFrame.
        return [piece.reset_index() for piece in pieces]

    @staticmethod
    def _affine_forward(x, y, matrix):
        """Apply the 2x3 affine ``matrix`` (row-major, 6 values) to ``(x, y)``."""
        return (
            matrix[0] * x + matrix[1] * y + matrix[2],
            matrix[3] * x + matrix[4] * y + matrix[5],
        )

    @staticmethod
    def _affine_inverse(x, y, matrix):
        """Undo ``_affine_forward`` for a matrix whose linear part is a rotation.

        The translation is subtracted and the transposed linear part applied,
        which equals the inverse only for an orthonormal linear part.
        """
        x, y = x - matrix[2], y - matrix[5]
        return (
            matrix[0] * x + matrix[3] * y,
            matrix[1] * x + matrix[4] * y,
        )

    def _sample_crop(self, frames, first_image_path):
        """Draw a random crop that contains all heads and in-frame gaze points.

        Args:
            frames: Rows of the sampled window.
            first_image_path: Image whose size bounds the crop.

        Returns:
            ``(top, left, height, width)`` rounded to ints.
        """
        gaze_x = frames["gaze_x"]
        gaze_y = frames["gaze_y"]
        # Same out-of-frame test as the per-frame loop. Sentinel rows are
        # excluded so -1 is never mistaken for a pixel coordinate.
        in_frame = ~((gaze_x == -1) & (gaze_y == -1))

        xs = [frames["x_min"], frames["x_max"]]
        ys = [frames["y_min"], frames["y_max"]]
        if in_frame.any():
            xs.append(gaze_x[in_frame])
            ys.append(gaze_y[in_frame])

        crop_x_min = min(series.min() for series in xs)
        crop_y_min = min(series.min() for series in ys)
        crop_x_max = max(series.max() for series in xs)
        crop_y_max = max(series.max() for series in ys)

        # Move the top-left corner randomly towards the image origin.
        if crop_x_min >= 0:
            crop_x_min = np.random.uniform(0, crop_x_min)
        if crop_y_min >= 0:
            crop_y_min = np.random.uniform(0, crop_y_min)

        # Only the size is needed, so the image is neither decoded nor left open.
        with Image.open(first_image_path) as first_image:
            width, height = first_image.size

        # The crop must at least reach the far edge of the region of interest
        # and at most the image border.
        crop_width = np.random.uniform(crop_x_max - crop_x_min, width - crop_x_min)
        crop_height = np.random.uniform(crop_y_max - crop_y_min, height - crop_y_min)

        return tuple(
            int(round(value))
            for value in (crop_y_min, crop_x_min, crop_height, crop_width)
        )

    def __getitem__(self, index):
        """Load, augment (training only) and stack the frames of one clip.

        Returns:
            A dict of tensors stacked over the frames of the sequence:

            * ``images``: output of ``transform`` per frame.
            * ``head_channels``: ``utils.get_head_box_channel(...)`` with a
              leading dimension added, per frame.
            * ``heatmaps``: gaze heatmap per frame (all zeros when the gaze is
              out of frame).
            * ``gazes``: gaze point divided by frame width/height, or
              ``(-1, -1)`` when out of frame.
            * ``gaze_inouts``: 1.0 when the gaze is in frame, else 0.0.
            * ``head_masks``: head mask resized to ``input_size``.
            * ``imsize``: ``(width, height)`` of the image as loaded from disk.
            * ``image_masks``: only in training with a ``mask_generator``.
        """
        # Work on a copy: the smoothed columns are written back into the
        # frame, and mutating the stored clip would smooth it again on every
        # later access.
        df = self.dfs[index].copy()
        for coord in self._BOX_COLUMNS:
            df[coord] = utils.smooth_by_conv(11, df, coord)

        use_image_masks = self.is_train and self.mask_generator is not None

        if self.is_train:
            # One set of augmentation decisions per clip, shared by all frames.
            cond_jitter = np.random.random_sample()
            cond_flip = np.random.random_sample()
            cond_color = np.random.random_sample()
            if cond_color < self.color_jitter:
                n1 = np.random.uniform(0.5, 1.5)
                n2 = np.random.uniform(0.5, 1.5)
                n3 = np.random.uniform(0.5, 1.5)
            cond_crop = np.random.random_sample()
            cond_rotate = np.random.random_sample()
            if cond_rotate < self.rand_rotate:
                angle = (2 * np.random.random_sample() - 1) * 20
                angle = -math.radians(angle)
            cond_lsj = np.random.random_sample()
            if cond_lsj < self.rand_lsj:
                lsj_scale = 0.1 + np.random.random_sample() * 0.9

            # Pick the window of consecutive frames. randint's upper bound is
            # exclusive, hence the +1 so the last valid start can be drawn.
            num_frames = len(df.index)
            if num_frames > self.seq_len:
                start = np.random.randint(0, num_frames - self.seq_len + 1)
            else:
                start = 0
            frames = df.iloc[start : start + self.seq_len]

            if cond_crop < self.rand_crop:
                crop_y_min, crop_x_min, crop_height, crop_width = self._sample_crop(
                    frames, osp.join(self.data_dir, df["path"].iloc[0])
                )
        else:
            frames = df

        images = []
        head_channels = []
        heatmaps = []
        gazes = []
        gaze_inouts = []
        imsizes = []
        head_masks = []
        image_masks = []

        for _, row in frames.iterrows():
            x_min, y_min, x_max, y_max = (float(row[c]) for c in self._BOX_COLUMNS)
            gaze_x, gaze_y = float(row["gaze_x"]), float(row["gaze_y"])

            # Normalise boxes stored with swapped corners.
            if x_min > x_max:
                x_min, x_max = x_max, x_min
            if y_min > y_max:
                y_min, y_max = y_max, y_min

            path = row["path"]
            img = Image.open(osp.join(self.data_dir, path)).convert("RGB")
            width, height = img.size
            imsizes.append(torch.FloatTensor([width, height]))

            head_mask_path = osp.join(self.head_dir, path)
            if osp.exists(head_mask_path):
                head_mask = Image.open(head_mask_path).resize((width, height))
            else:
                head_mask = Image.fromarray(np.zeros((height, width), dtype=np.float32))

            gaze_inside = not (gaze_x == -1 and gaze_y == -1)
            if gaze_inside:
                # Slightly negative in-frame annotations are clamped to the border.
                gaze_x = max(gaze_x, 0.0)
                gaze_y = max(gaze_y, 0.0)

            if self.is_train:
                # Head-box jitter.
                if cond_jitter < self.bbox_jitter:
                    k = cond_jitter * 0.1
                    # NOTE(review): the max-side expansion reads the already
                    # shifted min, so the box grows slightly asymmetrically.
                    # Left as is to keep the augmentation unchanged.
                    x_min -= k * abs(x_max - x_min)
                    y_min -= k * abs(y_max - y_min)
                    x_max += k * abs(x_max - x_min)
                    y_max += k * abs(y_max - y_min)
                    x_min = np.clip(x_min, 0, width - 1)
                    x_max = np.clip(x_max, 0, width - 1)
                    y_min = np.clip(y_min, 0, height - 1)
                    y_max = np.clip(y_max, 0, height - 1)

                # Colour jitter.
                if cond_color < self.color_jitter:
                    img = TF.adjust_brightness(img, brightness_factor=n1)
                    img = TF.adjust_contrast(img, contrast_factor=n2)
                    img = TF.adjust_saturation(img, saturation_factor=n3)

                # Random crop; all coordinates move into the crop's frame.
                if cond_crop < self.rand_crop:
                    img = TF.crop(img, crop_y_min, crop_x_min, crop_height, crop_width)
                    head_mask = TF.crop(
                        head_mask, crop_y_min, crop_x_min, crop_height, crop_width
                    )
                    offset_x, offset_y = crop_x_min, crop_y_min
                    x_min, y_min, x_max, y_max = (
                        x_min - offset_x,
                        y_min - offset_y,
                        x_max - offset_x,
                        y_max - offset_y,
                    )
                    if gaze_inside:
                        gaze_x, gaze_y = (gaze_x - offset_x), (gaze_y - offset_y)
                    else:
                        gaze_x = -1
                        gaze_y = -1
                    width, height = crop_width, crop_height

                # Horizontal flip.
                if cond_flip < self.rand_flip:
                    img = img.transpose(Image.FLIP_LEFT_RIGHT)
                    head_mask = head_mask.transpose(Image.FLIP_LEFT_RIGHT)
                    x_min, x_max = width - x_max, width - x_min
                    if gaze_inside:
                        gaze_x = width - gaze_x

                # Rotation about the image centre with an enlarged canvas.
                if cond_rotate < self.rand_rotate:
                    # Image.transform(AFFINE) takes the output-to-input
                    # mapping, so rot_mat maps new pixels to old ones and
                    # annotations are moved with its inverse.
                    rot_mat = [
                        round(math.cos(angle), 15),
                        round(math.sin(angle), 15),
                        0.0,
                        round(-math.sin(angle), 15),
                        round(math.cos(angle), 15),
                        0.0,
                    ]
                    rot_center = (width / 2.0, height / 2.0)
                    rot_mat[2], rot_mat[5] = self._affine_forward(
                        -rot_center[0], -rot_center[1], rot_mat
                    )
                    rot_mat[2] += rot_center[0]
                    rot_mat[5] += rot_center[1]

                    # Canvas size that holds the transformed image corners.
                    corner_xs = []
                    corner_ys = []
                    for cx, cy in ((0, 0), (width, 0), (width, height), (0, height)):
                        cx, cy = self._affine_forward(cx, cy, rot_mat)
                        corner_xs.append(cx)
                        corner_ys.append(cy)
                    nw = math.ceil(max(corner_xs)) - math.floor(min(corner_xs))
                    nh = math.ceil(max(corner_ys)) - math.floor(min(corner_ys))
                    rot_mat[2], rot_mat[5] = self._affine_forward(
                        -(nw - width) / 2.0, -(nh - height) / 2.0, rot_mat
                    )

                    img = img.transform((nw, nh), Image.AFFINE, rot_mat, Image.BILINEAR)
                    head_mask = head_mask.transform(
                        (nw, nh), Image.AFFINE, rot_mat, Image.BILINEAR
                    )

                    # Axis-aligned box around the rotated head box, clipped
                    # to the new canvas.
                    box_xs = []
                    box_ys = []
                    for bx, by in (
                        (x_min, y_min),
                        (x_min, y_max),
                        (x_max, y_min),
                        (x_max, y_max),
                    ):
                        bx, by = self._affine_inverse(bx, by, rot_mat)
                        box_xs.append(bx)
                        box_ys.append(by)
                    x_max, x_min = min(max(box_xs), nw), max(min(box_xs), 0)
                    y_max, y_min = min(max(box_ys), nh), max(min(box_ys), 0)
                    if gaze_inside:
                        gaze_x, gaze_y = self._affine_inverse(gaze_x, gaze_y, rot_mat)
                    width, height = nw, nh

                # Shrink the content and pad right/bottom back to the same size.
                if cond_lsj < self.rand_lsj:
                    nh, nw = int(height * lsj_scale), int(width * lsj_scale)
                    img = TF.resize(img, (nh, nw))
                    img = ImageOps.expand(img, (0, 0, width - nw, height - nh))
                    head_mask = TF.resize(head_mask, (nh, nw))
                    head_mask = ImageOps.expand(
                        head_mask, (0, 0, width - nw, height - nh)
                    )
                    x_min, y_min, x_max, y_max = (
                        x_min * lsj_scale,
                        y_min * lsj_scale,
                        x_max * lsj_scale,
                        y_max * lsj_scale,
                    )
                    gaze_x, gaze_y = gaze_x * lsj_scale, gaze_y * lsj_scale

            head_channel = utils.get_head_box_channel(
                x_min,
                y_min,
                x_max,
                y_max,
                width,
                height,
                resolution=self.input_size,
                coordconv=False,
            ).unsqueeze(0)

            if use_image_masks:
                image_masks.append(
                    self.mask_generator(
                        x_min / width,
                        y_min / height,
                        x_max / width,
                        y_max / height,
                        head_channel,
                    )
                )

            if self.transform is not None:
                img = self.transform(img)
            head_mask = TF.to_tensor(
                TF.resize(head_mask, (self.input_size, self.input_size))
            )

            if gaze_inside:
                gaze_x /= float(width)
                gaze_y /= float(height)
                gaze_map = self.draw_labelmap(
                    torch.zeros(self.output_size, self.output_size),
                    [gaze_x * self.output_size, gaze_y * self.output_size],
                    3,
                    type="Gaussian",
                )
                gazes.append(torch.FloatTensor([gaze_x, gaze_y]))
            else:
                gaze_map = torch.zeros(self.output_size, self.output_size)
                gazes.append(torch.FloatTensor([-1, -1]))

            images.append(img)
            head_channels.append(head_channel)
            head_masks.append(head_mask)
            heatmaps.append(gaze_map)
            gaze_inouts.append(torch.FloatTensor([int(gaze_inside)]))

        out_dict = {
            "images": torch.stack(images),
            "head_channels": torch.stack(head_channels),
            "heatmaps": torch.stack(heatmaps),
            "gazes": torch.stack(gazes),
            "gaze_inouts": torch.stack(gaze_inouts),
            "head_masks": torch.stack(head_masks),
            "imsize": torch.stack(imsizes),
        }
        if use_image_masks:
            out_dict["image_masks"] = torch.stack(image_masks)
        return out_dict

    def __len__(self):
        """Return the number of clips."""
        return self.length
|
|
|
|
def video_collate(batch):
    """Merge per-clip sample dicts by concatenating each key along dim 0.

    The key set is taken from the first sample; every sample is expected to
    provide a tensor for each of those keys.
    """
    merged = {}
    for key in batch[0].keys():
        merged[key] = torch.cat([sample[key] for sample in batch])
    return merged
|
|