""" solver.py — Entry point principale. Ciclo evolutivo NSGA-II per TOP-TW. Uso: python -m tour_ga.solver --city rome --budget 480 --generations 200 """ from __future__ import annotations import random import time from dataclasses import dataclass from typing import Callable from core.models import Individual from core.distance import DistanceMatrix from core.fitness import FitnessEvaluator from ga.operators import ( tournament_select, order_crossover, poi_aware_crossover, mutate ) from ga.repair import RepairEngine from ga.seeding import GreedySeeder import config @dataclass class SolverConfig: pop_size: int = config.GA_POP_SIZE max_generations: int = config.GA_MAX_GENERATIONS cx_prob: float = config.GA_CX_PROB mut_prob: float = config.GA_MUT_PROB tournament_k: int = config.GA_TOURNAMENT_K stagnation_limit: int = config.GA_STAGNATION_LIMIT start_time: int = config.DEFAULT_START_TIME budget: int = config.DEFAULT_BUDGET start_lat: float = config.DEFAULT_START_LAT start_lon: float = config.DEFAULT_START_LON w_score: float = config.W_SCORE w_dist: float = config.W_DIST w_time: float = config.W_TIME max_wait_min: int = config.GA_MAX_WAIT_MIN ox_crossover_prob: float = config.GA_OX_CROSSOVER_PROB class NSGA2Solver: """ Implementazione NSGA-II adattata al problema TOP-TW. Riceve un TouristProfile e lo propaga a tutti i componenti. """ def __init__(self, pois, dm: DistanceMatrix, config: SolverConfig, profile=None): from core.profile import TouristProfile # import locale per evitare dipendenze circolari self.pois = pois self.dm = dm self.config = config self.profile = profile or TouristProfile() # default: turista generico a piedi # Inietta il profilo nella matrice distanze (per la velocità) self.dm.profile = self.profile self.repair = RepairEngine( dm=dm, profile=self.profile, all_pois=pois, start_time=config.start_time, budget=config.budget, start_lat=config.start_lat, start_lon=config.start_lon, max_wait_min=config.max_wait_min, ) self.evaluator = FitnessEvaluator( dist_matrix=dm, profile=self.profile, start_time=config.start_time, budget=config.budget, start_lat=config.start_lat, start_lon=config.start_lon, w_score=config.w_score, w_dist=config.w_dist, w_time=config.w_time, ) self.seeder = GreedySeeder( pois=pois, dm=dm, repair=self.repair, profile=self.profile, start_time=config.start_time, budget=config.budget, start_lat=config.start_lat, start_lon=config.start_lon, ) self.history: list[dict] = [] # statistiche per generazione def solve(self, callback: Callable | None = None) -> list[Individual]: """ Esegue il ciclo NSGA-II e restituisce il fronte di Pareto finale. callback(gen, pareto_front, stats) viene chiamata ogni generazione. """ cfg = self.config t0 = time.perf_counter() # --- Inizializzazione --- population = self.seeder.build_population(cfg.pop_size) population = self._evaluate_all(population) population = self._nsga2_select(population + [], cfg.pop_size) best_scalar = max(ind.fitness.scalar for ind in population) stagnation = 0 for gen in range(cfg.max_generations): # --- Generazione figli --- offspring = [] while len(offspring) < cfg.pop_size: p1 = tournament_select(population, cfg.tournament_k) p2 = tournament_select(population, cfg.tournament_k) # Alterna tra OX e PoI-aware crossover if random.random() < cfg.cx_prob: if random.random() < cfg.ox_crossover_prob: c1, c2 = order_crossover(p1, p2) else: c1, c2 = poi_aware_crossover(p1, p2) else: c1, c2 = p1.clone(), p2.clone() c1 = mutate(c1, self.seeder.allowed_pois, cfg.mut_prob) c2 = mutate(c2, self.seeder.allowed_pois, cfg.mut_prob) # Riparazione obbligatoria dopo ogni operatore c1 = self.repair.repair(c1) c2 = self.repair.repair(c2) offspring.extend([c1, c2]) # --- Valutazione e selezione NSGA-II --- offspring = self._evaluate_all(offspring) combined = population + offspring population = self._nsga2_select(combined, cfg.pop_size) # --- Statistiche e criterio di stop --- pareto = [ind for ind in population if ind.fitness.rank == 1] new_best = max(ind.fitness.scalar for ind in population) stats = { "gen": gen + 1, "pareto_size": len(pareto), "best_scalar": round(new_best, 4), "avg_score": round( sum(ind.fitness.total_score for ind in population) / len(population), 3 ), "feasible_pct": round( sum(1 for ind in population if ind.fitness.is_feasible) / len(population) * 100, 1 ), "elapsed_s": round(time.perf_counter() - t0, 2), } self.history.append(stats) if callback: callback(gen + 1, pareto, stats) if new_best > best_scalar + 1e-6: best_scalar = new_best stagnation = 0 else: stagnation += 1 if stagnation >= cfg.stagnation_limit: print(f" Early stop a gen {gen+1}: stagnazione per {stagnation} generazioni.") break pareto_front = [ind for ind in population if ind.fitness.rank == 1] return sorted(pareto_front, key=lambda x: -x.fitness.total_score) # ------------------------------------------------------------------ # NSGA-II core: fast non-dominated sort + crowding distance # ------------------------------------------------------------------ def _evaluate_all(self, pop: list[Individual]) -> list[Individual]: for ind in pop: self.evaluator.evaluate(ind) return pop def _nsga2_select( self, combined: list[Individual], target_size: int ) -> list[Individual]: """Selezione NSGA-II: ranking Pareto + crowding distance.""" fronts = self._fast_non_dominated_sort(combined) next_pop: list[Individual] = [] for front in fronts: if len(next_pop) + len(front) <= target_size: next_pop.extend(front) else: self._assign_crowding_distance(front) front.sort(key=lambda x: x.fitness.crowd, reverse=True) next_pop.extend(front[:target_size - len(next_pop)]) break return next_pop def _fast_non_dominated_sort( self, pop: list[Individual] ) -> list[list[Individual]]: """ Algoritmo NSGA-II O(MN²) per la costruzione dei fronti. M = numero obiettivi, N = dimensione popolazione. """ n = len(pop) dom_count = [0] * n # quanti individui dominano i dom_set = [[] for _ in range(n)] # individui dominati da i fronts = [[]] for i in range(n): for j in range(n): if i == j: continue fi, fj = pop[i].fitness, pop[j].fitness if fi.dominates(fj): dom_set[i].append(j) elif fj.dominates(fi): dom_count[i] += 1 if dom_count[i] == 0: pop[i].fitness.rank = 1 fronts[0].append(pop[i]) rank = 1 current_front = fronts[0] while current_front: next_front = [] for ind in current_front: idx_i = pop.index(ind) for idx_j in dom_set[idx_i]: dom_count[idx_j] -= 1 if dom_count[idx_j] == 0: pop[idx_j].fitness.rank = rank + 1 next_front.append(pop[idx_j]) rank += 1 fronts.append(next_front) current_front = next_front return [f for f in fronts if f] def _assign_crowding_distance(self, front: list[Individual]): """Calcola la crowding distance per il front dato.""" n = len(front) if n == 0: return for ind in front: ind.fitness.crowd = 0.0 objectives = [ lambda x: x.fitness.total_score, # massimizza lambda x: -x.fitness.total_distance, # minimizza → negativo lambda x: -x.fitness.total_time, # minimizza → negativo ] for obj_fn in objectives: sorted_f = sorted(front, key=obj_fn) sorted_f[0].fitness.crowd = float('inf') sorted_f[-1].fitness.crowd = float('inf') f_min = obj_fn(sorted_f[0]) f_max = obj_fn(sorted_f[-1]) f_range = f_max - f_min or 1e-9 for i in range(1, n - 1): sorted_f[i].fitness.crowd += ( obj_fn(sorted_f[i+1]) - obj_fn(sorted_f[i-1]) ) / f_range