import numpy as np
from numba import njit, prange

from src.config import cfg
from src.problems.my_problem import convert_employees_to_numpy
from src.engine.selection import (
    tournament_selection,
    calculate_population_fitness_parallel,
    calculate_fitness_numba,
    calculate_detailed_score,
    get_valid_start_slots_numba,
    CODE_OFF,
)
from src.engine.crossover import crossover
from src.engine.mutation import mutate
from src.utils.health import calculate_diversity_snapshot


@njit(cache=True)
def _generate_single_random_individual(
    num_emps,
    num_days,
    lengths,
    target_days_arr,
    cons_types,
    cons_vals,
    closing_slots,
    is_closed_arr,
):
    """
    Build one individual whose shift assignments are drawn uniformly at random.

    For every employee and day a start slot is picked at random among the
    slots returned by ``get_valid_start_slots_numba``. Days that ended up with
    a non-negative slot are counted as worked; if the employee works more days
    than ``target_days_arr[i]``, randomly chosen worked days are reset to
    ``CODE_OFF`` until the target is met.

    Returns:
        An int64 array of shape ``(num_emps, num_days)``.
    """
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)

    for i in range(num_emps):
        shift_len = lengths[i]
        # Scratch buffer of worked-day indices. Sized by num_days (not a fixed
        # 7) because the loop below can write up to num_days entries and Numba
        # does not bounds-check array writes in nopython mode.
        days_worked_indices = np.empty(num_days, dtype=np.int64)
        dw_ptr = 0

        # Random assignment among the valid start slots of each day.
        for d in range(num_days):
            ctype = cons_types[i, d]
            cval = cons_vals[i, d]
            c_slot = closing_slots[d]
            closed = is_closed_arr[d]

            valid = get_valid_start_slots_numba(d, shift_len, ctype, cval, c_slot, closed)
            if len(valid) > 0:
                chosen = valid[np.random.randint(0, len(valid))]
                roster[i, d] = chosen
                if chosen >= 0:
                    days_worked_indices[dw_ptr] = d
                    dw_ptr += 1

        # Drop the days in excess of the contractual target.
        tgt = target_days_arr[i]
        while dw_ptr > tgt:
            idx_in_buffer = np.random.randint(0, dw_ptr)
            day_to_remove = days_worked_indices[idx_in_buffer]
            roster[i, day_to_remove] = CODE_OFF
            # Swap-remove: overwrite the removed entry with the last live one.
            days_worked_indices[idx_in_buffer] = days_worked_indices[dw_ptr - 1]
            dw_ptr -= 1

    return roster


@njit(cache=True)
def _generate_single_heuristic_individual(
    num_emps,
    num_days,
    lengths,
    target_days_arr,
    cons_types,
    cons_vals,
    closing_slots,
    is_closed_arr,
    target_demand,
    randomness,
):
    """
    Build one individual with a greedy pass driven by the residual demand.

    Employees are visited in random order. For each one, the days with the
    highest total residual demand are preferred (at most ``target_days_arr[i]``
    of them), and on each preferred day the start slot covering the most
    residual demand is chosen among up to five candidates. The residual is
    decremented (floored at zero) on the covered slots after each assignment.

    Args:
        target_demand: 2-D array indexed as ``[day, slot]``; it is copied, not
            modified.
        randomness: when greater than zero, Gaussian noise scaled by
            ``mean(daily_needs) * randomness`` is added to the per-day needs
            before ranking the days.

    Returns:
        An int64 array of shape ``(num_emps, num_days)``.
    """
    num_slots = target_demand.shape[1]
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)

    # Local copy of the demand so the residuals can be updated in place.
    residual = target_demand.copy().astype(np.float64)

    emp_order = np.random.permutation(num_emps)

    for i in emp_order:
        shift_len = lengths[i]
        tgt = target_days_arr[i]

        # Total residual demand per day: the days with the greatest need.
        daily_needs = np.zeros(num_days, dtype=np.float64)
        for d in range(num_days):
            s = 0.0
            for k in range(num_slots):
                s += residual[d, k]
            daily_needs[d] = s

        if randomness > 0:
            noise = np.random.randn(num_days) * (np.mean(daily_needs) * randomness)
            daily_needs += noise

        # argsort is ascending, so the tail holds the neediest days.
        sorted_days = np.argsort(daily_needs)
        start_idx = max(0, num_days - tgt)
        preferred_days = sorted_days[start_idx:]

        # Pick the best slot on each chosen day.
        for day in preferred_days:
            ctype = cons_types[i, day]
            cval = cons_vals[i, day]
            c_slot = closing_slots[day]
            closed = is_closed_arr[day]

            valid = get_valid_start_slots_numba(day, shift_len, ctype, cval, c_slot, closed)

            if len(valid) == 0:
                roster[i, day] = CODE_OFF
                continue

            # Tournament limited to 5 candidates to keep the overhead low;
            # when sampling, candidates are drawn with replacement.
            n_cand = len(valid)
            limit = 5
            if n_cand <= limit:
                candidates = valid
            else:
                candidates = np.empty(limit, dtype=np.int64)
                for k in range(limit):
                    candidates[k] = valid[np.random.randint(0, n_cand)]

            best_slot = candidates[0]
            best_score = -1e9

            for slot in candidates:
                # Negative entries are not scored as start positions.
                if slot < 0:
                    continue
                end = min(slot + shift_len, num_slots)
                score = 0.0
                for k in range(slot, end):
                    score += residual[day, k]
                if score > best_score:
                    best_score = score
                    best_slot = slot

            roster[i, day] = best_slot

            if best_slot >= 0:
                end = min(best_slot + shift_len, num_slots)
                for k in range(best_slot, end):
                    val = residual[day, k] - 1.0
                    if val < 0:
                        val = 0.0
                    residual[day, k] = val

    return roster


@njit(parallel=True, cache=True)
def initialize_population_fast_wrapper(
    pop_size,
    heuristic_limit,
    numpy_data,
    target_demand,
    randomness,
    closing_slots,
    is_closed_arr,
):
    """
    Parallel JIT wrapper that generates the whole population.

    The first ``heuristic_limit`` individuals are built with the greedy
    generator, the remaining ones with the purely random generator.

    Args:
        numpy_data: 5-tuple whose last four items are ``lengths``,
            ``target_days_arr``, ``cons_types`` and ``cons_vals``; the first
            item is ignored here.

    Returns:
        An int64 array of shape ``(pop_size, num_emps, 7)``.
    """
    _, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    num_emps = len(lengths)
    num_days = 7

    population = np.zeros((pop_size, num_emps, num_days), dtype=np.int64)

    for p in prange(pop_size):
        if p < heuristic_limit:
            population[p] = _generate_single_heuristic_individual(
                num_emps, num_days, lengths, target_days_arr,
                cons_types, cons_vals, closing_slots, is_closed_arr,
                target_demand, randomness
            )
        else:
            population[p] = _generate_single_random_individual(
                num_emps, num_days, lengths, target_days_arr,
                cons_types, cons_vals, closing_slots, is_closed_arr
            )

    return population


def initialize_population_numba(pop_size, numpy_data, target_demand, cfg):
    """
    Create the initial population, mixing greedy and random individuals.

    The share of greedy individuals is ``cfg.genetic_params['heuristic_rate']``
    (default 0.8) and the greedy noise level is
    ``cfg.genetic_params['heuristic_noise']`` (default 0.2).

    Returns:
        The array produced by ``initialize_population_fast_wrapper``.
    """
    heuristic_rate = cfg.genetic_params.get('heuristic_rate', 0.8)
    heuristic_noise = cfg.genetic_params.get('heuristic_noise', 0.2)

    num_heuristic = int(pop_size * heuristic_rate)
    print(f"[*] Inizializzazione popolazione: {num_heuristic} greedy, {pop_size - num_heuristic} random")

    # Plain arrays are passed explicitly because the jitted code cannot work
    # with the custom config object.
    closing_slots = np.array([cfg.get_closing_slot(d) for d in range(7)], dtype=np.int64)
    is_closed_arr = np.array([1 if cfg.is_day_closed(d) else 0 for d in range(7)], dtype=np.int64)

    return initialize_population_fast_wrapper(
        pop_size, num_heuristic, numpy_data, target_demand, heuristic_noise,
        closing_slots, is_closed_arr
    )


def precompute_slots_cache_numba(numpy_data):
    """
    Pre-compute the valid start slots of every employee for each of the 7 days.

    The result is passed to ``mutate`` by ``run_genetic_algorithm`` so the
    slots do not have to be recomputed for every mutation.

    Returns:
        A list with one entry per employee; each entry is a list of 7 items,
        one per day, holding the value returned by
        ``get_valid_start_slots_numba`` for that employee and day.
    """
    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    num_emps = len(lengths)
    cache = []

    for i in range(num_emps):
        emp_days = []
        shift_len = lengths[i]
        for d in range(7):
            ctype = cons_types[i, d]
            cval = cons_vals[i, d]
            closing_slot = cfg.get_closing_slot(d)
            is_closed = cfg.is_day_closed(d)

            slots = get_valid_start_slots_numba(d, shift_len, ctype, cval, closing_slot, is_closed)
            emp_days.append(slots)
        cache.append(emp_days)

    return cache


def run_genetic_algorithm(employees, target_demand, progress_callback=None):
    """
    Run the genetic algorithm and return the best distinct rosters found.

    Fitness is minimised: the individual with the lowest score is the best.

    Args:
        employees: input passed to ``convert_employees_to_numpy``.
        target_demand: 2-D demand array indexed as ``[day, slot]``.
        progress_callback: optional callable invoked once per generation as
            ``progress_callback(gen + 1, generations, best_score,
            current_diversity)``.

    Returns:
        A tuple ``(top_solutions, diversity_history)``. ``top_solutions`` is a
        list of up to five dicts (keys ``schedule``, ``total_score``,
        ``understaffing``, ``overstaffing``, ``homogeneity``,
        ``soft_preferences``, ``contract``, ``equity``) sorted from best to
        worst; ``diversity_history`` is the list of diversity snapshots taken
        every fifth generation and on the last one. When the employee
        conversion yields no data, both lists are empty.
    """
    print("[*] Conversione dati in tensori NumPy...")
    numpy_data = convert_employees_to_numpy(employees)

    if numpy_data[0] is None:
        # Keep the same 2-tuple shape as the normal return so callers that
        # unpack the result do not fail on this early exit.
        return [], []

    print("[*] Generazione cache degli slot...")
    slots_cache = precompute_slots_cache_numba(numpy_data)

    pop_size = cfg.genetic_params['population_size']
    generations = cfg.genetic_params['generations']

    population = initialize_population_numba(pop_size, numpy_data, target_demand, cfg)

    weights_arr = np.array([
        cfg.weights['understaffing'],
        cfg.weights['overstaffing'],
        cfg.weights['homogeneity'],
        cfg.weights['soft_preference'],
        0.0
    ], dtype=np.float64)

    daily_slots_scalar = int(cfg.daily_slots)
    elitism_rate = cfg.genetic_params.get('elitism_rate', 0.02)

    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data

    best_score = float('inf')
    best_coverage = None
    best_overall = None

    diversity_history = []
    current_diversity = 0.0

    print(f"[*] Avvio evoluzione: {generations} generazioni, size {len(population)}")

    for gen in range(generations):
        # 1. Fitness evaluation
        scores = calculate_population_fitness_parallel(
            population, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )

        min_curr = np.min(scores)
        best_idx = np.argmin(scores)

        if min_curr < best_score:
            best_score = min_curr
            best_overall = population[best_idx].copy()
            _, best_coverage = calculate_fitness_numba(
                population[best_idx], masks, lengths, target_days_arr,
                cons_types, cons_vals, target_demand, weights_arr, daily_slots_scalar
            )

        if gen % 5 == 0 or gen == generations - 1:
            sample_rate = 0.20
            dynamic_sample = int(len(population) * sample_rate)

            # Clamp the sample size to [20, 200] (and to the population size)
            # to balance statistical significance against the Numba overhead.
            sample_size = max(20, min(dynamic_sample, 200))
            sample_size = min(sample_size, len(population))

            indices = np.random.choice(len(population), sample_size, replace=False)
            pop_sample = population[indices]

            current_diversity = calculate_diversity_snapshot(pop_sample)
            diversity_history.append(current_diversity)

        if progress_callback:
            progress_callback(gen + 1, generations, best_score, current_diversity)

        # 2. Deficit setup for the guided mutations
        if best_coverage is None:
            current_gap = target_demand
        else:
            current_gap = target_demand - best_coverage

        # Only uncovered demand (positive gap) counts towards the deficit.
        daily_deficit = np.sum(np.maximum(current_gap, 0), axis=1)
        total_deficit = np.sum(daily_deficit)

        if total_deficit > 0:
            daily_deficit_probs = daily_deficit / total_deficit
        else:
            # No deficit anywhere: fall back to a uniform distribution.
            daily_deficit_probs = np.ones(7) / 7.0

        # 3. Next generation and elitism
        new_pop = []
        n_elites = max(2, int(len(population) * elitism_rate))
        elite_indices = np.argsort(scores)[:n_elites]

        for idx in elite_indices:
            new_pop.append(population[idx].copy())

        while len(new_pop) < len(population):
            p1 = tournament_selection(population, scores)
            p2 = tournament_selection(population, scores)

            c1, c2 = crossover(p1, p2)

            new_pop.append(mutate(c1, slots_cache, daily_deficit_probs))
            # The second child is kept only if there is still room.
            if len(new_pop) < len(population):
                new_pop.append(mutate(c2, slots_cache, daily_deficit_probs))

        population = np.array(new_pop)

    print("[*] Estrazione top 5 soluzioni uniche...")

    final_scores = calculate_population_fitness_parallel(
        population, masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights_arr, daily_slots_scalar
    )

    sorted_indices = np.argsort(final_scores)

    top_solutions = []
    seen_hashes = set()

    for idx in sorted_indices:
        ind = population[idx]
        # The raw bytes of the roster identify duplicate individuals.
        ind_hash = ind.tobytes()

        if ind_hash in seen_hashes:
            continue
        seen_hashes.add(ind_hash)

        details = calculate_detailed_score(
            ind, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )

        sol_data = {
            "schedule": ind.copy(),
            "total_score": details[0],
            "understaffing": details[1],
            "overstaffing": details[2],
            "homogeneity": details[3],
            "soft_preferences": details[4],
            "contract": details[5],
            "equity": details[6]
        }
        top_solutions.append(sol_data)

        if len(top_solutions) >= 5:
            break

    # Fallback: the final population produced no solution, so report the best
    # individual seen during the run.
    if not top_solutions and best_overall is not None:
        details = calculate_detailed_score(
            best_overall, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        top_solutions.append({
            "schedule": best_overall,
            "total_score": details[0],
            "understaffing": details[1],
            "overstaffing": details[2],
            "homogeneity": details[3],
            "soft_preferences": details[4],
            "contract": details[5],
            "equity": details[6]
        })

    return top_solutions, diversity_history