| import os |
| import pickle |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from keras.layers import Dense, Dropout, LeakyReLU |
| from keras.models import Sequential |
| from keras.optimizers import Adam |
| from numpy.random import randn |
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn import metrics |
| from tqdm import tqdm |
|
|
|
|
class GG(object):
    """
    General Generator (GG).

    Trains an MLP generator against a fixed black-box discriminator (a
    RandomForestClassifier): given random noise concatenated with a requested
    confidence score, the generator learns to emit samples for which the
    black box reports that confidence.
    """

    def __init__(self, number_of_features, saved_models_path, learning_rate, dropout, alpha):
        """
        The constructor for the General Generator class.

        :param number_of_features: Number of features in the data. Used to determine the noise dimensions.
        :param saved_models_path: The folder where we save the models.
        :param learning_rate: Learning rate for the generator's Adam optimizer.
        :param dropout: Dropout rate used in the generator's hidden layers.
        :param alpha: Negative-slope coefficient for the LeakyReLU activations.
        """
        self.saved_models_path = saved_models_path
        self.number_of_features = number_of_features
        self.generator_model = None
        self.discriminator_model = RandomForestClassifier()
        self.dropout = dropout
        self.alpha = alpha
        # Noise is half the feature dimension; the generator input carries one
        # extra slot for the requested confidence score (see build_generator).
        self.noise_dim = int(number_of_features / 2)
        self.learning_rate = learning_rate
        self.build_generator()
        # Only 'gen_loss' is appended to by this class; the discriminator keys
        # are kept for compatibility with previously pickled loss files.
        self.losses = {'gen_loss': [], 'dis_loss_pred': [], 'dis_loss_proba': []}

    def build_generator(self):
        """
        This function creates the generator model for the GG.
        We used a fairly simple MLP architecture.

        Input shape is (noise_dim + 1,): the noise vector plus one confidence
        score. Output is number_of_features sigmoid units, i.e. a generated
        sample with every feature squashed into [0, 1].
        :return:
        """
        self.generator_model = Sequential()
        self.generator_model.add(Dense(int(self.number_of_features * 2), input_shape=(self.noise_dim + 1, )))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))

        self.generator_model.add(Dense(int(self.number_of_features * 4)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))

        self.generator_model.add(Dense(int(self.number_of_features * 2)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))

        self.generator_model.add(Dense(self.number_of_features, activation='sigmoid'))
        # NOTE(review): `lr` is the legacy Keras argument name; newer Keras
        # expects `learning_rate` -- confirm against the pinned Keras version.
        optimizer = Adam(lr=self.learning_rate)
        self.generator_model.compile(loss='categorical_crossentropy', optimizer=optimizer)

    def train_gg(self, x_train, y_train, epochs, batch_size, model_name, data, output_path, to_plot=False):
        """
        This function running the training stage manually.

        :param x_train: the training set features
        :param y_train: the training set classes
        :param epochs: number of epochs
        :param batch_size: the batch size
        :param model_name: name of model to save (for generator)
        :param data: name of the dataset, used in plot titles
        :param output_path: Path to save loss fig
        :param to_plot: Plots the losses if True
        :return: trains the discriminator and generator.
        """
        losses_path = os.path.join(self.saved_models_path, f'{model_name}_losses')
        model_file = os.path.join(self.saved_models_path, f'{model_name}_part_2_gen_weights.h5')

        self.train_black_box_dis(x_train, y_train)
        self.train_generator(x_train, model_file, epochs, batch_size, losses_path)
        if to_plot:
            self.plot_losses(data, output_path)

    def train_black_box_dis(self, x_train, y_train):
        """
        Trains the discriminator and saves it. If a previously trained
        discriminator was saved, it is loaded and training is skipped.

        :param x_train: the training set features
        :param y_train: the training set classes
        :return:
        """
        dis_output = os.path.join(self.saved_models_path, 'black_box_dis_model')

        if os.path.exists(dis_output):
            # Reuse the saved model. Previously the loaded model was
            # immediately overwritten by refitting, which made the load
            # pointless; mirroring train_generator, we return early instead.
            with open(dis_output, 'rb') as rf_file:
                self.discriminator_model = pickle.load(rf_file)
            return

        self.discriminator_model.fit(x_train, y_train)
        with open(dis_output, 'wb') as rf_file:
            pickle.dump(self.discriminator_model, rf_file)

    def train_generator(self, data, model_path, epochs, start_batch_size, losses_path):
        """
        Function for training the general generator.

        If saved weights already exist at model_path, they (and the recorded
        losses) are loaded instead of retraining.

        :param data: The normalized dataset. NOTE: shuffled IN PLACE each epoch.
        :param model_path: The name of the model to save. includes epoch size, batches etc.
        :param epochs: Number of epochs
        :param start_batch_size: Size of batch to use.
        :param losses_path: The filepath for the loss results
        :return: trains the generator, saves it and the losses during training.
        """
        if os.path.exists(model_path):
            self.generator_model.load_weights(model_path)
            with open(losses_path, 'rb') as loss_file:
                self.losses = pickle.load(loss_file)
            return

        for epoch in range(epochs):
            np.random.shuffle(data)
            batch_size = start_batch_size
            for i in tqdm(range(0, data.shape[0], batch_size), ascii=True):
                if data.shape[0] - i >= batch_size:
                    batch_input = data[i:i + batch_size]
                else:
                    # Last, smaller batch: shrink batch_size to the remainder.
                    batch_input = data[i:]
                    batch_size = batch_input.shape[0]

                g_loss = self.train_generator_on_batch(batch_input)
                self.losses['gen_loss'].append(g_loss)

        self.save_generator_model(model_path, losses_path)

    def save_generator_model(self, generator_model_path, losses_path):
        """
        Saves the model and the loss data with pickle.

        :param generator_model_path: File path for the generator
        :param losses_path: File path for the losses
        :return:
        """
        self.generator_model.save_weights(generator_model_path)
        with open(losses_path, 'wb+') as loss_file:
            pickle.dump(self.losses, loss_file)

    def train_generator_on_batch(self, batch_input):
        """
        Trains the generator for a single batch. Creates the necessary input,
        comprised of noise and the real probabilities obtained from the black
        box. Compared to the target output, made of real samples and the
        probabilities made up by the generator.

        :param batch_input: A batch of real (normalized) samples.
        :return: The generator loss on this batch.
        """
        batch_size = batch_input.shape[0]
        # Black-box confidence for the last class, shape (batch_size, 1).
        discriminator_probabilities = self.discriminator_model.predict_proba(batch_input)[:, -1:]

        noise = randn(batch_size, self.noise_dim)
        gen_model_input = np.hstack([noise, discriminator_probabilities])
        # Last column of the generator output is its own confidence estimate.
        generated_probabilities = self.generator_model.predict(gen_model_input)[:, -1:]
        target_output = np.hstack([batch_input, generated_probabilities])
        g_loss = self.generator_model.train_on_batch(gen_model_input, target_output)

        return g_loss

    def plot_discriminator_results(self, x_test, y_test, data, path):
        """
        :param x_test: Test set
        :param y_test: Test classes
        :param data: Name of the dataset, used in plot titles
        :param path: Folder in which the plots are saved
        :return: Prints the required plots.
        """
        blackbox_probs = self.discriminator_model.predict_proba(x_test)
        discriminator_predictions = self.discriminator_model.predict(x_test)
        count_1 = int(np.sum(y_test))
        count_0 = int(y_test.shape[0] - count_1)
        class_data = (['Class 0', 'Class 1'], [count_0, count_1])
        self.plot_data(class_data, path, mode='bar', x_title='Class', title=f'Distribution of classes - {data} dataset')
        self.plot_data(blackbox_probs[:, 0], path, title=f'Probabilities for test set - class 0 - {data} dataset')
        self.plot_data(blackbox_probs[:, 1], path, title=f'Probabilities for test set - class 1 - {data} dataset')

        min_confidence = blackbox_probs[:, 0].min(), blackbox_probs[:, 1].min()
        max_confidence = blackbox_probs[:, 0].max(), blackbox_probs[:, 1].max()
        mean_confidence = blackbox_probs[:, 0].mean(), blackbox_probs[:, 1].mean()

        print("Accuracy:", metrics.accuracy_score(y_test, discriminator_predictions))
        for c in [0, 1]:
            print(f'Class {c} - Min confidence: {min_confidence[c]} - Max Confidence: {max_confidence[c]} - '
                  f'Mean confidence: {mean_confidence[c]}')

    def plot_generator_results(self, data, path, num_of_instances=1000):
        """
        Creates plots for the generator results on generated instances.

        :param data: Name of dataset used.
        :param path: Folder in which the plots are saved.
        :param num_of_instances: Number of samples to generate.
        :return:
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)

        # Last generated column is treated as the class label, so the black
        # box only sees the feature columns.
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        for c in [0, 1]:
            title = f'Confidence Score for Class {c} of Fake Samples - {data} dataset'
            self.plot_data(proba_fake[:, c], path, x_title='Confidence Score', title=title)

        black_box_confidence = proba_fake[:, 1:]
        proba_error = np.abs(sampled_proba - black_box_confidence)
        # BUGFIX: was hard-coded .reshape(1000, 1), which broke for any
        # num_of_instances != 1000; np.rint rounds half-to-even like round().
        generated_classes = np.rint(generated_instances[:, -1]).astype(int).reshape(-1, 1)
        proba_stats = np.hstack([sampled_proba, generated_classes, proba_fake[:, :1], proba_fake[:, 1:], proba_error])

        for c in [0, 1]:
            class_data = proba_stats[proba_stats[:, 1] == c]
            class_data = class_data[class_data[:, 0].argsort()]
            title = f'Error rate for different probabilities, class {c} - {data} dataset'
            self.plot_data((class_data[:, 0], class_data[:, -1]), path, mode='plot', y_title='error rate', title=title)

    def generate_n_samples(self, n):
        """
        Functions for generating N samples with uniformly distributed
        confidence levels.

        :param n: Number of samples
        :return: a tuple of the confidence scores used and the samples created.
        """
        noise = randn(n, self.noise_dim)
        confidences = np.random.uniform(0, 1, (n, 1))

        generator_input = np.hstack([noise, confidences])
        generated_instances = self.generator_model.predict(generator_input)

        return confidences, generated_instances

    @staticmethod
    def plot_data(data, path, mode='hist', x_title='Probabilities', y_title='# of Instances', title='Distribution'):
        """
        :param data: Data to plot. For 'bar'/'plot' modes: an (x, y) tuple.
        :param path: Path to save (the plot title is used as the filename)
        :param mode: Mode to use ('hist', 'bar' or 'plot')
        :param x_title: Title of x axis
        :param y_title: Title of y axis
        :param title: Title of plot
        :return: Prints a plot
        """
        plt.clf()

        if mode == 'hist':
            plt.hist(data)
        elif mode == 'bar':
            plt.bar(data[0], data[1])
        else:
            plt.plot(data[0], data[1])

        plt.title(title)
        plt.ylabel(y_title)
        plt.xlabel(x_title)

        path = os.path.join(path, title)
        plt.savefig(path)

    def plot_losses(self, data, path):
        """
        Plot the losses while training

        :param data: Name of the dataset, used in the output filename
        :param path: Folder in which the figure is saved
        :return:
        """
        plt.clf()
        plt.plot(self.losses['gen_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Iteration')

        plt.savefig(os.path.join(path, f'{data} dataset - general_generator_loss.png'))

    def get_error(self, num_of_instances=1000):
        """
        Calculates the error of the generator we created by measuring the difference between the probability that
        was given as input and the probability of the discriminator on the sample created.

        :param num_of_instances: Number of samples to generate.
        :return: An array of errors.
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        black_box_confidence = proba_fake[:, 1:]
        return np.abs(sampled_proba - black_box_confidence)
|
|
|
|