""" TensorFlow 2.x reproduction of the EQL (Equation Learner) symbolic regression architecture from: "Symbolic regression for scientific discovery: an application to wind speed forecasting" Abdellaoui & Mehrkanoon, arXiv:2102.10570 """ import os, sys, time, copy import numpy as np from collections import OrderedDict from tqdm import tqdm from scipy.io import loadmat from sklearn.metrics import mean_squared_error, mean_absolute_error import tensorflow as tf import sympy as sp import h5py import matplotlib matplotlib.use('Agg') from matplotlib import pyplot as plt # --------------------------------------------------------------------------- # 1. Primitive functions # --------------------------------------------------------------------------- class BaseFunction: def __init__(self, norm=1): self.norm = norm def sp(self, x): return None def tf(self, x): z = sp.symbols('z') return sp.utilities.lambdify(z, self.sp(z), 'tensorflow')(x) def np(self, x): z = sp.symbols('z') return sp.utilities.lambdify(z, self.sp(z), 'numpy')(x) def name(self, x): return str(self.sp) class Constant(BaseFunction): def tf(self, x): return tf.ones_like(x) def sp(self, x): return 1 def np(self, x): return np.ones_like class Identity(BaseFunction): def tf(self, x): return tf.identity(x) / self.norm def sp(self, x): return x / self.norm def np(self, x): return np.array(x) / self.norm class Square(BaseFunction): def tf(self, x): return tf.square(x) / self.norm def sp(self, x): return x ** 2 / self.norm def np(self, x): return np.square(x) / self.norm class Sin(BaseFunction): def tf(self, x): return tf.sin(x * 4 * np.pi) / self.norm def sp(self, x): return sp.sin(x * 4 * sp.pi) / self.norm def np(self, x): return np.sin(x * 4 * np.pi) / self.norm class Sigmoid(BaseFunction): def sp(self, x): return 1 / (1 + sp.exp(-20 * x)) / self.norm def np(self, x): return 1 / (1 + np.exp(-20 * x)) / self.norm def name(self, x): return "sigmoid(x)" class BaseFunction2: def __init__(self, norm=1.0): self.norm = norm def sp(self, x, y): return None def tf(self, x, y): a, b = sp.symbols('a b') return sp.utilities.lambdify([a, b], self.sp(a, b), 'tensorflow')(x, y) def np(self, x, y): a, b = sp.symbols('a b') return sp.utilities.lambdify([a, b], self.sp(a, b), 'numpy')(x, y) def name(self, x, y): return str(self.sp) class Product(BaseFunction2): def __init__(self, norm=0.1): super().__init__(norm=norm) def sp(self, x, y): return x * y / self.norm def count_double(funcs): return sum(1 for f in funcs if isinstance(f, BaseFunction2)) def count_inputs(funcs): i = 0 for f in funcs: if isinstance(f, BaseFunction): i += 1 elif isinstance(f, BaseFunction2): i += 2 return i # --------------------------------------------------------------------------- # 2. Symbolic network # --------------------------------------------------------------------------- class SymbolicLayer: def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1): if funcs is None: funcs = default_func self.initial_weight = initial_weight self.W = None self.built = False if self.initial_weight is not None: if not variable: self.W = tf.Variable(self.initial_weight) else: self.W = self.initial_weight self.built = True self.output = None self.init_stddev = init_stddev self.n_funcs = len(funcs) self.funcs = [f.tf for f in funcs] self.n_double = count_double(funcs) self.n_single = self.n_funcs - self.n_double self.out_dim = self.n_funcs + self.n_double def build(self, in_dim): self.W = tf.Variable(tf.random.normal(shape=[in_dim, self.out_dim], stddev=self.init_stddev)) self.built = True def __call__(self, x): if not self.built: self.build(int(x.shape[1])) g = tf.matmul(x, self.W) self.output = [] in_i, out_i = 0, 0 while out_i < self.n_single: self.output.append(self.funcs[out_i](g[:, in_i])) in_i += 1 out_i += 1 while out_i < self.n_funcs: self.output.append(self.funcs[out_i](g[:, in_i], g[:, in_i + 1])) in_i += 2 out_i += 1 self.output = tf.stack(self.output, axis=1) return self.output def get_weight(self): return self.W def set_weight(self, weight): self.W = weight class SymbolicNet: def __init__(self, symbolic_depth, funcs=None, initial_weights=None, initial_bias=None, variable=False, init_stddev=0.1): self.depth = symbolic_depth self.funcs = funcs self.shape = (None, 1) if initial_weights is not None: self.symbolic_layers = [ SymbolicLayer(funcs=funcs, initial_weight=initial_weights[i], variable=variable) for i in range(self.depth) ] if not variable: self.output_weight = tf.Variable(initial_weights[-1]) else: self.output_weight = initial_weights[-1] else: if isinstance(init_stddev, list): self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev[i]) for i in range(self.depth)] else: self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev) for _ in range(self.depth)] self.output_weight = tf.Variable(tf.random.uniform(shape=(self.symbolic_layers[-1].n_funcs, 1))) def build(self, input_dim): in_dim = input_dim for i in range(self.depth): self.symbolic_layers[i].build(in_dim) in_dim = self.symbolic_layers[i].n_funcs def __call__(self, input): self.shape = (int(input.shape[1]), 1) h = input for i in range(self.depth): h = self.symbolic_layers[i](h) h = tf.matmul(h, self.output_weight) return h def get_weights(self): return [self.symbolic_layers[i].W for i in range(self.depth)] + [self.output_weight] def set_weights(self, weights): for i in range(self.depth): self.symbolic_layers[i].W = weights[i] self.output_weight = weights[self.depth] # --------------------------------------------------------------------------- # 3. Regularization # --------------------------------------------------------------------------- def l12_smooth(input_tensor, a=0.05): if isinstance(input_tensor, list): return sum(l12_smooth(t, a) for t in input_tensor) smooth_abs = tf.where( tf.abs(input_tensor) < a, tf.pow(input_tensor, 4) / (-8 * a ** 3) + tf.square(input_tensor) * 3 / 4 / a + 3 * a / 8, tf.abs(input_tensor) ) return tf.reduce_sum(tf.sqrt(smooth_abs)) # --------------------------------------------------------------------------- # 4. Pretty print # --------------------------------------------------------------------------- def apply_activation(W, funcs, n_double=0): W = sp.Matrix(W) if n_double == 0: for i in range(W.shape[0]): for j in range(W.shape[1]): W[i, j] = funcs[j](W[i, j]) else: W_new = W.copy() out_size = len(funcs) for i in range(W.shape[0]): in_j, out_j = 0, 0 while out_j < out_size - n_double: W_new[i, out_j] = funcs[out_j](W[i, in_j]) in_j += 1 out_j += 1 while out_j < out_size: W_new[i, out_j] = funcs[out_j](W[i, in_j], W[i, in_j + 1]) in_j += 2 out_j += 1 for _ in range(n_double): W_new.col_del(-1) W = W_new return W def sym_pp(W_list, funcs, var_names, threshold=1e-3, n_double=0): vars = [sp.Symbol(v) if isinstance(v, str) else v for v in var_names] expr = sp.Matrix(vars).T for W in W_list: W = filter_mat(sp.Matrix(W), threshold=threshold) expr = expr * W expr = apply_activation(expr, funcs, n_double=n_double) return expr def last_pp(eq, W): return eq * filter_mat(sp.Matrix(W)) def network(weights, funcs, var_names, threshold=1e-3): n_double = count_double(funcs) funcs = [f.sp for f in funcs] expr = sym_pp(weights[:-1], funcs, var_names, threshold=threshold, n_double=n_double) expr = last_pp(expr, weights[-1]) expr = expr[0, 0] return expr def filter_mat(mat, threshold=0.01): for i in range(mat.shape[0]): for j in range(mat.shape[1]): if abs(mat[i, j]) < threshold: mat[i, j] = 0 return mat # --------------------------------------------------------------------------- # 5. Data helpers # --------------------------------------------------------------------------- def tensor_to_matrix(tensor): number_features = tensor.shape[0] number_cities = tensor.shape[1] len_dates = tensor.shape[2] matrix_for_scaling = np.ones((len_dates, number_cities * number_features)) for i in range(number_cities): for j in range(len_dates): features_city = tensor[:, i, j] matrix_for_scaling[j, i * number_features:(i + 1) * number_features] = features_city return matrix_for_scaling def get_x_right_format(phase, steps_ahead, feature): steps_ahead_to_step = {6: "1", 12: "2", 18: "3", 24: "4"} filename = "{}/step{}.mat".format(feature, steps_ahead_to_step[steps_ahead]) data = loadmat('Denmark_data/{}'.format(filename)) x = data["Xtr"] if phase == "train" else data["Xtest"] x = np.transpose(x, (0, 3, 2, 1)) # => (Dates, Features, Lags, Cities) x = np.transpose(x, (0, 1, 3, 2)) # => (Dates, Features, Cities, Lags) all_features_all_cities = x.shape[1] * x.shape[2] x_output = np.zeros((x.shape[0], 80)) for i in range(x.shape[0]): temp_matrix = tensor_to_matrix(x[i]) for j in range(temp_matrix.shape[0]): x_output[i, j * all_features_all_cities:(j + 1) * all_features_all_cities] = temp_matrix[j] return x_output def generate_all_data(phase, steps_ahead, feature, city_index): steps_ahead_to_step = {6: "1", 12: "2", 18: "3", 24: "4"} filename = "{}/step{}.mat".format(feature, steps_ahead_to_step[steps_ahead]) data = loadmat('Denmark_data/{}'.format(filename)) if phase == "train": y_temp = data["Ytr"][:, city_index] elif phase == "test": y_temp = data["Ytest"][:, city_index] else: return None, None x = get_x_right_format(phase, steps_ahead, feature).astype("float32") y = y_temp.reshape(y_temp.shape[0], 1).astype('float32') return x, y def generate_variable_list(alphabet_size, num_variables): lower_case = [chr(i + 97) for i in range(26)] variables = lower_case[:alphabet_size] prefix_index = 0 letter_index = 0 prefix = variables[0] for i in range(num_variables): if (i + 1) % alphabet_size == 0: variables.append(prefix + lower_case[letter_index]) prefix_index += 1 prefix = variables[prefix_index] letter_index = 0 else: variables.append(prefix + lower_case[letter_index]) letter_index += 1 return variables # --------------------------------------------------------------------------- # 6. Utils # --------------------------------------------------------------------------- def create_experiment_folder(experiment_number): os.makedirs("ExperimentsSR/Experiment{}".format(experiment_number), exist_ok=True) def get_experiment_number(): if not os.path.isdir("ExperimentsSR"): return 1 folders = os.listdir("ExperimentsSR/") nums = [] import re for f in folders: n = re.findall(r'\d+', f) if n: nums.append(int(n[0])) return max(nums) + 1 if nums else 1 def create_summary_file(experiment_number): open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "w").close() def get_folders_started(): os.makedirs("ExperimentsSR", exist_ok=True) experiment_number = get_experiment_number() create_experiment_folder(experiment_number) create_summary_file(experiment_number) return experiment_number def record_base_info(experiment_number, **config): with open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "a+") as f: for k, v in config.items(): f.write("{}: {}\n".format(k, v)) def append_text_to_summary(experiment_number, text): with open("ExperimentsSR/Experiment{}/summary_experiment{}.txt".format(experiment_number, experiment_number), "a+") as f: f.write(text) def plot_train_vs_validation(experiment_number, num_epochs, train_loss, validation_loss, phase): x = range(len(train_loss)) plt.plot(x, train_loss, label="Training") plt.plot(x, validation_loss, label="Validation") plt.ylabel("MSE") plt.xlabel("Epochs") plt.title(phase) plt.legend() plt.savefig("ExperimentsSR/Experiment{}/train_vs_validation_{}.jpg".format(experiment_number, phase)) plt.clf() def plot_histogram(experiment_number, weights, phase, type_weights, a): plt.hist(weights) plt.title(phase + ", " + type_weights + ", a:{}".format(a)) plt.savefig("ExperimentsSR/Experiment{}/hist_{}_{}.jpg".format(experiment_number, phase, type_weights)) plt.clf() def plot_descaled_real_vs_prediction(experiment_number, y_real, y_predicted, y_min, y_max): yp = y_predicted.numpy() if hasattr(y_predicted, 'numpy') else np.array(y_predicted) yr = y_real.numpy() if hasattr(y_real, 'numpy') else np.array(y_real) y_predicted_rescaled = yp * (y_max - y_min) + y_min y_test_rescaled = yr * (y_max - y_min) + y_min mse = mean_squared_error(y_predicted_rescaled, y_test_rescaled) mae = mean_absolute_error(y_predicted_rescaled, y_test_rescaled) plt.figure(figsize=(10, 8)) plt.plot(y_test_rescaled[:2000], label="Real") plt.plot(y_predicted_rescaled[:2000], label="Prediction from formula") plt.legend() plt.title("MAE: {:.2e}, MSE: {:.2e}".format(mae, mse)) plt.savefig("ExperimentsSR/Experiment{}/real_vs_prediction_phase1.jpg".format(experiment_number)) plt.clf() return mae # --------------------------------------------------------------------------- # 7. Weights I/O # --------------------------------------------------------------------------- def save_weights(weights, experiment_number, phase, best_val_loss): if phase == "phase1": filename = "ExperimentsSR/Experiment{}/nt_val_weights_{:.2e}.hdf5".format(experiment_number, best_val_loss) elif phase == "phase2": filename = "ExperimentsSR/Experiment{}/phase2_val_weights_{:.2e}.hdf5".format(experiment_number, best_val_loss) with h5py.File(filename, "w") as f: for i in range(len(weights)): w = weights[i] if hasattr(w, 'numpy'): w = w.numpy() f.create_dataset('dataset{}'.format(i), data=w) def load_weights(filename): weights = [] with h5py.File(filename, "r") as f: for i in range(3): weights.append(f.get('dataset{}'.format(i))[()]) return weights