| ''' |
| python3 split_data.py -bp BasePath -ip RegisteredImageFolderPath -sp RegisteredLabelFolderPath -sl LabelValue LabelName [...] -ti TaskId -tn TaskName [-kf K] |
| |
| Given the base path and the relative paths to the registered images and labels, |
| the script shuffles the images, splits them into K folds (default 5) and creates one task folder per fold: |
| the images of that fold go into imagesTs (test) and all remaining images into imagesTr/labelsTr (train). |
| ''' |
| import os |
| import glob |
| import random |
| import shutil |
|
|
| from typing import Tuple |
| import numpy as np |
| from collections import OrderedDict |
| import json |
| import argparse |
|
|
|
|
| """ |
| make_if_dont_exist: creates a folder at a specified folder path if it does not exist |
| folder_path : relative path of the folder (from cur_dir) which needs to be created |
| overwrite :(default: False) if True overwrite the existing folder |
| """ |
def parse_command_line(argv=None):
    """Parse the command-line options of the dataset-split pipeline.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, which is
            what the script entry point relies on.

    Returns:
        argparse.Namespace with the attributes ``bp``, ``ip``, ``sp``,
        ``sl``, ``ti``, ``tn`` and ``kf``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            a value cannot be converted to the declared type.
    """
    print('---'*10)
    print('Parsing Command Line Arguments')
    parser = argparse.ArgumentParser(
        description='pipeline for dataset split')
    # Every option except -kf is needed by the pipeline, so argparse is asked
    # to reject an incomplete command line up front instead of letting a
    # None value fail later in the middle of the split.
    parser.add_argument('-bp', metavar='base path', type=str, required=True,
                        help="Absolute path of the base directory")
    parser.add_argument('-ip', metavar='image path', type=str, required=True,
                        help="Relative path of the image directory")
    parser.add_argument('-sp', metavar='segmentation path', type=str,
                        required=True,
                        help="Relative path of the segmentation directory")
    parser.add_argument('-sl', metavar='segmentation information list',
                        type=str, nargs='+', required=True,
                        help='a list of label name and corresponding value')
    parser.add_argument('-ti', metavar='task id', type=int, required=True,
                        help='task id number')
    parser.add_argument('-tn', metavar='task name', type=str, required=True,
                        help='task name')
    parser.add_argument('-kf', metavar='k-fold validation', type=int,
                        default=5, help='k-fold validation')
    return parser.parse_args(argv)
|
|
|
|
def make_if_dont_exist(folder_path, overwrite=False):
    """Ensure that ``folder_path`` exists, optionally starting from scratch.

    Args:
        folder_path: Path of the folder to create (intermediate folders are
            created as needed).
        overwrite: When True and the folder already exists, it is removed
            with its whole content and recreated empty. When False an
            existing folder is left untouched.
    """
    # Nothing there yet: create it and report.
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"{folder_path} created!")
        return

    # Already present and the caller wants to keep it.
    if not overwrite:
        print(f'{folder_path} exists.')
        return

    # Already present and the caller asked for a fresh, empty folder.
    print(f"{folder_path} overwritten")
    shutil.rmtree(folder_path)
    os.makedirs(folder_path)
|
|
|
|
def rename(location, oldname, newname):
    """Rename the entry ``oldname`` inside ``location`` to ``newname``.

    Args:
        location: Folder that contains the entry.
        oldname: Current name of the entry, relative to ``location``.
        newname: New name of the entry, relative to ``location``.
    """
    source = os.path.join(location, oldname)
    target = os.path.join(location, newname)
    os.rename(source, target)
|
|
|
|
def _fold_sizes(num_items, k_fold):
    """Return ``k_fold`` fold sizes that sum to ``num_items`` and differ by at most one."""
    base_size, remainder = divmod(num_items, k_fold)
    # The first `remainder` folds each absorb one of the left-over items.
    return [base_size + 1 if fold < remainder else base_size
            for fold in range(k_fold)]


def _subject_number(path):
    """Return all digits found in the file name before its first '.', concatenated."""
    stem = os.path.basename(path).split(".")[0]
    return ''.join(ch for ch in stem if ch.isdigit())


def _parse_label_pairs(seg_list):
    """Turn the flat ``value name value name ...`` list into a ``{value: name}`` dict."""
    if len(seg_list) % 2:
        raise ValueError(
            "-sl expects pairs of <label value> <label name>, got an odd "
            f"number of items: {seg_list}")
    pairs = {}
    for value, label_name in zip(seg_list[::2], seg_list[1::2]):
        if not value.isdigit():
            raise ValueError(f"-sl label value must be an integer, got {value!r}")
        if label_name.isdigit():
            raise ValueError(f"-sl label name must not be a number, got {label_name!r}")
        pairs[value] = label_name
    return pairs


def _write_dataset_json(task_folder_name, task_name, description, labels,
                        train_image_dir, test_dir):
    """Write ``dataset.json`` for one task folder from the files found in it."""
    # Sorted so the generated file does not depend on directory listing order.
    train_ids = sorted(os.listdir(train_image_dir))
    test_ids = sorted(os.listdir(test_dir))

    def case_file(image_name):
        # Image files carry a "_0000" suffix; the json entry drops it.
        return image_name[:image_name.find("_0000")] + '.nii.gz'

    json_dict = {
        'name': task_name,
        'description': description,
        'tensorImageSize': "4D",
        'reference': "MODIFY",
        'licence': "MODIFY",
        'release': "0.0",
        'modality': {"0": "CT"},
        'labels': labels,
        'numTraining': len(train_ids),
        'numTest': len(test_ids),
        'training': [{'image': "./imagesTr/%s" % case_file(i),
                      "label": "./labelsTr/%s" % case_file(i)}
                     for i in train_ids],
        'test': ["./imagesTs/%s" % case_file(i) for i in test_ids],
    }
    with open(os.path.join(task_folder_name, "dataset.json"), 'w') as f:
        json.dump(json_dict, f, indent=4, sort_keys=True)


def main(base_dir="/home/ameen"):
    """Split the images into k folds and build one task folder per fold.

    The command line is read through ``parse_command_line``. For fold ``j``
    a folder ``Task<id + j>_<name>_fold<j>`` is created below
    ``<base_dir>/nnUNet/nnunet/nnUNet_raw_data_base/nnUNet_raw_data``: the
    images of fold ``j`` are copied to ``imagesTs``, the images of all other
    folds to ``imagesTr`` (with their labels in ``labelsTr``), and a
    ``dataset.json`` describing the folder is written.

    Args:
        base_dir: Folder that contains the ``nnUNet/nnunet`` tree. Defaults
            to the location that used to be hard-coded in this script.

    Raises:
        ValueError: if a needed option is missing, ``-kf`` is smaller than 1,
            or the ``-sl`` list is not a sequence of ``<integer> <name>``
            pairs.
    """
    args = parse_command_line()

    # Validate everything before touching the file system so that a bad
    # command line cannot leave half-built task folders behind.
    missing = [flag for flag, value in (('-bp', args.bp), ('-ip', args.ip),
                                        ('-sp', args.sp), ('-sl', args.sl),
                                        ('-ti', args.ti), ('-tn', args.tn))
               if value is None]
    if missing:
        raise ValueError("missing command line option(s): " + ", ".join(missing))

    base = args.bp
    reg_data_path = args.ip
    lab_data_path = args.sp
    task_id = args.ti
    name = args.tn
    k_fold = args.kf
    if k_fold < 1:
        raise ValueError(f"-kf must be a positive integer, got {k_fold}")

    labels = {"0": "background"}
    labels.update(_parse_label_pairs(args.sl))

    nnunet_dir = "nnUNet/nnunet/nnUNet_raw_data_base/nnUNet_raw_data"
    main_dir = os.path.join(base_dir, 'nnUNet/nnunet')
    make_if_dont_exist(os.path.join(main_dir, 'nnUNet_preprocessed'))
    make_if_dont_exist(os.path.join(main_dir, 'nnUNet_trained_models'))

    os.environ['nnUNet_raw_data_base'] = os.path.join(
        main_dir, 'nnUNet_raw_data_base')
    os.environ['nnUNet_preprocessed'] = os.path.join(
        main_dir, 'nnUNet_preprocessed')
    os.environ['RESULTS_FOLDER'] = os.path.join(
        main_dir, 'nnUNet_trained_models')

    random.seed(19)

    # glob returns files in arbitrary order; sorting first is what makes the
    # seeded shuffle below produce the same split on every run.
    image_list = sorted(
        glob.glob(os.path.join(base, reg_data_path) + "/*.nii.gz"))
    # Labels are looked up by exact file name (a suffix comparison could pick
    # "xsub1.nii.gz" for "sub1.nii.gz").
    label_by_name = {
        os.path.basename(path): path
        for path in glob.glob(os.path.join(base, lab_data_path) + "/*.nii.gz")
    }
    num_images = len(image_list)

    random.shuffle(image_list)
    folds = []
    start_point = 0
    for size in _fold_sizes(num_images, k_fold):
        folds.append(image_list[start_point:start_point + size])
        start_point += size

    for j, test_images in enumerate(folds):
        # Task ids are zero-padded to three digits; for two-digit ids this is
        # the same name the former "Task0<id>" pattern produced.
        task_name = f"Task{task_id + j:03d}_{name}_fold{j}"
        task_folder_name = os.path.join(base_dir, nnunet_dir, task_name)
        train_image_dir = os.path.join(task_folder_name, 'imagesTr')
        train_label_dir = os.path.join(task_folder_name, 'labelsTr')
        test_dir = os.path.join(task_folder_name, 'imagesTs')

        make_if_dont_exist(task_folder_name)
        make_if_dont_exist(train_image_dir)
        make_if_dont_exist(train_label_dir)
        make_if_dont_exist(test_dir)

        # Every fold except fold j contributes to the training set.
        train_images = []
        for p, fold in enumerate(folds):
            if p != j:
                train_images.extend(fold)

        print("Number of training subjects: ", len(train_images),
              "\nNumber of testing subjects:", len(test_images),
              "\nTotal:", num_images)

        for image_path in train_images:
            filename = os.path.basename(image_path)
            case = name + "_" + _subject_number(image_path)
            shutil.copy(image_path,
                        os.path.join(train_image_dir, case + "_0000.nii.gz"))

            label_path = label_by_name.get(filename)
            if label_path is None:
                # Keep going, but make the gap visible instead of silently
                # producing a training case without a label.
                print(f"WARNING: no label file named {filename} found "
                      f"for {image_path}")
                continue
            shutil.copy(label_path,
                        os.path.join(train_label_dir, case + '.nii.gz'))

        for image_path in test_images:
            case = name + "_" + _subject_number(image_path)
            shutil.copy(image_path,
                        os.path.join(test_dir, case + "_0000.nii.gz"))

        _write_dataset_json(task_folder_name, task_name, name, labels,
                            train_image_dir, test_dir)

        if os.path.exists(os.path.join(task_folder_name, 'dataset.json')):
            print("new json file created!")
|
|
|
|
# Run the split pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|