| """ |
| TensorFlow 2.x reproduction of the EQL (Equation Learner) symbolic regression |
| architecture from: |
| "Symbolic regression for scientific discovery: an application to wind speed forecasting" |
| Abdellaoui & Mehrkanoon, arXiv:2102.10570 |
| """ |
| import os, sys, time, copy |
| import numpy as np |
| from collections import OrderedDict |
| from tqdm import tqdm |
| from scipy.io import loadmat |
| from sklearn.metrics import mean_squared_error, mean_absolute_error |
| import tensorflow as tf |
| import sympy as sp |
| import h5py |
| import matplotlib |
| matplotlib.use('Agg') |
| from matplotlib import pyplot as plt |
|
|
| |
| |
| |
| class BaseFunction: |
| def __init__(self, norm=1): |
| self.norm = norm |
| def sp(self, x): return None |
| def tf(self, x): |
| z = sp.symbols('z') |
| return sp.utilities.lambdify(z, self.sp(z), 'tensorflow')(x) |
| def np(self, x): |
| z = sp.symbols('z') |
| return sp.utilities.lambdify(z, self.sp(z), 'numpy')(x) |
| def name(self, x): return str(self.sp) |
|
|
| class Constant(BaseFunction): |
| def tf(self, x): return tf.ones_like(x) |
| def sp(self, x): return 1 |
| def np(self, x): return np.ones_like |
|
|
| class Identity(BaseFunction): |
| def tf(self, x): return tf.identity(x) / self.norm |
| def sp(self, x): return x / self.norm |
| def np(self, x): return np.array(x) / self.norm |
|
|
| class Square(BaseFunction): |
| def tf(self, x): return tf.square(x) / self.norm |
| def sp(self, x): return x ** 2 / self.norm |
| def np(self, x): return np.square(x) / self.norm |
|
|
| class Sin(BaseFunction): |
| def tf(self, x): return tf.sin(x * 4 * np.pi) / self.norm |
| def sp(self, x): return sp.sin(x * 4 * sp.pi) / self.norm |
| def np(self, x): return np.sin(x * 4 * np.pi) / self.norm |
|
|
| class Sigmoid(BaseFunction): |
| def sp(self, x): return 1 / (1 + sp.exp(-20 * x)) / self.norm |
| def np(self, x): return 1 / (1 + np.exp(-20 * x)) / self.norm |
| def name(self, x): return "sigmoid(x)" |
|
|
| class BaseFunction2: |
| def __init__(self, norm=1.0): self.norm = norm |
| def sp(self, x, y): return None |
| def tf(self, x, y): |
| a, b = sp.symbols('a b') |
| return sp.utilities.lambdify([a, b], self.sp(a, b), 'tensorflow')(x, y) |
| def np(self, x, y): |
| a, b = sp.symbols('a b') |
| return sp.utilities.lambdify([a, b], self.sp(a, b), 'numpy')(x, y) |
| def name(self, x, y): return str(self.sp) |
|
|
| class Product(BaseFunction2): |
| def __init__(self, norm=0.1): super().__init__(norm=norm) |
| def sp(self, x, y): return x * y / self.norm |
|
|
| def count_double(funcs): |
| return sum(1 for f in funcs if isinstance(f, BaseFunction2)) |
|
|
| def count_inputs(funcs): |
| i = 0 |
| for f in funcs: |
| if isinstance(f, BaseFunction): i += 1 |
| elif isinstance(f, BaseFunction2): i += 2 |
| return i |
|
|
| |
| |
| |
| class SymbolicLayer: |
| def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1): |
| if funcs is None: |
| funcs = default_func |
| self.initial_weight = initial_weight |
| self.W = None |
| self.built = False |
| if self.initial_weight is not None: |
| if not variable: |
| self.W = tf.Variable(self.initial_weight) |
| else: |
| self.W = self.initial_weight |
| self.built = True |
| self.output = None |
| self.init_stddev = init_stddev |
| self.n_funcs = len(funcs) |
| self.funcs = [f.tf for f in funcs] |
| self.n_double = count_double(funcs) |
| self.n_single = self.n_funcs - self.n_double |
| self.out_dim = self.n_funcs + self.n_double |
|
|
| def build(self, in_dim): |
| self.W = tf.Variable(tf.random.normal(shape=[in_dim, self.out_dim], stddev=self.init_stddev)) |
| self.built = True |
|
|
| def __call__(self, x): |
| if not self.built: |
| self.build(int(x.shape[1])) |
| g = tf.matmul(x, self.W) |
| self.output = [] |
| in_i, out_i = 0, 0 |
| while out_i < self.n_single: |
| self.output.append(self.funcs[out_i](g[:, in_i])) |
| in_i += 1 |
| out_i += 1 |
| while out_i < self.n_funcs: |
| self.output.append(self.funcs[out_i](g[:, in_i], g[:, in_i + 1])) |
| in_i += 2 |
| out_i += 1 |
| self.output = tf.stack(self.output, axis=1) |
| return self.output |
|
|
| def get_weight(self): return self.W |
| def set_weight(self, weight): self.W = weight |
|
|
|
|
| class SymbolicNet: |
| def __init__(self, symbolic_depth, funcs=None, initial_weights=None, |
| initial_bias=None, variable=False, init_stddev=0.1): |
| self.depth = symbolic_depth |
| self.funcs = funcs |
| self.shape = (None, 1) |
| if initial_weights is not None: |
| self.symbolic_layers = [ |
| SymbolicLayer(funcs=funcs, initial_weight=initial_weights[i], variable=variable) |
| for i in range(self.depth) |
| ] |
| if not variable: |
| self.output_weight = tf.Variable(initial_weights[-1]) |
| else: |
| self.output_weight = initial_weights[-1] |
| else: |
| if isinstance(init_stddev, list): |
| self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev[i]) for i in range(self.depth)] |
| else: |
| self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev) for _ in range(self.depth)] |
| self.output_weight = tf.Variable(tf.random.uniform(shape=(self.symbolic_layers[-1].n_funcs, 1))) |
|
|
| def build(self, input_dim): |
| in_dim = input_dim |
| for i in range(self.depth): |
| self.symbolic_layers[i].build(in_dim) |
| in_dim = self.symbolic_layers[i].n_funcs |
|
|
| def __call__(self, input): |
| self.shape = (int(input.shape[1]), 1) |
| h = input |
| for i in range(self.depth): |
| h = self.symbolic_layers[i](h) |
| h = tf.matmul(h, self.output_weight) |
| return h |
|
|
| def get_weights(self): |
| return [self.symbolic_layers[i].W for i in range(self.depth)] + [self.output_weight] |
|
|
| def set_weights(self, weights): |
| for i in range(self.depth): |
| self.symbolic_layers[i].W = weights[i] |
| self.output_weight = weights[self.depth] |
|
|
|
|
| |
| |
| |
| def l12_smooth(input_tensor, a=0.05): |
| if isinstance(input_tensor, list): |
| return sum(l12_smooth(t, a) for t in input_tensor) |
| smooth_abs = tf.where( |
| tf.abs(input_tensor) < a, |
| tf.pow(input_tensor, 4) / (-8 * a ** 3) + tf.square(input_tensor) * 3 / 4 / a + 3 * a / 8, |
| tf.abs(input_tensor) |
| ) |
| return tf.reduce_sum(tf.sqrt(smooth_abs)) |
|
|
| |
| |
| |
| def apply_activation(W, funcs, n_double=0): |
| W = sp.Matrix(W) |
| if n_double == 0: |
| for i in range(W.shape[0]): |
| for j in range(W.shape[1]): |
| W[i, j] = funcs[j](W[i, j]) |
| else: |
| W_new = W.copy() |
| out_size = len(funcs) |
| for i in range(W.shape[0]): |
| in_j, out_j = 0, 0 |
| while out_j < out_size - n_double: |
| W_new[i, out_j] = funcs[out_j](W[i, in_j]) |
| in_j += 1 |
| out_j += 1 |
| while out_j < out_size: |
| W_new[i, out_j] = funcs[out_j](W[i, in_j], W[i, in_j + 1]) |
| in_j += 2 |
| out_j += 1 |
| for _ in range(n_double): |
| W_new.col_del(-1) |
| W = W_new |
| return W |
|
|
|
|
| def sym_pp(W_list, funcs, var_names, threshold=1e-3, n_double=0): |
| vars = [sp.Symbol(v) if isinstance(v, str) else v for v in var_names] |
| expr = sp.Matrix(vars).T |
| for W in W_list: |
| W = filter_mat(sp.Matrix(W), threshold=threshold) |
| expr = expr * W |
| expr = apply_activation(expr, funcs, n_double=n_double) |
| return expr |
|
|
|
|
| def last_pp(eq, W): |
| return eq * filter_mat(sp.Matrix(W)) |
|
|
|
|
| def network(weights, funcs, var_names, threshold=1e-3): |
| n_double = count_double(funcs) |
| funcs = [f.sp for f in funcs] |
| expr = sym_pp(weights[:-1], funcs, var_names, threshold=threshold, n_double=n_double) |
| expr = last_pp(expr, weights[-1]) |
| expr = expr[0, 0] |
| return expr |
|
|
|
|
| def filter_mat(mat, threshold=0.01): |
| for i in range(mat.shape[0]): |
| for j in range(mat.shape[1]): |
| if abs(mat[i, j]) < threshold: |
| mat[i, j] = 0 |
| return mat |
|
|
| |
| |
| |
| def tensor_to_matrix(tensor): |
| number_features = tensor.shape[0] |
| number_cities = tensor.shape[1] |
| len_dates = tensor.shape[2] |
| matrix_for_scaling = np.ones((len_dates, number_cities * number_features)) |
| for i in range(number_cities): |
| for j in range(len_dates): |
| features_city = tensor[:, i, j] |
| matrix_for_scaling[j, i * number_features:(i + 1) * number_features] = features_city |
| return matrix_for_scaling |
|
|
|
|
| def get_x_right_format(phase, steps_ahead, feature): |
| steps_ahead_to_step = {6: "1", 12: "2", 18: "3", 24: "4"} |
| filename = "{}/step{}.mat".format(feature, steps_ahead_to_step[steps_ahead]) |
| data = loadmat('Denmark_data/{}'.format(filename)) |
| x = data["Xtr"] if phase == "train" else data["Xtest"] |
| x = np.transpose(x, (0, 3, 2, 1)) |
| x = np.transpose(x, (0, 1, 3, 2)) |
| all_features_all_cities = x.shape[1] * x.shape[2] |
| x_output = np.zeros((x.shape[0], 80)) |
| for i in range(x.shape[0]): |
| temp_matrix = tensor_to_matrix(x[i]) |
| for j in range(temp_matrix.shape[0]): |
| x_output[i, j * all_features_all_cities:(j + 1) * all_features_all_cities] = temp_matrix[j] |
| return x_output |
|
|
|
|
| def generate_all_data(phase, steps_ahead, feature, city_index): |
| steps_ahead_to_step = {6: "1", 12: "2", 18: "3", 24: "4"} |
| filename = "{}/step{}.mat".format(feature, steps_ahead_to_step[steps_ahead]) |
| data = loadmat('Denmark_data/{}'.format(filename)) |
| if phase == "train": |
| y_temp = data["Ytr"][:, city_index] |
| elif phase == "test": |
| y_temp = data["Ytest"][:, city_index] |
| else: |
| return None, None |
| x = get_x_right_format(phase, steps_ahead, feature).astype("float32") |
| y = y_temp.reshape(y_temp.shape[0], 1).astype('float32') |
| return x, y |
|
|
|
|
| def generate_variable_list(alphabet_size, num_variables): |
| lower_case = [chr(i + 97) for i in range(26)] |
| variables = lower_case[:alphabet_size] |
| prefix_index = 0 |
| letter_index = 0 |
| prefix = variables[0] |
| for i in range(num_variables): |
| if (i + 1) % alphabet_size == 0: |
| variables.append(prefix + lower_case[letter_index]) |
| prefix_index += 1 |
| prefix = variables[prefix_index] |
| letter_index = 0 |
| else: |
| variables.append(prefix + lower_case[letter_index]) |
| letter_index += 1 |
| return variables |
|
|
|
|
| |
| |
| |
| def create_experiment_folder(experiment_number): |
| os.makedirs("ExperimentsSR/Experiment{}".format(experiment_number), exist_ok=True) |
|
|
|
|
| def get_experiment_number(): |
| if not os.path.isdir("ExperimentsSR"): |
| return 1 |
| folders = os.listdir("ExperimentsSR/") |
| nums = [] |
| import re |
| for f in folders: |
| n = re.findall(r'\d+', f) |
| if n: |
| nums.append(int(n[0])) |
| return max(nums) + 1 if nums else 1 |
|
|
|
|
| def create_summary_file(experiment_number): |
| open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "w").close() |
|
|
|
|
| def get_folders_started(): |
| os.makedirs("ExperimentsSR", exist_ok=True) |
| experiment_number = get_experiment_number() |
| create_experiment_folder(experiment_number) |
| create_summary_file(experiment_number) |
| return experiment_number |
|
|
|
|
| def record_base_info(experiment_number, **config): |
| with open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "a+") as f: |
| for k, v in config.items(): |
| f.write("{}: {}\n".format(k, v)) |
|
|
|
|
| def append_text_to_summary(experiment_number, text): |
| with open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "a+") as f: |
| f.write(text) |
|
|
|
|
| def plot_train_vs_validation(experiment_number, num_epochs, train_loss, validation_loss, phase): |
| x = range(len(train_loss)) |
| plt.plot(x, train_loss, label="Training") |
| plt.plot(x, validation_loss, label="Validation") |
| plt.ylabel("MSE") |
| plt.xlabel("Epochs") |
| plt.title(phase) |
| plt.legend() |
| plt.savefig("ExperimentsSR/Experiment{}/train_vs_validation_{}.jpg".format(experiment_number, phase)) |
| plt.clf() |
|
|
|
|
| def plot_histogram(experiment_number, weights, phase, type_weights, a): |
| plt.hist(weights) |
| plt.title(phase + ", " + type_weights + ", a:{}".format(a)) |
| plt.savefig("ExperimentsSR/Experiment{}/hist_{}_{}.jpg".format(experiment_number, phase, type_weights)) |
| plt.clf() |
|
|
|
|
| def plot_descaled_real_vs_prediction(experiment_number, y_real, y_predicted, y_min, y_max): |
| yp = y_predicted.numpy() if hasattr(y_predicted, 'numpy') else np.array(y_predicted) |
| yr = y_real.numpy() if hasattr(y_real, 'numpy') else np.array(y_real) |
| y_predicted_rescaled = yp * (y_max - y_min) + y_min |
| y_test_rescaled = yr * (y_max - y_min) + y_min |
| mse = mean_squared_error(y_predicted_rescaled, y_test_rescaled) |
| mae = mean_absolute_error(y_predicted_rescaled, y_test_rescaled) |
| plt.figure(figsize=(10, 8)) |
| plt.plot(y_test_rescaled[:2000], label="Real") |
| plt.plot(y_predicted_rescaled[:2000], label="Prediction from formula") |
| plt.legend() |
| plt.title("MAE: {:.2e}, MSE: {:.2e}".format(mae, mse)) |
| plt.savefig("ExperimentsSR/Experiment{}/real_vs_prediction_phase1.jpg".format(experiment_number)) |
| plt.clf() |
| return mae |
|
|
|
|
| |
| |
| |
| def save_weights(weights, experiment_number, phase, best_val_loss): |
| if phase == "phase1": |
| filename = "ExperimentsSR/Experiment{}/nt_val_weights_{:.2e}.hdf5".format(experiment_number, best_val_loss) |
| elif phase == "phase2": |
| filename = "ExperimentsSR/Experiment{}/phase2_val_weights_{:.2e}.hdf5".format(experiment_number, best_val_loss) |
| with h5py.File(filename, "w") as f: |
| for i in range(len(weights)): |
| w = weights[i] |
| if hasattr(w, 'numpy'): |
| w = w.numpy() |
| f.create_dataset('dataset{}'.format(i), data=w) |
|
|
|
|
| def load_weights(filename): |
| weights = [] |
| with h5py.File(filename, "r") as f: |
| for i in range(3): |
| weights.append(f.get('dataset{}'.format(i))[()]) |
| return weights |
|
|