| import os |
| import sys |
| import numpy as np |
| import h5py |
| import scipy.io as spio |
| import nibabel as nib |
|
|
import argparse

# Command-line interface: choose which NSD subject to preprocess.
parser = argparse.ArgumentParser(description='Argument Parser')
# Validate in argparse itself (type + choices) instead of a bare `assert`,
# which is silently stripped when Python runs with -O.  Allowed subjects
# match the original assert: 1, 2, 5, 7.
# NOTE(review): the script below assumes 37 sessions x 750 trials for the
# chosen subject — confirm these subjects all have 37 sessions available.
parser.add_argument("-sub", "--sub", help="Subject Number", type=int,
                    choices=[1, 2, 5, 7], default=1)
args = parser.parse_args()
sub = args.sub
|
|
def loadmat(filename):
    '''
    Load a MATLAB .mat file as nested Python dictionaries.

    This should be called instead of spio.loadmat directly: it cures the
    problem of not properly recovering Python dictionaries from mat
    files by converting every mat_struct object (produced by
    struct_as_record=False) into a plain dict, recursing into nested
    structs and into cell arrays loaded as ndarrays.

    Parameters
    ----------
    filename : str
        Path to the .mat file.

    Returns
    -------
    dict
        The spio.loadmat result with all mat_struct entries replaced by
        nested dicts; ndarray fields *inside* structs become lists,
        while top-level ndarrays are left untouched.
    '''
    # SciPy >= 1.8 exposes mat_struct as scipy.io.matlab.mat_struct; the
    # old mio5_params submodule was deprecated and later removed.
    # Resolve whichever name this SciPy provides, once.
    try:
        mat_struct = spio.matlab.mat_struct
    except AttributeError:
        mat_struct = spio.matlab.mio5_params.mat_struct

    def _check_keys(d):
        '''Convert any top-level mat_struct entries of d into dicts.'''
        for key in d:
            if isinstance(d[key], mat_struct):
                d[key] = _todict(d[key])
        return d

    def _todict(matobj):
        '''Recursively convert a mat_struct into a nested dictionary.'''
        out = {}
        for field in matobj._fieldnames:
            elem = matobj.__dict__[field]
            if isinstance(elem, mat_struct):
                out[field] = _todict(elem)
            elif isinstance(elem, np.ndarray):
                out[field] = _tolist(elem)
            else:
                out[field] = elem
        return out

    def _tolist(ndarray):
        '''
        Recursively convert a cell array (loaded as an ndarray) into a
        list, converting any mat_struct elements along the way.
        '''
        elem_list = []
        for sub_elem in ndarray:
            if isinstance(sub_elem, mat_struct):
                elem_list.append(_todict(sub_elem))
            elif isinstance(sub_elem, np.ndarray):
                elem_list.append(_tolist(sub_elem))
            else:
                elem_list.append(sub_elem)
        return elem_list

    data = spio.loadmat(filename, struct_as_record=False, squeeze_me=True)
    return _check_keys(data)
|
|
|
|
|
|
# NSD experiment design: maps each trial to the image shown on it.
stim_order_f = 'nsddata/experiments/nsd/nsd_expdesign.mat'
stim_order = loadmat(stim_order_f)


# Group the 37 sessions x 750 trials by image.  `masterordering` holds
# 1-based ranks into `subjectim`; trials with rank > 1000 go into the
# training split, the rest into the test split (presumably the shared
# first-1000 images — confirm against the NSD design docs).
sig_train = {}
sig_test = {}
num_trials = 37 * 750
for trial in range(num_trials):
    rank = stim_order['masterordering'][trial]
    # 0-based nsdId, as used in the design csv files
    nsdId = stim_order['subjectim'][sub - 1, rank - 1] - 1
    bucket = sig_train if rank > 1000 else sig_test
    bucket.setdefault(nsdId, []).append(trial)


train_im_idx = list(sig_train.keys())
test_im_idx = list(sig_test.keys())
|
|
|
|
# Per-subject input directories (1.8 mm functional preparation).
roi_dir = 'nsddata/ppdata/subj{:02d}/func1pt8mm/roi/'.format(sub)
betas_dir = 'nsddata_betas/ppdata/subj{:02d}/func1pt8mm/betas_fithrf_GLMdenoise_RR/'.format(sub)

# Voxel selection: keep only voxels with a positive "nsdgeneral" ROI label.
mask_filename = 'nsdgeneral.nii.gz'
mask = nib.load(roi_dir + mask_filename).get_fdata()
num_voxel = int(np.count_nonzero(mask > 0))
|
|
# Stack the single-trial betas of all 37 sessions into one
# (num_trials, num_voxel) float32 matrix, keeping only masked voxels.
fmri = np.zeros((num_trials, num_voxel), dtype=np.float32)
for session in range(37):
    beta_filename = "betas_session{0:02d}.nii.gz".format(session + 1)
    betas = nib.load(betas_dir + beta_filename).get_fdata().astype(np.float32)
    start = session * 750
    fmri[start:start + 750] = betas[mask > 0].transpose()
    del betas  # drop the full 4-D volume before loading the next session
    print(session)

print("fMRI Data are loaded.")
|
|
# Load the full NSD stimulus image brick into memory.
# Shape is assumed (num_images, 425, 425, 3) given im_dim=425 used
# below — TODO confirm.  Use a context manager so the HDF5 handle is
# closed once the data is copied out (the original left it open).
with h5py.File('nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5', 'r') as f_stim:
    stim = f_stim['imgBrick'][:]


print("Stimuli are loaded.")
|
|
# For each training image, average the fMRI responses over all trials on
# which it was shown, and collect the image itself.
num_train, num_test = len(train_im_idx), len(test_im_idx)
vox_dim, im_dim, im_c = num_voxel, 425, 3
fmri_array = np.zeros((num_train, vox_dim))
stim_array = np.zeros((num_train, im_dim, im_dim, im_c))
for i, idx in enumerate(train_im_idx):
    stim_array[i] = stim[idx]
    # average betas over every presentation of this image
    fmri_array[i] = fmri[sorted(sig_train[idx])].mean(0)
    print(i)


# np.save does not create intermediate directories — make sure the
# output directory exists first.
os.makedirs('processed_data/subj{:02d}'.format(sub), exist_ok=True)
np.save('processed_data/subj{:02d}/nsd_train_fmriavg_nsdgeneral_sub{}.npy'.format(sub, sub), fmri_array)
np.save('processed_data/subj{:02d}/nsd_train_stim_sub{}.npy'.format(sub, sub), stim_array)


print("Training data is saved.")
|
|
# Same averaging as for the training split, over the test images.
fmri_array = np.zeros((num_test, vox_dim))
stim_array = np.zeros((num_test, im_dim, im_dim, im_c))
for row, nsd_idx in enumerate(test_im_idx):
    stim_array[row] = stim[nsd_idx]
    trial_rows = sorted(sig_test[nsd_idx])
    fmri_array[row] = fmri[trial_rows].mean(axis=0)
    print(row)


np.save('processed_data/subj{:02d}/nsd_test_fmriavg_nsdgeneral_sub{}.npy'.format(sub,sub),fmri_array )
np.save('processed_data/subj{:02d}/nsd_test_stim_sub{}.npy'.format(sub,sub),stim_array )


print("Test data is saved.")
|
|
# Curated COCO captions, indexed by nsdId; 5 captions per image
# (row width taken from the (num, 5) arrays built below).
annots_cur = np.load('annots/COCO_73k_annots_curated.npy')


captions_array = np.empty((num_train, 5), dtype=annots_cur.dtype)
for row, nsd_idx in enumerate(train_im_idx):
    captions_array[row, :] = annots_cur[nsd_idx, :]
    print(row)
np.save('processed_data/subj{:02d}/nsd_train_cap_sub{}.npy'.format(sub,sub),captions_array )

captions_array = np.empty((num_test, 5), dtype=annots_cur.dtype)
for row, nsd_idx in enumerate(test_im_idx):
    captions_array[row, :] = annots_cur[nsd_idx, :]
    print(row)
np.save('processed_data/subj{:02d}/nsd_test_cap_sub{}.npy'.format(sub,sub),captions_array )


print("Caption data are saved.")