Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from numba import njit, prange | |
| from src.config import cfg | |
| from src.problems.my_problem import convert_employees_to_numpy | |
| from src.engine.selection import tournament_selection, calculate_population_fitness_parallel, calculate_fitness_numba, calculate_detailed_score, get_valid_start_slots_numba, CODE_OFF | |
| from src.engine.crossover import crossover | |
| from src.engine.mutation import mutate | |
| from src.utils.health import calculate_diversity_snapshot | |
def _generate_single_random_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr):
    """
    Generate one individual with fully random shift assignments.

    Parameters
    ----------
    num_emps : int
        Number of employees (rows of the roster).
    num_days : int
        Number of days in the planning horizon.
    lengths : int64[:]
        Shift length (in slots) per employee.
    target_days_arr : int64[:]
        Contractual number of working days per employee.
    cons_types, cons_vals : int64[:, :]
        Per (employee, day) constraint type and value, forwarded to
        ``get_valid_start_slots_numba``.
    closing_slots : int64[:]
        Closing slot index per day.
    is_closed_arr : int64[:]
        1 if the day is closed, else 0.

    Returns
    -------
    int64[num_emps, num_days]
        Roster of start slots; ``CODE_OFF`` marks a day off.
    """
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    for i in range(num_emps):
        shift_len = lengths[i]
        # Buffer of the day indices this employee actually works.
        # BUGFIX: sized num_days (was hard-coded 7), so a planning horizon
        # longer than one week can no longer overflow the buffer.
        days_worked_indices = np.empty(num_days, dtype=np.int64)
        dw_ptr = 0
        # Random assignment among the valid start slots of each day.
        for d in range(num_days):
            ctype = cons_types[i, d]
            cval = cons_vals[i, d]
            c_slot = closing_slots[d]
            closed = is_closed_arr[d]
            valid = get_valid_start_slots_numba(d, shift_len, ctype, cval, c_slot, closed)
            if len(valid) > 0:
                chosen = valid[np.random.randint(0, len(valid))]
                roster[i, d] = chosen
                # Negative codes (e.g. CODE_OFF) do not count as worked days.
                if chosen >= 0:
                    days_worked_indices[dw_ptr] = d
                    dw_ptr += 1
        # Randomly drop days in excess of the contractual target, using
        # swap-with-last removal to keep the buffer compact in O(1).
        tgt = target_days_arr[i]
        while dw_ptr > tgt:
            idx_in_buffer = np.random.randint(0, dw_ptr)
            day_to_remove = days_worked_indices[idx_in_buffer]
            roster[i, day_to_remove] = CODE_OFF
            days_worked_indices[idx_in_buffer] = days_worked_indices[dw_ptr - 1]
            dw_ptr -= 1
    return roster
def _generate_single_heuristic_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr, target_demand, randomness):
    """
    Generate one individual via a greedy pass driven by residual demand.

    Employees are visited in random order; each one picks its
    ``target_days_arr[i]`` highest-need days and, on each chosen day, the
    candidate start slot that covers the most residual demand. The residual
    demand is decremented as slots are assigned, so later employees fill the
    gaps left by earlier ones. ``randomness`` scales Gaussian noise added to
    the per-day need scores, diversifying the individuals produced.

    Returns an int64[num_emps, num_days] roster (``CODE_OFF`` = day off).

    NOTE(review): behavior depends on the exact sequence of ``np.random``
    calls; keep the call order intact when modifying this function.
    """
    num_slots = target_demand.shape[1]
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    # Local copy of the demand so residuals can be decremented in place.
    residual = target_demand.copy().astype(np.float64)
    emp_order = np.random.permutation(num_emps)
    for i in emp_order:
        shift_len = lengths[i]
        tgt = target_days_arr[i]
        # Score each day by its total remaining (uncovered) demand.
        daily_needs = np.zeros(num_days, dtype=np.float64)
        for d in range(num_days):
            s = 0.0
            for k in range(num_slots):
                s += residual[d, k]
            daily_needs[d] = s
        if randomness > 0:
            # Noise magnitude is proportional to the mean need so the
            # perturbation scales with the problem instance.
            noise = np.random.randn(num_days) * (np.mean(daily_needs) * randomness)
            daily_needs += noise
        # argsort is ascending, so the last `tgt` entries are the
        # highest-need days.
        sorted_days = np.argsort(daily_needs)
        start_idx = max(0, num_days - tgt)
        preferred_days = sorted_days[start_idx:]
        # Assign the best start slot on each chosen day.
        for day in preferred_days:
            ctype = cons_types[i, day]
            cval = cons_vals[i, day]
            c_slot = closing_slots[day]
            closed = is_closed_arr[day]
            valid = get_valid_start_slots_numba(day, shift_len, ctype, cval, c_slot, closed)
            if len(valid) == 0:
                roster[i, day] = CODE_OFF
                continue
            # Tournament limited to 5 candidates to cap per-day overhead
            # (sampled with replacement, so duplicates are possible).
            n_cand = len(valid)
            limit = 5
            if n_cand <= limit:
                candidates = valid
            else:
                candidates = np.empty(limit, dtype=np.int64)
                for k in range(limit):
                    candidates[k] = valid[np.random.randint(0, n_cand)]
            best_slot = candidates[0]
            best_score = -1e9
            for slot in candidates:
                # Negative slots encode non-working options and carry no score.
                if slot < 0: continue
                end = min(slot + shift_len, num_slots)
                score = 0.0
                for k in range(slot, end):
                    score += residual[day, k]
                if score > best_score:
                    best_score = score
                    best_slot = slot
            roster[i, day] = best_slot
            if best_slot >= 0:
                # Consume the covered demand, clamped at zero.
                end = min(best_slot + shift_len, num_slots)
                for k in range(best_slot, end):
                    val = residual[day, k] - 1.0
                    if val < 0: val = 0.0
                    residual[day, k] = val
    return roster
def initialize_population_fast_wrapper(pop_size, heuristic_limit, numpy_data, target_demand, randomness, closing_slots, is_closed_arr):
    """
    Build the entire initial population in a single pass.

    The first ``heuristic_limit`` individuals come from the greedy generator,
    the remainder from the purely random one. The loop uses ``prange`` so it
    can parallelize under a Numba JIT; without a ``@njit(parallel=True)``
    decorator it falls back to a plain sequential range — presumably the
    decorator lives elsewhere or was intended; verify against the build.
    """
    _, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    n_emps = len(lengths)
    n_days = 7
    population = np.zeros((pop_size, n_emps, n_days), dtype=np.int64)
    for idx in prange(pop_size):
        if idx < heuristic_limit:
            individual = _generate_single_heuristic_individual(
                n_emps, n_days, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr, target_demand, randomness
            )
        else:
            individual = _generate_single_random_individual(
                n_emps, n_days, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr
            )
        population[idx] = individual
    return population
def initialize_population_numba(pop_size, numpy_data, target_demand, cfg):
    """
    Entry point for population initialization.

    Reads the GA settings from ``cfg`` (the parameter deliberately shadows
    the module-level import) and delegates the heavy lifting to
    ``initialize_population_fast_wrapper``.
    """
    params = cfg.genetic_params
    heuristic_rate = params.get('heuristic_rate', 0.8)
    heuristic_noise = params.get('heuristic_noise', 0.2)
    num_heuristic = int(pop_size * heuristic_rate)
    print(f"[*] Inizializzazione popolazione: {num_heuristic} greedy, {pop_size - num_heuristic} random")
    # Flatten the config into plain int64 arrays: Numba cannot traverse
    # custom Python objects inside a jitted region.
    closing_slots = np.array([cfg.get_closing_slot(d) for d in range(7)], dtype=np.int64)
    is_closed_arr = np.array([1 if cfg.is_day_closed(d) else 0 for d in range(7)], dtype=np.int64)
    return initialize_population_fast_wrapper(
        pop_size,
        num_heuristic,
        numpy_data,
        target_demand,
        heuristic_noise,
        closing_slots,
        is_closed_arr,
    )
def precompute_slots_cache_numba(numpy_data):
    """Precompute the valid-start-slot cache used to speed up mutations.

    Returns a nested list where ``cache[employee][day]`` is the array of
    valid start slots produced by ``get_valid_start_slots_numba`` for that
    (employee, day) pair, using the module-level ``cfg`` for day closures.
    """
    _, lengths, _, cons_types, cons_vals = numpy_data
    cache = []
    for emp in range(len(lengths)):
        shift_len = lengths[emp]
        per_day = []
        for day in range(7):
            slots = get_valid_start_slots_numba(
                day,
                shift_len,
                cons_types[emp, day],
                cons_vals[emp, day],
                cfg.get_closing_slot(day),
                cfg.is_day_closed(day),
            )
            per_day.append(slots)
        cache.append(per_day)
    return cache
def _build_solution_entry(schedule, details):
    """Pack a schedule and its detailed-score tuple into the result dict shape."""
    return {
        "schedule": schedule,
        "total_score": details[0],
        "understaffing": details[1],
        "overstaffing": details[2],
        "homogeneity": details[3],
        "soft_preferences": details[4],
        "contract": details[5],
        "equity": details[6],
    }


def run_genetic_algorithm(employees, target_demand, progress_callback=None):
    """
    Run the full genetic algorithm and return the best unique solutions.

    Parameters
    ----------
    employees
        Project employee objects; converted to NumPy tensors internally.
    target_demand
        Array [num_days, num_slots] of required staffing per slot.
    progress_callback : callable, optional
        Called as ``progress_callback(gen, generations, best_score, diversity)``
        once per generation.

    Returns
    -------
    (top_solutions, diversity_history)
        ``top_solutions`` is a list of up to 5 unique solution dicts (best
        first); ``diversity_history`` holds diversity snapshots sampled
        every 5 generations.
    """
    print("[*] Conversione dati in tensori NumPy...")
    numpy_data = convert_employees_to_numpy(employees)
    if numpy_data[0] is None:
        # BUGFIX: return the same 2-tuple shape as the normal path; the old
        # bare `return []` crashed callers that unpack (solutions, history).
        return [], []
    print("[*] Generazione cache degli slot...")
    slots_cache = precompute_slots_cache_numba(numpy_data)
    pop_size = cfg.genetic_params['population_size']
    generations = cfg.genetic_params['generations']
    population = initialize_population_numba(pop_size, numpy_data, target_demand, cfg)
    # Weight order must match the fitness kernels' expectations; the trailing
    # 0.0 is a reserved/unused component.
    weights_arr = np.array([
        cfg.weights['understaffing'],
        cfg.weights['overstaffing'],
        cfg.weights['homogeneity'],
        cfg.weights['soft_preference'],
        0.0
    ], dtype=np.float64)
    daily_slots_scalar = int(cfg.daily_slots)
    elitism_rate = cfg.genetic_params.get('elitism_rate', 0.02)
    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    best_score = float('inf')
    best_coverage = None
    best_overall = None
    diversity_history = []
    current_diversity = 0.0
    print(f"[*] Avvio evoluzione: {generations} generazioni, size {len(population)}")
    for gen in range(generations):
        # 1. Fitness evaluation (parallel kernel over the whole population).
        scores = calculate_population_fitness_parallel(
            population,
            masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        min_curr = np.min(scores)
        best_idx = np.argmin(scores)
        if min_curr < best_score:
            best_score = min_curr
            best_overall = population[best_idx].copy()
            # Coverage is only recomputed for a new champion; it is used
            # solely to steer the deficit-guided mutations below.
            _, best_coverage = calculate_fitness_numba(
                population[best_idx], masks, lengths, target_days_arr, cons_types, cons_vals,
                target_demand, weights_arr, daily_slots_scalar
            )
        if gen % 5 == 0 or gen == generations - 1:
            sample_rate = 0.20
            dynamic_sample = int(len(population) * sample_rate)
            # Clamp the sample size to balance statistical significance
            # against Numba overhead.
            sample_size = max(20, min(dynamic_sample, 200))
            sample_size = min(sample_size, len(population))
            indices = np.random.choice(len(population), sample_size, replace=False)
            pop_sample = population[indices]
            current_diversity = calculate_diversity_snapshot(pop_sample)
            diversity_history.append(current_diversity)
        if progress_callback:
            progress_callback(gen + 1, generations, best_score, current_diversity)
        # 2. Deficit setup for the guided mutations.
        if best_coverage is None:
            current_gap = target_demand
        else:
            current_gap = target_demand - best_coverage
        daily_deficit = np.sum(np.maximum(current_gap, 0), axis=1)
        total_deficit = np.sum(daily_deficit)
        if total_deficit > 0:
            daily_deficit_probs = daily_deficit / total_deficit
        else:
            # No deficit anywhere: mutate days uniformly.
            daily_deficit_probs = np.ones(7) / 7.0
        # 3. Next generation with elitism.
        new_pop = []
        n_elites = max(2, int(len(population) * elitism_rate))
        elite_indices = np.argsort(scores)[:n_elites]
        for idx in elite_indices:
            new_pop.append(population[idx].copy())
        while len(new_pop) < len(population):
            p1 = tournament_selection(population, scores)
            p2 = tournament_selection(population, scores)
            c1, c2 = crossover(p1, p2)
            new_pop.append(mutate(c1, slots_cache, daily_deficit_probs))
            # The second child is only kept if there is still room.
            if len(new_pop) < len(population):
                new_pop.append(mutate(c2, slots_cache, daily_deficit_probs))
        population = np.array(new_pop)
    print("[*] Estrazione top 5 soluzioni uniche...")
    final_scores = calculate_population_fitness_parallel(
        population, masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights_arr, daily_slots_scalar
    )
    sorted_indices = np.argsort(final_scores)
    top_solutions = []
    seen_hashes = set()
    for idx in sorted_indices:
        ind = population[idx]
        # Raw bytes uniquely identify a schedule for deduplication.
        ind_hash = ind.tobytes()
        if ind_hash in seen_hashes:
            continue
        seen_hashes.add(ind_hash)
        details = calculate_detailed_score(
            ind, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        top_solutions.append(_build_solution_entry(ind.copy(), details))
        if len(top_solutions) >= 5:
            break
    # Fallback if no unique solutions were collected (degenerate populations).
    if not top_solutions and best_overall is not None:
        details = calculate_detailed_score(
            best_overall, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        top_solutions.append(_build_solution_entry(best_overall, details))
    return top_solutions, diversity_history