| |
|
|
| import os |
| from pathlib import Path |
| from datetime import datetime |
| import shutil |
| from collections import Counter |
| import warnings |
| import json |
|
|
| import matplotlib.pyplot as plt |
| import pandas as pd |
| import numpy as np |
| import cv2 as cv |
| from tqdm import tqdm |
| from ensemble_boxes import weighted_boxes_fusion |
|
|
| warnings.filterwarnings("ignore", category=UserWarning) |
|
|
| def plot_img(img, size=(18, 18), is_rgb=True, title="", cmap='gray'): |
| plt.figure(figsize=size) |
| plt.imshow(img, cmap=cmap) |
| plt.suptitle(title) |
| plt.show() |
|
|
| def plot_imgs(imgs, cols=2, size=10, is_rgb=True, title="", cmap='gray', img_size=None): |
| rows = len(imgs)//cols + 1 |
| fig = plt.figure(figsize=(cols*size, rows*size)) |
| for i, img in enumerate(imgs): |
| if img_size is not None: |
| img = cv.resize(img, img_size) |
| fig.add_subplot(rows, cols, i+1) |
| plt.imshow(img, cmap=cmap) |
| plt.suptitle(title) |
| |
| def draw_bbox(image, box, label, color, thickness=3): |
| alpha = 0.1 |
| alpha_box = 0.4 |
| overlay_bbox = image.copy() |
| overlay_text = image.copy() |
| output = image.copy() |
|
|
| text_width, text_height = cv.getTextSize(label.upper(), cv.FONT_HERSHEY_SIMPLEX, 0.6, 1)[0] |
| cv.rectangle(overlay_bbox, (box[0], box[1]), (box[2], box[3]), |
| color, -1) |
| cv.addWeighted(overlay_bbox, alpha, output, 1 - alpha, 0, output) |
| cv.rectangle(overlay_text, (box[0], box[1]-7-text_height), (box[0]+text_width+2, box[1]), |
| (0, 0, 0), -1) |
| cv.addWeighted(overlay_text, alpha_box, output, 1 - alpha_box, 0, output) |
| cv.rectangle(output, (box[0], box[1]), (box[2], box[3]), |
| color, thickness) |
| cv.putText(output, label.upper(), (box[0], box[1]-5), |
| cv.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv.LINE_AA) |
| return output |
|
|
| def normalize_bboxes(df): |
| df['x_min'] = df.apply(lambda row: (row.x_min)/row.width * 512, axis =1) |
| df['y_min'] = df.apply(lambda row: (row.y_min)/row.height * 512, axis =1) |
|
|
| df['x_max'] = df.apply(lambda row: (row.x_max)/row.width * 512, axis =1) |
| df['y_max'] = df.apply(lambda row: (row.y_max)/row.height * 512, axis =1) |
|
|
| df['x_mid'] = df.apply(lambda row: (row.x_max+row.x_min)/2 * 512, axis =1) |
| df['y_mid'] = df.apply(lambda row: (row.y_max+row.y_min)/2 * 512, axis =1) |
|
|
| df['w'] = df.apply(lambda row: (row.x_max-row.x_min), axis =1) |
| df['h'] = df.apply(lambda row: (row.y_max-row.y_min), axis =1) |
|
|
| df['area'] = df['w']*df['h'] |
| return df |
|
|
|
|
| labels = [ |
| "__ignore__", |
| "Aortic_enlargement", |
| "Atelectasis", |
| "Calcification", |
| "Cardiomegaly", |
| "Consolidation", |
| "ILD", |
| "Infiltration", |
| "Lung_Opacity", |
| "Nodule/Mass", |
| "Other_lesion", |
| "Pleural_effusion", |
| "Pleural_thickening", |
| "Pneumothorax", |
| "Pulmonary_fibrosis" |
| ] |
|
|
| label2color = [[59, 238, 119], [222, 21, 229], [94, 49, 164], [206, 221, 133], [117, 75, 3], |
| [210, 224, 119], [211, 176, 166], [63, 7, 197], [102, 65, 77], [194, 134, 175], |
| [209, 219, 50], [255, 44, 47], [89, 125, 149], [110, 27, 100]] |
|
|
| viz_labels = labels[1:] |
|
|
| now = datetime.now() |
|
|
| data = dict( |
| info=dict( |
| description=None, |
| url=None, |
| version=None, |
| year=now.year, |
| contributor=None, |
| date_created=now.strftime('%Y-%m-%d %H:%M:%S.%f'), |
| ), |
| licenses=[dict( |
| url=None, |
| id=0, |
| name=None, |
| )], |
| images=[], |
| type='instances', |
| annotations=[], |
| categories=[], |
| ) |
|
|
| class_name_to_id = {} |
| for i, each_label in enumerate(labels): |
| class_id = i - 1 |
| class_name = each_label |
| if class_id == -1: |
| assert class_name == '__ignore__' |
| continue |
| class_name_to_id[class_name] = class_id |
| data['categories'].append(dict( |
| supercategory=None, |
| id=class_id, |
| name=class_name, |
| )) |
|
|
| train_out_dir = 'data/train' |
| valid_out_dir = 'data/valid' |
| test_out_dir = 'data/test' |
|
|
| for dir in [train_out_dir, valid_out_dir, test_out_dir]: |
| if Path(dir).exists(): |
| shutil.rmtree(dir) |
| os.makedirs(dir) |
|
|
| train_out_file = 'data/train_annotations.json' |
| valid_out_file = 'data/valid_annotations.json' |
| test_out_file = 'data/test_annotations.json' |
|
|
| all_images_folder = 'vinbigdata/train' |
| all_files = os.listdir(all_images_folder) |
| all_files = np.sort(np.array(all_files)) |
|
|
| data_train = data.copy() |
| data_valid = data.copy() |
| data_test = data.copy() |
|
|
| for data in [data_train, data_valid, data_test]: |
| data['images'] = [] |
| data['annotations'] = [] |
|
|
| all_annotations = pd.read_csv('vinbigdata/train.csv') |
| all_annotations = all_annotations[all_annotations.class_id != 14] |
| all_annotations['image_path'] = all_annotations['image_id'].map(lambda id: |
| os.path.join(all_images_folder, str(id) + '.png')) |
| normalize_bboxes(all_annotations) |
| all_image_paths = all_annotations['image_path'].unique() |
|
|
| np.random.seed(1) |
|
|
| indices = np.arange(len(all_image_paths)) |
| np.random.shuffle(indices) |
|
|
| |
| splits = [0.7, 0.1, 0.2] |
|
|
| train_split_index = int(splits[0] * len(indices)) |
| valid_split_index = int((splits[0] + splits[1]) * len(indices)) |
|
|
| train_paths = all_image_paths[:train_split_index] |
| valid_paths = all_image_paths[train_split_index:valid_split_index] |
| test_paths = all_image_paths[valid_split_index:] |
|
|
| print(f'train: {len(train_paths)}, test: {len(test_paths)}, valid: {len(valid_paths)}') |
|
|
| folders = [train_out_dir, valid_out_dir, test_out_dir] |
| paths = [train_paths, valid_paths, test_paths] |
| data_dicts = [data_train, data_valid, data_test] |
| out_files = [train_out_file, valid_out_file, test_out_file] |
|
|
| |
| iou_thr = 0.2 |
| skip_box_thr = 0.0001 |
|
|
| for (folder, paths, data, out_file) in zip(folders, paths, data_dicts, out_files): |
| print(f'Saving to {folder}...') |
|
|
| viz_images = [] |
|
|
| for i, path in tqdm(enumerate(paths)): |
| img_array = cv.imread(path) |
| image_basename = Path(path).stem |
| shutil.copy2(path, folder) |
| |
| |
| data['images'].append(dict( |
| license=0, |
| url=None, |
| file_name=os.path.join(folder.split('/')[-1], image_basename+ '.png'), |
| height=img_array.shape[0], |
| width=img_array.shape[1], |
| date_captured=None, |
| id=i |
| )) |
| |
| img_annotations = all_annotations[all_annotations.image_id==image_basename] |
| boxes_viz = img_annotations[['x_min', 'y_min', 'x_max', 'y_max']].to_numpy().tolist() |
| labels_viz = img_annotations['class_id'].to_numpy().tolist() |
| |
| |
| if (i%500==0): |
| img_before = img_array.copy() |
| for box, label in zip(boxes_viz, labels_viz): |
| x_min, y_min, x_max, y_max = (box[0], box[1], box[2], box[3]) |
| color = label2color[int(label)] |
| img_before = draw_bbox(img_before, list(np.int_(box)), viz_labels[label], color) |
| viz_images.append(img_before) |
| |
| boxes_list = [] |
| scores_list = [] |
| labels_list = [] |
| weights = [] |
| |
| boxes_single = [] |
| labels_single = [] |
|
|
| cls_ids = img_annotations['class_id'].unique().tolist() |
| |
| count_dict = Counter(img_annotations['class_id'].tolist()) |
|
|
| for cid in cls_ids: |
| |
| if count_dict[cid]==1: |
| labels_single.append(cid) |
| boxes_single.append(img_annotations[img_annotations.class_id==cid][['x_min', 'y_min', 'x_max', 'y_max']].to_numpy().squeeze().tolist()) |
|
|
| else: |
| cls_list =img_annotations[img_annotations.class_id==cid]['class_id'].tolist() |
| labels_list.append(cls_list) |
| bbox = img_annotations[img_annotations.class_id==cid][['x_min', 'y_min', 'x_max', 'y_max']].to_numpy() |
| |
| |
| bbox = bbox/(img_array.shape[1], img_array.shape[0], img_array.shape[1], img_array.shape[0]) |
| bbox = np.clip(bbox, 0, 1) |
| boxes_list.append(bbox.tolist()) |
| scores_list.append(np.ones(len(cls_list)).tolist()) |
| weights.append(1) |
| |
| |
| boxes, scores, box_labels = weighted_boxes_fusion(boxes_list=boxes_list, scores_list=scores_list, |
| labels_list=labels_list, weights=weights, |
| iou_thr=iou_thr, skip_box_thr=skip_box_thr) |
| |
| boxes = boxes*(img_array.shape[1], img_array.shape[0], img_array.shape[1], img_array.shape[0]) |
| boxes = boxes.round(1).tolist() |
| box_labels = box_labels.astype(int).tolist() |
| boxes.extend(boxes_single) |
| box_labels.extend(labels_single) |
| |
| for box, label in zip(boxes, box_labels): |
| x_min, y_min, x_max, y_max = (box[0], box[1], box[2], box[3]) |
| area = round((x_max-x_min)*(y_max-y_min),1) |
| bbox =[ |
| round(x_min, 1), |
| round(y_min, 1), |
| round((x_max-x_min), 1), |
| round((y_max-y_min), 1) |
| ] |
| |
| data['annotations'].append(dict( id=len(data['annotations']), image_id=i, |
| category_id=int(label), area=area, bbox=bbox, |
| iscrowd=0)) |
| |
| |
| if (i%500==0): |
| img_after = img_array.copy() |
| for box, label in zip(boxes, box_labels): |
| color = label2color[int(label)] |
| img_after = draw_bbox(img_after, list(np.int_(box)), viz_labels[label], color) |
| viz_images.append(img_after) |
|
|
| plot_imgs(viz_images, cmap=None, size=40) |
| plt.figtext(0.3, 0.9,"Original Bboxes", va="top", ha="center", size=15) |
| plt.figtext(0.73, 0.9,"WBF", va="top", ha="center", size=15) |
| plt.show() |
| |
| with open(out_file, 'w') as f: |
| json.dump(data, f, indent=4) |
|
|