| import os
|
| import subprocess
|
| import sys
|
| import argparse
|
| import random
|
| import logging
|
| from datetime import datetime
|
| import json
|
| from typing import List, Tuple, Dict, Any
|
|
|
| import numpy as np
|
| import tensorflow as tf
|
| from tensorflow.keras.models import Sequential, load_model, clone_model
|
| from tensorflow.keras.layers import Dense, Input
|
| from tensorflow.keras.optimizers import Adam
|
| from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
|
| import matplotlib.pyplot as plt
|
| from scipy.stats import kendalltau
|
|
|
|
|
# --- Default hyper-parameters ------------------------------------------------
# These values are only used as argparse defaults in parse_arguments(); every
# one of them can be overridden from the command line.

# Length of each random sequence the networks learn to sort.
DEFAULT_SEQ_LENGTH = 10

# Number of individuals (models) per generation.
DEFAULT_POP_SIZE = 50

# Number of generations to evolve.
DEFAULT_GENERATIONS = 50

# Overall probability that a selected parent is mutated rather than copied.
DEFAULT_MUTATION_RATE = 0.4

# Given that mutation happens: probability of perturbing the weights.
DEFAULT_WEIGHT_MUT_RATE = 0.8

# Given that mutation happens: probability of changing one hidden activation.
DEFAULT_ACTIVATION_MUT_RATE = 0.2

# Standard deviation of the Gaussian noise added to weights during mutation.
DEFAULT_MUTATION_STRENGTH = 0.1

# Number of individuals drawn for each tournament selection.
DEFAULT_TOURNAMENT_SIZE = 5

# Number of top individuals copied unchanged into the next generation.
DEFAULT_ELITISM_COUNT = 2

# Maximum number of epochs for the final gradient-based training run.
DEFAULT_EPOCHS_FINAL_TRAIN = 100

# Batch size used for final training and evaluation.
DEFAULT_BATCH_SIZE = 64
|
|
|
|
|
def setup_logging(log_dir: str, log_level=logging.INFO) -> None:
    """Configures logging to file and console.

    Messages go both to ``<log_dir>/evolution.log`` and to stdout, using the
    format ``<time> - <level> - <message>``.

    Args:
        log_dir: Directory that will hold ``evolution.log``. It is created
            (including missing parents) if it does not exist yet.
        log_level: Threshold for the root logger (defaults to ``logging.INFO``).

    Note:
        ``logging.basicConfig`` leaves an already-configured root logger
        untouched, so only the first effective call installs the handlers.
    """
    # FileHandler opens its file immediately; without the directory in place
    # that would raise FileNotFoundError before any logging is configured.
    os.makedirs(log_dir, exist_ok=True)
    log_filename = os.path.join(log_dir, 'evolution.log')

    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler(sys.stdout),
        ],
    )
|
|
|
|
|
def check_gpu() -> bool:
    """Detects GPUs and switches each one to on-demand memory growth.

    Returns:
        True when at least one physical GPU was found and memory growth was
        enabled on all of them; False when no GPU is visible or when
        TensorFlow raised a ``RuntimeError`` while configuring them.
    """
    physical_devices = tf.config.list_physical_devices('GPU')

    # Guard clause: nothing to configure on a CPU-only machine.
    if not physical_devices:
        logging.warning("GPU not found. Using CPU.")
        return False

    try:
        for device in physical_devices:
            tf.config.experimental.set_memory_growth(device, True)
        logical_devices = tf.config.list_logical_devices('GPU')
        logging.info(f"{len(physical_devices)} Physical GPUs, {len(logical_devices)} Logical GPUs found.")
        logging.info(f"Using GPU: {physical_devices[0].name}")
    except RuntimeError as err:
        # Configuration failed; report it and let the caller carry on.
        logging.error(f"Error setting memory growth: {err}")
        return False

    return True
|
|
|
|
|
def generate_data(num_samples: int, seq_length: int) -> Tuple[np.ndarray, np.ndarray]:
    """Builds a synthetic sorting dataset.

    Args:
        num_samples: Number of sequences (rows) to generate.
        seq_length: Number of values in each sequence (columns).

    Returns:
        ``(X, y)`` where ``X`` holds uniform random values in ``[0, 100)`` with
        shape ``(num_samples, seq_length)`` and ``y`` is ``X`` with every row
        sorted in ascending order. Values are drawn from NumPy's global RNG,
        so ``np.random.seed`` controls reproducibility.
    """
    logging.info(f"Generating {num_samples} samples with sequence length {seq_length}...")

    unsorted_rows = np.random.rand(num_samples, seq_length) * 100

    # Sort a copy row by row so the inputs stay untouched.
    sorted_rows = unsorted_rows.copy()
    sorted_rows.sort(axis=1)

    logging.info("Data generation complete.")
    return unsorted_rows, sorted_rows
|
|
|
|
|
def create_individual(seq_length: int) -> Sequential:
    """Creates a compiled Keras Sequential model with a random architecture.

    The network has ``seq_length`` inputs, 1-4 hidden Dense layers of 8-64
    units each with an activation picked from relu/tanh/sigmoid, and a linear
    Dense output of width ``seq_length``. It is compiled with Adam
    (learning rate 0.001) and MSE loss.

    Args:
        seq_length: Width of both the input and the output layer.

    Returns:
        The freshly built and compiled model.
    """
    activation_pool = ['relu', 'tanh', 'sigmoid']

    # Draw order (name, depth, widths, activations) is significant: it keeps
    # seeded runs reproducible, so the draws are not interleaved.
    network = Sequential(name=f"model_random_{random.randint(1000, 9999)}")
    depth = random.randint(1, 4)
    layer_widths = [random.randint(8, 64) for _ in range(depth)]
    layer_activations = [random.choice(activation_pool) for _ in range(depth)]

    network.add(Input(shape=(seq_length,)))
    for units, activation_name in zip(layer_widths, layer_activations):
        network.add(Dense(units, activation=activation_name))
    network.add(Dense(seq_length, activation='linear'))

    network.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return network
|
|
|
def get_predictions(model: Sequential, X: np.ndarray, batch_size: int) -> tf.Tensor:
    """Runs one inference-mode forward pass of ``model`` over all of ``X``.

    This function is intentionally NOT decorated with ``tf.function``. The
    model is passed as a plain Python argument, and ``tf.function`` builds a
    new trace for every distinct Python object it receives. The evolutionary
    loop creates fresh model objects every generation, so a decorated version
    would retrace on practically every call and keep each traced graph alive.
    Calling the model directly avoids both the tracing cost and the memory
    growth.

    Args:
        model: The network to evaluate.
        X: Input batch; forwarded to the model unchanged.
        batch_size: Currently unused (the whole input is processed in a single
            call); kept so existing callers keep working.

    Returns:
        The model output for ``X`` computed with ``training=False``.
    """
    return model(X, training=False)
|
|
|
def calculate_fitness(individual: Sequential, X: np.ndarray, y: np.ndarray, batch_size: int) -> float:
    """Scores a model as the inverse of its mean squared error on ``(X, y)``.

    Args:
        individual: Model to evaluate.
        X: Input sequences.
        y: Target (sorted) sequences.
        batch_size: Forwarded to ``get_predictions``.

    Returns:
        ``1 / (mse + 1e-8)`` as a Python float. If that value is not finite,
        or if anything raises during evaluation, the floor value ``1e-8`` is
        returned so a broken individual never aborts the whole evolution.
    """
    try:
        inputs = tf.cast(X, tf.float32)
        targets = tf.cast(y, tf.float32)

        outputs = get_predictions(individual, inputs, batch_size)

        squared_error = tf.square(targets - outputs)
        mse_val = tf.reduce_mean(squared_error).numpy()

        # The epsilon keeps a perfect model (mse == 0) from dividing by zero.
        score = 1.0 / (mse_val + 1e-8)

        if np.isfinite(score):
            return float(score)

        logging.warning(f"Non-finite fitness detected ({score}) for model {individual.name}. Assigning low fitness.")
        return 1e-8

    except Exception as err:
        logging.error(f"Error during fitness calculation for model {individual.name}: {err}", exc_info=True)
        return 1e-8
|
|
|
|
|
def mutate_individual(individual: Sequential, weight_mut_rate: float, act_mut_rate: float, mut_strength: float) -> Sequential:
    """Applies mutations (weight perturbation, activation change) to an individual.

    The input model is never modified; all changes happen on a clone.

    Two independent mutations may fire:

    * Weight perturbation (probability ``weight_mut_rate``): Gaussian noise
      with standard deviation ``mut_strength`` is added to every weight and
      bias array of every Dense layer.
    * Activation change (probability ``act_mut_rate``): one hidden Dense
      layer (never the output layer) gets a different activation out of
      relu/tanh/sigmoid. The model is rebuilt from its config for this, and
      the current weights are copied into the rebuilt model.

    Args:
        individual: Parent model to derive the mutant from.
        weight_mut_rate: Probability of applying the weight perturbation.
        act_mut_rate: Probability of applying the activation change.
        mut_strength: Standard deviation of the weight noise.

    Returns:
        A new model. If at least one mutation fired it is compiled
        (Adam, lr 0.001, MSE) and renamed ``mutated_<parent name>``;
        otherwise it is an uncompiled copy of the parent.
    """
    mutated_model = clone_model(individual)
    mutated_model.set_weights(individual.get_weights())

    mutated = False

    # --- 1. Weight perturbation -------------------------------------------
    if random.random() < weight_mut_rate:
        mutated = True
        for layer in mutated_model.layers:
            if not isinstance(layer, Dense):
                continue
            perturbed = [
                wb + np.random.normal(0, mut_strength, wb.shape)
                for wb in layer.get_weights()
            ]
            if perturbed:
                layer.set_weights(perturbed)

    # --- 2. Activation change ---------------------------------------------
    if random.random() < act_mut_rate:
        dense_layers = [layer for layer in mutated_model.layers if isinstance(layer, Dense)]
        # At least one hidden layer is needed; the last Dense layer is the
        # linear output and must stay untouched.
        if len(dense_layers) > 1:
            mutated = True
            layer_to_mutate = random.choice(dense_layers[:-1])
            current_activation = layer_to_mutate.get_config().get('activation', 'linear')
            candidates = [
                name for name in ('relu', 'tanh', 'sigmoid')
                if name != current_activation
            ]
            new_activation = random.choice(candidates)

            # Layers cannot swap activations in place, so patch the config
            # and rebuild the model from it.
            config = mutated_model.get_config()
            for layer_config in config['layers']:
                if layer_config['config']['name'] == layer_to_mutate.name:
                    layer_config['config']['activation'] = new_activation
                    break

            try:
                rebuilt_model = Sequential.from_config(config)
                # from_config only restores the architecture and starts with
                # freshly initialised weights. An activation change leaves
                # all weight shapes intact, so carry the evolved (and
                # possibly just perturbed) weights over.
                rebuilt_model.set_weights(mutated_model.get_weights())
                mutated_model = rebuilt_model
            except Exception as e:
                # Best effort: keep the pre-rebuild model if anything fails.
                logging.error(f"Error rebuilding model after activation mutation for {mutated_model.name}: {e}")

    if mutated:
        mutated_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
        # NOTE(review): writes the private `_name` attribute; whether this
        # changes `model.name` depends on the installed Keras version - confirm.
        mutated_model._name = f"mutated_{individual.name}"

    return mutated_model
|
|
|
|
|
def tournament_selection(population: List[Sequential], fitness_scores: List[float], k: int) -> Sequential:
    """Selects the best individual from a randomly chosen tournament group.

    ``k`` distinct individuals are drawn at random and the one with the
    highest fitness wins. If ``k`` is larger than the population, the whole
    population takes part instead of raising, so small populations work with
    the default tournament size.

    Args:
        population: Candidate individuals.
        fitness_scores: Fitness of each individual, aligned with ``population``.
        k: Requested number of tournament participants (must be >= 1).

    Returns:
        The participant with the highest fitness; on ties, the one drawn first.

    Raises:
        ValueError: If ``population`` is empty or ``k`` is smaller than 1.
    """
    if not population:
        raise ValueError("Cannot run tournament selection on an empty population.")
    if k < 1:
        raise ValueError(f"Tournament size must be at least 1, got {k}.")

    # random.sample refuses to draw more items than exist, so clamp first.
    participants = min(k, len(population))
    tournament_indices = random.sample(range(len(population)), participants)

    winner_index = max(tournament_indices, key=lambda idx: fitness_scores[idx])
    return population[winner_index]
|
|
|
def evolve_population(population: List[Sequential], X: np.ndarray, y: np.ndarray, generations: int,
                      mutation_rate: float, weight_mut_rate: float, act_mut_rate: float, mut_strength: float,
                      tournament_size: int, elitism_count: int, batch_size: int) -> Tuple[Sequential, List[float], List[float]]:
    """Runs the evolutionary process.

    Each generation evaluates every individual, records the best and average
    fitness, and then breeds the next population: the ``elitism_count`` best
    individuals are copied unchanged, and the remaining slots are filled with
    tournament winners that are mutated with probability ``mutation_rate``
    and copied as-is otherwise.

    Args:
        population: Initial individuals; this list itself is not modified.
        X: Training inputs used for fitness evaluation.
        y: Training targets used for fitness evaluation.
        generations: Number of generations to evaluate.
        mutation_rate: Probability that a selected parent is mutated.
        weight_mut_rate: Forwarded to ``mutate_individual``.
        act_mut_rate: Forwarded to ``mutate_individual``.
        mut_strength: Forwarded to ``mutate_individual``.
        tournament_size: Participants per tournament; clamped to the
            population size.
        elitism_count: Number of top individuals carried over unchanged.
        batch_size: Forwarded to ``calculate_fitness``.

    Returns:
        ``(best_model, best_fitness_history, avg_fitness_history)`` where
        ``best_model`` is a compiled copy of the fittest individual seen in
        any generation and the histories hold one entry per generation.
    """
    best_fitness_history: List[float] = []
    avg_fitness_history: List[float] = []
    best_model_overall = None
    best_fitness_overall = -1.0

    # A tournament cannot draw more participants than the population holds.
    effective_tournament_size = min(tournament_size, len(population))
    if effective_tournament_size != tournament_size:
        logging.warning(
            f"Tournament size {tournament_size} exceeds population size "
            f"{len(population)}; using {effective_tournament_size} instead."
        )

    for gen in range(generations):
        # --- Evaluate ---
        fitness_scores = [calculate_fitness(ind, X, y, batch_size) for ind in population]

        current_best_idx = np.argmax(fitness_scores)
        current_best_fitness = fitness_scores[current_best_idx]
        if current_best_fitness > best_fitness_overall:
            best_fitness_overall = current_best_fitness
            # Keep an independent copy so later breeding cannot affect it.
            best_model_overall = _clone_with_weights(population[current_best_idx])
            logging.info(f"Generation {gen+1}: New overall best fitness: {best_fitness_overall:.4f}")

        avg_fitness = float(np.mean(fitness_scores))
        best_fitness_history.append(current_best_fitness)
        avg_fitness_history.append(avg_fitness)

        logging.info(f"Generation {gen+1}/{generations} - Best Fitness: {current_best_fitness:.4f}, Avg Fitness: {avg_fitness:.4f}")

        # Offspring of the final generation would never be evaluated or
        # returned, so breeding them would only burn time and memory.
        if gen == generations - 1:
            break

        # --- Breed the next generation ---
        new_population = []

        if elitism_count > 0:
            elite_indices = np.argsort(fitness_scores)[-elitism_count:]
            for idx in elite_indices:
                new_population.append(_clone_with_weights(population[idx]))

        while len(new_population) < len(population):
            parent = tournament_selection(population, fitness_scores, effective_tournament_size)

            if random.random() < mutation_rate:
                # mutate_individual works on its own clone of the parent, so
                # no defensive copy is needed here.
                child = mutate_individual(parent, weight_mut_rate, act_mut_rate, mut_strength)
            else:
                child = _clone_with_weights(parent)

            new_population.append(child)

        # Elites may overshoot the target size; trim back to it.
        population = new_population[:len(population)]

    # Only reachable without a best model when no generation was evaluated.
    if best_model_overall is None:
        best_idx = np.argmax([calculate_fitness(ind, X, y, batch_size) for ind in population])
        best_model_overall = population[best_idx]

    return best_model_overall, best_fitness_history, avg_fitness_history


def _clone_with_weights(model: Sequential) -> Sequential:
    """Returns a compiled, independent copy of ``model`` with identical weights."""
    duplicate = clone_model(model)
    duplicate.set_weights(model.get_weights())
    duplicate.compile(optimizer=Adam(), loss='mse')
    return duplicate
|
|
|
|
|
|
|
def plot_fitness_history(history_best: List[float], history_avg: List[float], output_dir: str) -> None:
    """Draws best and average fitness per generation and saves the chart.

    Args:
        history_best: Best fitness of each generation.
        history_avg: Average fitness of each generation.
        output_dir: Existing directory; the chart is written there as
            ``fitness_history.png``.
    """
    plot_path = os.path.join(output_dir, "fitness_history.png")

    # (values, legend label, marker, line style) for each curve, in draw order.
    curves = (
        (history_best, "Best Fitness per Generation", 'o', '-'),
        (history_avg, "Average Fitness per Generation", 'x', '--'),
    )

    plt.figure(figsize=(12, 6))
    for values, legend_label, marker_style, line_style in curves:
        plt.plot(values, label=legend_label, marker=marker_style, linestyle=line_style)

    plt.xlabel("Generation")
    plt.ylabel("Fitness Score (1 / MSE)")
    plt.title("Evolutionary Process Fitness History")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    plt.savefig(plot_path)
    plt.close()
    logging.info(f"Fitness history plot saved to {plot_path}")
|
|
|
|
|
def evaluate_model(model: Sequential, X_test: np.ndarray, y_test: np.ndarray, batch_size: int) -> Dict[str, float]:
    """Measures the final model on held-out data.

    Two metrics are computed: the mean squared error over the whole test set,
    and the average Kendall's tau between target and predicted rows on a
    random sample of at most 100 test rows (rows whose tau is NaN are left
    out of the average).

    Args:
        model: Trained model; only its ``predict`` method is used.
        X_test: Test inputs.
        y_test: Test targets.
        batch_size: Batch size passed to ``model.predict``.

    Returns:
        ``{"test_mse": ..., "avg_kendall_tau": ...}`` with plain float values.
        The tau average is 0.0 when no sampled row yields a valid tau.
    """
    logging.info("Evaluating final model on test data...")
    predictions = model.predict(X_test, batch_size=batch_size, verbose=0)

    test_mse = np.mean(np.square(y_test - predictions))
    logging.info(f"Final Test MSE: {test_mse:.6f}")

    # Rank correlation is costly per row, so only a random subset is scored.
    total_rows = X_test.shape[0]
    sample_size = min(100, total_rows)
    sampled_rows = np.random.choice(total_rows, sample_size, replace=False)

    valid_taus = []
    for row in sampled_rows:
        tau, _ = kendalltau(y_test[row], predictions[row])
        if np.isnan(tau):
            continue
        valid_taus.append(tau)

    avg_kendall_tau = np.mean(valid_taus) if valid_taus else 0.0
    logging.info(f"Average Kendall's Tau (on {sample_size} samples): {avg_kendall_tau:.4f}")

    return {
        "test_mse": float(test_mse),
        "avg_kendall_tau": float(avg_kendall_tau),
    }
|
|
|
|
|
def run_pipeline(args: argparse.Namespace) -> None:
    """Executes the complete neuroevolution pipeline.

    Steps, in order:

    1. Create a timestamped run directory below ``args.output_base_dir``.
    2. Configure logging and store the configuration as ``config.json``.
    3. Seed Python, NumPy and TensorFlow with ``args.seed``.
    4. Generate training and test data and the initial random population.
    5. Evolve the population and save ``fitness_history.csv`` and
       ``fitness_history.png``.
    6. Fine-tune the best evolved model with gradient descent (early
       stopping and learning-rate reduction on ``val_loss``).
    7. Evaluate on the test set, then save the model as
       ``best_evolved_model_trained.keras`` and all results as
       ``final_results.json``.

    Args:
        args: Parsed command-line options as produced by ``parse_arguments``.
    """
    # --- Run directory ---
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(args.output_base_dir, f"evorun_{timestamp}")
    os.makedirs(output_dir, exist_ok=True)

    # --- Logging ---
    setup_logging(output_dir)
    logging.info(f"Starting EvoNet Pipeline Run: {timestamp}")
    logging.info(f"Output directory: {output_dir}")

    # --- Configuration ---
    logging.info("Configuration:")
    args_dict = vars(args)
    for k, v in args_dict.items():
        logging.info(f" {k}: {v}")

    config_path = os.path.join(output_dir, "config.json")
    with open(config_path, 'w') as f:
        json.dump(args_dict, f, indent=4)
    logging.info(f"Configuration saved to {config_path}")

    # --- Reproducibility: seed every RNG the pipeline draws from ---
    random.seed(args.seed)
    np.random.seed(args.seed)
    tf.random.set_seed(args.seed)
    logging.info(f"Using random seed: {args.seed}")

    # --- Hardware ---
    check_gpu()

    # --- Data ---
    X_train, y_train = generate_data(args.train_samples, args.seq_length)
    X_test, y_test = generate_data(args.test_samples, args.seq_length)

    # --- Initial population ---
    logging.info(f"Initializing population of {args.pop_size} individuals...")
    population = [create_individual(args.seq_length) for _ in range(args.pop_size)]
    logging.info("Population initialized.")

    # --- Evolution ---
    logging.info(f"Starting evolution for {args.generations} generations...")
    best_evolved_model, best_fitness_hist, avg_fitness_hist = evolve_population(
        population, X_train, y_train, args.generations,
        args.mutation_rate, args.weight_mut_rate, args.activation_mut_rate, args.mutation_strength,
        args.tournament_size, args.elitism_count, args.batch_size
    )
    logging.info("Evolution complete.")

    # --- Fitness history (CSV + plot) ---
    history_path = os.path.join(output_dir, "fitness_history.csv")
    history_data = np.array([best_fitness_hist, avg_fitness_hist]).T
    np.savetxt(history_path, history_data, delimiter=',', header='BestFitness,AvgFitness', comments='')
    logging.info(f"Fitness history data saved to {history_path}")

    plot_fitness_history(best_fitness_hist, avg_fitness_hist, output_dir)

    # --- Final gradient-based training of the evolved model ---
    logging.info("Starting final training of the best evolved model...")

    # Train a copy so the model returned by the evolution stays untouched.
    final_model = clone_model(best_evolved_model)
    final_model.set_weights(best_evolved_model.get_weights())
    final_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6, verbose=1)

    history = final_model.fit(
        X_train, y_train,
        epochs=args.epochs_final_train,
        batch_size=args.batch_size,
        validation_split=0.2,
        callbacks=[early_stopping, reduce_lr],
        verbose=2
    )
    logging.info("Final training complete.")

    # --- Evaluation ---
    final_metrics = evaluate_model(final_model, X_test, y_test, args.batch_size)

    # --- Persist model ---
    model_path = os.path.join(output_dir, "best_evolved_model_trained.keras")
    final_model.save(model_path)
    logging.info(f"Final trained model saved to {model_path}")

    # --- Persist results ---
    # Training metrics may be NumPy scalars; convert them to plain floats for
    # JSON without modifying the History object itself.
    training_history = {
        key: [float(v) for v in values]
        for key, values in history.history.items()
    }

    results = {
        "config": args_dict,
        "final_evaluation": final_metrics,
        "evolution_summary": {
            # The overall best is the maximum over all generations; the last
            # generation's best can be lower, e.g. when elitism is disabled.
            "best_fitness_overall": float(max(best_fitness_hist)) if best_fitness_hist else None,
            "avg_fitness_final_gen": float(avg_fitness_hist[-1]) if avg_fitness_hist else None,
        },
        "training_history": training_history
    }
    results_path = os.path.join(output_dir, "final_results.json")

    with open(results_path, 'w') as f:
        json.dump(results, f, indent=4)
    logging.info(f"Final results saved to {results_path}")
    logging.info("Pipeline finished successfully!")
|
|
|
|
|
|
|
def parse_arguments() -> argparse.Namespace:
    """Parses the EvoNet command-line options.

    Every option is optional and falls back to its default. When ``--seed``
    is not given, a random 32-bit seed is drawn and stored in the result so
    the run can still be reproduced from the saved configuration.

    Returns:
        The populated ``argparse.Namespace``; ``seed`` is always an int.
    """
    parser = argparse.ArgumentParser(description="EvoNet: Neuroevolution for Sorting Task")

    # (flag, type, default, help) - registered in this order.
    option_table = [
        # Output location
        ('--output_base_dir', str, os.path.join(os.getcwd(), "evonet_runs"),
         'Base directory to store run results.'),
        # Data
        ('--seq_length', int, DEFAULT_SEQ_LENGTH,
         'Length of the sequences to sort.'),
        ('--train_samples', int, 5000, 'Number of training samples.'),
        ('--test_samples', int, 1000, 'Number of test samples.'),
        # Evolution
        ('--pop_size', int, DEFAULT_POP_SIZE, 'Population size.'),
        ('--generations', int, DEFAULT_GENERATIONS, 'Number of generations.'),
        ('--mutation_rate', float, DEFAULT_MUTATION_RATE,
         'Overall probability of mutating an individual.'),
        ('--weight_mut_rate', float, DEFAULT_WEIGHT_MUT_RATE,
         'Probability of weight perturbation if mutation occurs.'),
        ('--activation_mut_rate', float, DEFAULT_ACTIVATION_MUT_RATE,
         'Probability of activation change if mutation occurs.'),
        ('--mutation_strength', float, DEFAULT_MUTATION_STRENGTH,
         'Standard deviation of Gaussian noise for weight mutation.'),
        ('--tournament_size', int, DEFAULT_TOURNAMENT_SIZE,
         'Number of individuals participating in tournament selection.'),
        ('--elitism_count', int, DEFAULT_ELITISM_COUNT,
         'Number of best individuals to carry over directly.'),
        # Training / evaluation
        ('--batch_size', int, DEFAULT_BATCH_SIZE,
         'Batch size for predictions and training.'),
        ('--epochs_final_train', int, DEFAULT_EPOCHS_FINAL_TRAIN,
         'Max epochs for final training of the best model.'),
        # Reproducibility
        ('--seed', int, None,
         'Random seed for reproducibility (default: random).'),
    ]
    for flag, value_type, default_value, help_text in option_table:
        parser.add_argument(flag, type=value_type, default=default_value, help=help_text)

    parsed = parser.parse_args()

    # No seed requested: pick one now so it ends up in the stored config.
    if parsed.seed is None:
        parsed.seed = random.randint(0, 2**32 - 1)

    return parsed
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse options, make sure the base output directory
    # exists, then run the pipeline. Any exception ends the process with
    # exit code 1 after being reported.
    cli_args = parse_arguments()

    os.makedirs(cli_args.output_base_dir, exist_ok=True)

    try:
        run_pipeline(cli_args)
    except Exception as exc:
        # Always report on stderr, even if logging was never configured.
        print(f"FATAL ERROR in pipeline execution: {exc}", file=sys.stderr)

        root_logger = logging.getLogger()
        if not root_logger.hasHandlers():
            # Logging is not set up yet: print the traceback directly.
            import traceback
            print(traceback.format_exc(), file=sys.stderr)
        else:
            logging.critical("FATAL ERROR in pipeline execution:", exc_info=True)

        sys.exit(1)