| import argparse |
| import json |
| import tqdm |
| import cv2 |
| import os |
| import numpy as np |
| from pycocotools import mask as mask_utils |
| import random |
| from PIL import Image |
| from natsort import natsorted |
| from pycocotools.mask import encode, decode, frPyObjects |
|
|
# Split/mode tag. No code in the visible part of this file reads it.
EVALMODE = "test"
|
|
|
|
# Overlay colour per key, applied verbatim to channels 0, 1, 2 of the image.
# NOTE(review): the triples read like RGB, but the images passed in by this
# file come from cv2.imread (BGR order), so e.g. "r" fills channel 0.
_MASK_COLORS = {
    "r": (255, 0, 0),
    "g": (0, 255, 0),
    "b": (0, 0, 255),
    "o": (255, 165, 0),
    "c": (0, 255, 255),
    "p": (128, 0, 128),
    "l": (128, 128, 0),
    "m": (128, 128, 128),
    "q": (165, 80, 30),
}


def blend_mask(input_img, binary_mask, alpha=0.5, color="g"):
    """Return a copy of ``input_img`` with a colour blended over the masked pixels.

    Args:
        input_img: Image array. A 2-D (single-channel) array is returned
            unchanged, as the very same object. Otherwise it is indexed as
            ``(H, W, C)``.
        binary_mask: 2-D array aligned with the first two axes of
            ``input_img``. Every pixel whose value is ``> 0`` is blended, so
            both 0/1 and 0/255 masks work.
        alpha: Weight of the original pixel; the overlay gets ``1 - alpha``.
        color: One-letter key into ``_MASK_COLORS``. An unknown key blends
            towards black (all-zero overlay) instead of raising.

    Returns:
        A new array with the same shape and dtype as ``input_img``. Blended
        values are cast back to that dtype on assignment, so for integer
        images fractional results are truncated.
    """
    if input_img.ndim == 2:
        return input_img

    # Only the first three channels carry colour; any extra channel (e.g.
    # alpha) is left untouched instead of breaking the broadcast.
    n_color = min(3, input_img.shape[2])
    overlay = np.array(_MASK_COLORS.get(color, (0, 0, 0))[:n_color], dtype=np.float64)

    # Select by truth value rather than multiplying by the mask, so a
    # 255-valued uint8 mask cannot wrap the overlay colour around.
    selected = np.asarray(binary_mask) > 0

    blend_image = input_img.copy()
    original_pixels = input_img[..., :n_color][selected]
    blend_image[..., :n_color][selected] = (
        alpha * original_pixels + (1 - alpha) * overlay
    )
    return blend_image
|
|
|
|
def upsample_mask(mask, frame):
    """Crop symmetric aspect-ratio margins from ``mask`` and resize it to ``frame``.

    The mask is presumably a padded (letterboxed) version of the frame: for a
    landscape frame the mask width is taken as the reference and surplus rows
    are trimmed equally from top and bottom; otherwise the mask height is the
    reference and surplus columns are trimmed from left and right.

    Args:
        mask: Array whose first two axes are (height, width).
        frame: Array whose first two axes give the target (height, width).

    Returns:
        The cropped mask resized to the frame's width and height with
        ``cv2.resize`` (default interpolation).
    """
    frame_h, frame_w = frame.shape[:2]
    mask_h, mask_w = mask.shape[:2]

    if frame_w > frame_h:
        # floor((mask_h - frame_h * mask_w / frame_w) / 2), kept in integers
        # so float rounding cannot move the margin by one row.
        margin = (mask_h * frame_w - frame_h * mask_w) // (2 * frame_w)
        if margin > 0:
            mask = mask[margin:-margin]
    else:
        margin = (mask_w * frame_h - frame_w * mask_h) // (2 * frame_h)
        if margin > 0:
            mask = mask[:, margin:-margin]
    # A non-positive margin means there is nothing to trim on that axis; a
    # negative slice here would produce an empty array and make resize fail.

    return cv2.resize(mask, (frame_w, frame_h))
|
|
|
|
def downsample(mask, frame):
    """Resize ``mask`` to the height and width of ``frame``.

    Despite the name, this is a plain ``cv2.resize`` to the frame size (default
    interpolation), so it scales in whichever direction is needed.

    Args:
        mask: Array to resize.
        frame: Array whose first two axes give the target (height, width).

    Returns:
        The resized mask.
    """
    frame_h, frame_w = frame.shape[:2]
    # cv2.resize takes the target size as (width, height).
    return cv2.resize(mask, (frame_w, frame_h))
|
|
|
|
| |
| |
| |
| |
def _write_image(path, image):
    """Create the parent directory of ``path`` if needed, then save ``image`` there."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    cv2.imwrite(path, image)


def _overlay_annotations(image, anns, color_key):
    """Blend every annotation mask in ``anns`` over ``image`` with one colour.

    Overlays accumulate, so all annotations are visible in the result instead
    of only the last one. Returns ``None`` when ``anns`` is empty.
    """
    canvas = None
    for ann in anns:
        ann_mask = downsample(decode(ann["segmentation"]), image)
        base = image if canvas is None else canvas
        canvas = blend_mask(base, ann_mask, color=color_key)
    return canvas


def _overlay_instances(image, label_mask, instance_values, color_keys):
    """Blend each labelled instance of ``label_mask`` over ``image``, one colour each.

    Overlays accumulate across instances. Returns ``None`` when
    ``instance_values`` is empty.
    """
    canvas = None
    for color_key, instance_value in zip(color_keys, instance_values):
        binary_mask = (label_mask == instance_value).astype(np.uint8)
        binary_mask = cv2.resize(binary_mask, (image.shape[1], image.shape[0]))
        # The mask already has the frame size here, so upsample_mask finds no
        # margin to crop; the call is kept to mirror the original pipeline.
        binary_mask = upsample_mask(binary_mask, image)
        base = image if canvas is None else canvas
        canvas = blend_mask(base, binary_mask, color=color_key)
    return canvas


def main():
    """Render ground-truth and predicted mask overlays for sampled HANDAL frames.

    For each selected video, up to 100 dataset entries are sampled at random.
    Each entry produces up to three images under ``output_path/<video_name>``:
    ``target_gt`` (target frame with its annotations), ``query_gt`` (first
    frame with its annotations) and ``predict`` (target frame with the
    predicted instance masks). All paths are hard-coded below.
    """
    color = ['g', 'r', 'b', 'o', 'c', 'p', 'l', 'm', 'q']
    data_path = "/data/work-gcp-europe-west4-a/yuqian_fu/datasets/HANDAL"
    output_path = "/data/work-gcp-europe-west4-a/yuqian_fu/datasets/HANDAL/Handal_vis_results_correct_last"
    mask_base_path = "/data/work-gcp-europe-west4-a/yuqian_fu/datasets/HANDAL/predictions_handal_all"
    json_path = "/data/work-gcp-europe-west4-a/yuqian_fu/datasets/HANDAL/handal_test_all_instruct_correct_videoname.json"

    with open(json_path, "r") as fp:
        datas = json.load(fp)
    print(len(datas))

    video_select = ["handal_dataset_fixed_joint_pliers"]

    for video_name in tqdm.tqdm(video_select):
        data_list = [data for data in datas if data["video_name"] == video_name]
        print(len(data_list))
        # random.sample raises ValueError if asked for more items than exist.
        data_list = random.sample(data_list, min(100, len(data_list)))

        for data in data_list:
            query_img = cv2.imread(os.path.join(data_path, data['first_frame_image']))
            target_img = cv2.imread(os.path.join(data_path, data['image']))
            # cv2.imread returns None instead of raising on a missing or
            # unreadable file.
            if query_img is None or target_img is None:
                print(f"skip {data['image']}: query or target image could not be read")
                continue

            # Target frame with ground-truth annotations.
            target_gt = _overlay_annotations(target_img, data["anns"], color[0])
            if target_gt is not None:
                # Drop the first path component of the image path.
                joined_path = os.path.join(*data['image'].split("/")[1:])
                _write_image(
                    os.path.join(output_path, video_name, "target_gt", joined_path),
                    target_gt,
                )

            # Query (first) frame with ground-truth annotations.
            query_gt = _overlay_annotations(query_img, data["first_frame_anns"], color[0])
            if query_gt is not None:
                new_path_query = data['first_frame_image'].replace(f"{video_name}/", "")
                _write_image(
                    os.path.join(output_path, video_name, "query_gt", new_path_query),
                    query_gt,
                )

            # Target frame with predicted instance masks.
            mask_path = os.path.join(mask_base_path, data['image'])
            mask_path = mask_path.replace(".jpg", ".png")
            if not os.path.isfile(mask_path):
                print(f"skip prediction for {data['image']}: {mask_path} not found")
                continue
            with Image.open(mask_path) as mask_file:
                mask = np.array(mask_file)

            unique_instances = np.unique(mask)
            unique_instances = unique_instances[unique_instances != 0]  # 0 is skipped
            if len(unique_instances) > len(color):
                # More instances than available overlay colours.
                continue

            try:
                frame = _overlay_instances(target_img, mask, unique_instances, color)
            except (cv2.error, ValueError, IndexError) as exc:
                # Report and move on rather than dropping into a debugger,
                # which would hang a non-interactive run.
                print(f"skip prediction for {data['image']}: {exc}")
                continue

            if frame is not None:
                new_path_predict = data['image'].replace(f"{video_name}/", "")
                _write_image(
                    os.path.join(output_path, video_name, "predict", new_path_predict),
                    frame,
                )


if __name__ == "__main__":
    main()
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
|
|
| |
| |
| |
|
|
|
|
| |
|
|