| import os |
| os.chdir("/data/public/GenNet") |
| import sys |
| import glob |
| import numpy as np |
| import pandas as pd |
| |
| import matplotlib |
| matplotlib.use('agg') |
| import tensorflow as tf |
| import tensorflow.keras as K |
| import scipy |
| import tables |
|
|
import re  # used only below to parse the TensorFlow version string

# Set the Keras backend epsilon explicitly.
tf.keras.backend.set_epsilon(0.0000001)
tf_version = tf.__version__


def _version_tuple(version):
    """Return the leading ``(major, minor, patch)`` integers of *version*.

    A missing patch component counts as 0 and any suffix after the numeric
    part (for example ``rc1``) is ignored.  Returns ``None`` when *version*
    does not start with ``<digits>.<digits>``.
    """
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        return None
    return tuple(int(part or 0) for part in match.groups())


# Compare numerically: comparing the raw strings is lexicographic, which
# orders '1.9.0' after '1.13.1' and therefore mis-routes TensorFlow 1.2-1.9.
_tf_version_info = _version_tuple(tf_version)

if _tf_version_info is not None and _tf_version_info <= (1, 13, 1):
    from GenNet_utils.LocallyDirectedConnected import LocallyDirected1D
    print('= or less then 1.13.1: tensorflow version is', tf_version)
elif _tf_version_info is not None and _tf_version_info >= (2, 0, 0):
    from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D
    print('= or more then 2.0: tensorflow version is', tf_version)
else:
    # Versions between 1.13.1 and 2.0 (or unparsable ones) fall back to the
    # TF2 implementation of the layer.
    print("unexpected tensorflow version")
    from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D


# Prefix of the processed genotype file name read by get_testdata().
studyname = 'test_ImmGen'
|
|
def layer_block(model, mask, i, regression):
    """Stack one masked layer, an activation and a batch normalisation.

    Args:
        model: tensor the new layers are applied to.
        mask: connectivity mask handed to ``LocallyDirected1D``; its first
            dimension is used as the input length of the layer.
        i: index of the block, used to build the layer name
            ``"LocallyDirected_<i>"``.
        regression: selects the activation, ``"relu"`` when truthy and
            ``"tanh"`` otherwise.

    Returns:
        The output tensor of the batch-normalisation layer.
    """
    nonlinearity = "relu" if regression else "tanh"

    hidden = LocallyDirected1D(
        mask=mask,
        filters=1,
        input_shape=(mask.shape[0], 1),
        name="LocallyDirected_" + str(i),
    )(model)
    hidden = K.layers.Activation(nonlinearity)(hidden)
    # Normalise only: no learned offset (center) or gain (scale).
    return K.layers.BatchNormalization(center=False, scale=False)(hidden)
|
|
|
|
def add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain):
    """Merge the covariate input into the network when covariates are used.

    Args:
        model: tensor produced by the network so far.
        input_cov: covariate input tensor to concatenate with ``model``.
        num_covariates: number of covariates; nothing is added unless it is
            greater than zero.
        regression: forwarded to ``activation_layer``.
        negative_values_ytrain: forwarded to ``activation_layer``.
        mean_ytrain: constant used to initialise the bias of the final
            single-unit dense layer.

    Returns:
        ``model`` unchanged when no covariates are used, otherwise the output
        of the new single-unit dense layer.
    """
    if not num_covariates > 0:
        return model

    combined = activation_layer(model, regression, negative_values_ytrain)
    combined = K.layers.concatenate([combined, input_cov], axis=1)
    combined = K.layers.BatchNormalization(center=False, scale=False)(combined)
    bias_start = tf.keras.initializers.Constant(mean_ytrain)
    return K.layers.Dense(units=1, bias_initializer=bias_start)(combined)
|
|
|
|
def activation_layer(model, regression, negative_values_ytrain):
    """Apply the activation that matches the prediction task.

    Classification (``regression`` falsy) uses a sigmoid.  Regression uses a
    linear activation when ``negative_values_ytrain`` is truthy and a ReLU
    otherwise; in both regression cases the choice is printed.

    Args:
        model: tensor to apply the activation to.
        regression: whether the network is built for regression.
        negative_values_ytrain: whether the linear activation is selected
            for regression.

    Returns:
        The output tensor of the activation layer.
    """
    if not regression:
        return K.layers.Activation("sigmoid")(model)

    if negative_values_ytrain:
        kind, notice = "linear", 'using a linear activation function'
    else:
        kind, notice = "relu", 'using a relu activation function'

    activated = K.layers.Activation(kind)(model)
    print(notice)
    return activated
|
|
def _load_masks(datapath, mask_order):
    """Load the masks named in *mask_order*, or every ``*.npz`` in *datapath* when it is empty."""
    # A bare ``import scipy`` does not guarantee that the ``sparse``
    # submodule is loaded, so import it explicitly before use.
    import scipy.sparse

    if len(mask_order) > 0:
        paths = [datapath + '/' + str(name) + '.npz' for name in mask_order]
    else:
        paths = glob.glob(datapath + '/*.npz')
    return [scipy.sparse.load_npz(path) for path in paths]


def _sort_masks_by_size(masks):
    """Return *masks* ordered from the largest to the smallest shape."""
    order_rows = np.argsort([m.shape[0] for m in masks])[::-1]
    order_cols = np.argsort([m.shape[1] for m in masks])[::-1]
    # Both dimensions must rank the masks identically, otherwise there is no
    # unambiguous order to stack them in.
    assert all(order_rows == order_cols), \
        "mask row and column sizes do not sort into the same order"
    return [masks[k] for k in order_rows]


def create_network_from_npz(datapath,
                            inputsize,
                            genotype_path,
                            l1_value=0.01,
                            regression=False,
                            num_covariates=0,
                            mask_order=None):
    """Build the Keras model described by the ``.npz`` masks in *datapath*.

    Each mask becomes one ``layer_block``.  After flattening, an output layer
    is added, covariates are merged in when requested, and a final activation
    is applied.

    Args:
        datapath: directory that holds the ``.npz`` mask files.
        inputsize: number of input features; must equal the first dimension
            of the first mask.
        genotype_path: accepted for interface compatibility; not used here.
        l1_value: L1 penalty for the dense output layer, which is used only
            when the last mask does not end in a single node.
        regression: build a regression network instead of a classifier.
        num_covariates: width of the covariate input.
        mask_order: mask file names (without ``.npz``) in the order they
            should be stacked.  When empty or ``None``, every ``.npz`` file
            in *datapath* is loaded and ordered from largest to smallest.

    Returns:
        Tuple ``(model, masks)``: the ``K.Model`` and the masks in the order
        they were applied.

    Raises:
        FileNotFoundError: if no mask could be loaded.
        AssertionError: if consecutive masks do not fit on each other or the
            first mask does not match *inputsize*.
    """
    mask_order = [] if mask_order is None else mask_order

    print("Creating networks from npz masks")
    print("regression", regression)
    if regression:
        # NOTE(review): regression_properties is not defined or imported in
        # the visible part of this file - confirm it is in scope before
        # calling with regression=True.
        mean_ytrain, negative_values_ytrain = regression_properties(datapath)
    else:
        mean_ytrain = 0
        negative_values_ytrain = False

    print(mask_order)

    masks = _load_masks(datapath, mask_order)
    if not masks:
        raise FileNotFoundError("no .npz mask files found in " + str(datapath))
    if len(mask_order) == 0:
        # Without an explicit order the masks are stacked largest first.
        masks = _sort_masks_by_size(masks)

    mask_shapes_x = [m.shape[0] for m in masks]
    mask_shapes_y = [m.shape[1] for m in masks]

    # The output size of every mask must be the input size of the next one.
    for size_out, size_in in zip(mask_shapes_y, mask_shapes_x[1:]):
        assert size_out == size_in, "consecutive masks do not fit on each other"

    print('mask_shapes_x[0]', mask_shapes_x[0])
    assert mask_shapes_x[0] == inputsize, "first mask does not match inputsize"
    print('mask_shapes_y[-1]', mask_shapes_y[-1])
    all_masks_available = mask_shapes_y[-1] == 1

    input_layer = K.Input((inputsize,), name='input_layer')
    input_cov = K.Input((num_covariates,), name='inputs_cov')

    model = K.layers.Reshape(input_shape=(inputsize,), target_shape=(inputsize, 1))(input_layer)

    for i, mask in enumerate(masks):
        model = layer_block(model, mask, i, regression)

    model = K.layers.Flatten()(model)

    if all_masks_available:
        # NOTE(review): the last mask has already been applied by the loop
        # above and is applied again here on the flattened tensor.  The layout
        # is kept as-is because saved weights are loaded into this model -
        # confirm this branch is intended.
        model = LocallyDirected1D(mask=masks[-1], filters=1,
                                  input_shape=(masks[-1].shape[0], 1),
                                  name="output_layer")(model)
    else:
        model = K.layers.Dense(units=1, name="output_layer",
                               kernel_regularizer=tf.keras.regularizers.l1(l=l1_value)
                               )(model)

    model = add_covariates(model, input_cov, num_covariates, regression,
                           negative_values_ytrain, mean_ytrain)

    output_layer = activation_layer(model, regression, negative_values_ytrain)
    model = K.Model(inputs=[input_layer, input_cov], outputs=output_layer)

    # summary() prints by itself; wrapping it in print() only added "None".
    model.summary()

    return model, masks
|
|
def get_testdata(datapath):
    """Read the complete processed genotype array for ``studyname``.

    Args:
        datapath: prefix of the file location.  It is concatenated directly
            with ``studyname + '_genotype_processed.h5'``, so a directory
            must be passed with its trailing path separator.

    Returns:
        The full contents of the ``data`` node at the root of the HDF5 file.
    """
    genotype_file = datapath + studyname + '_genotype_processed.h5'
    # The context manager closes the file even when reading the node fails.
    with tables.open_file(genotype_file, "r") as h5file:
        return h5file.root.data[:]
|
|
def predict():
    """Run the module-level ``model`` on the test genotypes and print the result.

    Reads the genotypes through ``get_testdata`` using the module-level
    ``datapath``.

    Returns:
        The value returned by ``model.predict``, so callers can use the
        predictions instead of only seeing them printed.
    """
    xtest = get_testdata(datapath)
    pred = model.predict(xtest)
    print('model prediction: ', pred)
    return pred
|
|
# Directory holding the masks, the trained weights and the processed genotypes.
datapath = '/data/public/GenNet/processed_data/'
genotype_path = datapath

# Network settings; inputsize must equal the first dimension of the first mask.
inputsize = 6986636
num_covariates = 0
l1_value = 0.001

# Rebuild the classification network from the two masks, in stacking order.
model, masks = create_network_from_npz(
    datapath=datapath,
    inputsize=inputsize,
    genotype_path=genotype_path,
    l1_value=l1_value,
    regression=False,
    num_covariates=num_covariates,
    mask_order=[
        'UKBB_sparse_connection_mask_ensmb_alligned',
        'gene_ensmbl_ImmGen_mask_tstat',
    ],
)

# Restore the trained weights, then run the prediction on the test data.
model.load_weights(datapath + 'bestweight_job_Imm_CAD.h5')
print('weights have been loaded')
predict()
|
|