Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from numba import njit, prange | |
| from src.config import cfg | |
# Domain constants (integer codes, so schedule data can stay purely numeric).

# Sentinel values used in place of a start slot (both are negative).
CODE_OFF = -1
CODE_ABSENCE = -2

# Constraint type codes compared against the per-employee/per-day constraints.
CONS_TYPE_HARD = 1
CONS_TYPE_SOFT = 2
CONS_TYPE_ABSENCE = 3
def get_valid_start_slots_numba(day_idx, shift_len, cons_type, cons_val, closing_slot, is_closed):
    """
    Return the admissible shift-start slots for one employee on one day.

    Every branch returns a 1-D ``int64`` ndarray, so the return type is
    uniform regardless of the constraint.
    NOTE(review): the original docstring describes this function as
    JIT-compiled, but no decorator is visible in this view -- confirm.

    Args:
        day_idx: day index; currently not used by the body.
        shift_len: shift length, in slots.
        cons_type: constraint type code (compared with the ``CONS_TYPE_*``
            constants).
        cons_val: constraint value; for a hard constraint it is the forced
            start slot.
        closing_slot: slot by which a shift must have ended.
        is_closed: truthy when the day is closed.

    Returns:
        * ``[CODE_ABSENCE]`` for an absence constraint;
        * for a hard constraint, ``[cons_val]`` when the day is open and the
          shift ends by ``closing_slot``, otherwise ``[CODE_OFF]``;
        * otherwise every start from ``0`` to ``closing_slot - shift_len``
          inclusive, or an empty array when the day is closed or the shift
          is longer than the day.
    """
    # 1. Binding constraints: absences and fixed shifts yield a single value.
    if cons_type == CONS_TYPE_ABSENCE:
        return np.array([CODE_ABSENCE], dtype=np.int64)

    if cons_type == CONS_TYPE_HARD:
        # The fixed shift is only usable if it ends within opening hours.
        fits = (not is_closed) and (cons_val + shift_len <= closing_slot)
        forced_start = cons_val if fits else CODE_OFF
        return np.array([forced_start], dtype=np.int64)

    # 2. Unconstrained (or soft-constrained) day: enumerate contiguous starts.
    last_start = closing_slot - shift_len
    if is_closed or last_start < 0:
        return np.empty(0, dtype=np.int64)
    return np.arange(0, last_start + 1, dtype=np.int64)
def calculate_fitness_numba(individual, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
    """
    Score one weekly schedule and return it with its coverage matrix.

    The coverage matrix is built by adding each employee's presence mask
    starting at that employee's start slot for every worked day. The score is
    the sum of these penalty terms:

    * under-coverage: total shortfall versus demand, times ``weights[0]``;
    * over-coverage: surplus divided by demand (demand 0 treated as 1),
      times ``weights[1] * 10``;
    * homogeneity: per-employee standard deviation of start slots, times
      ``weights[2]``;
    * soft preferences: distance between actual and preferred start slot,
      times ``weights[3]``;
    * contract: difference between worked days and target days, times
      ``weights[0] * 2`` per day;
    * equity: standard deviation of weekend days worked across employees,
      times ``weights[2] * 10``.

    Args:
        individual: 2-D array indexed ``[employee, day]``; a value >= 0 is a
            start slot, any negative value means the day is not worked.
        masks: 2-D array indexed ``[employee, offset]`` with the per-slot
            contribution of a shift to coverage.
        lengths: per-employee count of meaningful entries in ``masks``; its
            length defines the number of employees.
        target_days_arr: per-employee target number of worked days.
        cons_types: 2-D array ``[employee, day]`` of constraint type codes.
        cons_vals: 2-D array ``[employee, day]``; preferred start slot where
            the constraint type is soft.
        target_demand: demand array subtracted from the ``(7, daily_slots)``
            coverage matrix.
        weights: indexable with at least four entries (see above).
        daily_slots: number of slots per day.

    Returns:
        Tuple ``(total_score, current_coverage)`` where the coverage is an
        ``int64`` array of shape ``(7, daily_slots)``.
    """
    n_employees = len(lengths)
    n_days = 7
    coverage = np.zeros((n_days, daily_slots), dtype=np.int64)

    w_under = weights[0]
    w_over = weights[1]
    w_homo = weights[2]
    w_soft = weights[3]
    # One missing/extra working day costs as much as two uncovered slots.
    mismatch_cost_per_day = w_under * 2.0

    start_spread = 0.0
    soft_distance = 0.0
    contract_cost = 0.0
    weekend_total = 0.0
    weekend_sq_total = 0.0

    # Walk the schedule once, collecting coverage and per-employee statistics.
    for emp in range(n_employees):
        shift_len = lengths[emp]
        presence = masks[emp, :shift_len]

        worked_days = 0
        weekend_worked = 0
        start_total = 0.0
        start_sq_total = 0.0

        for day in range(n_days):
            start = individual[emp, day]
            works_today = start >= 0

            if works_today:
                worked_days += 1
                start_total += start
                start_sq_total += start**2
                # Day indices 5 and 6 are counted as the weekend.
                if day >= 5:
                    weekend_worked += 1

                # Clip the shift at the end of the day before stamping it.
                stop = min(start + shift_len, daily_slots)
                visible = stop - start
                if visible > 0:
                    coverage[day, start:stop] += presence[:visible]

            # Soft preference: penalise the distance from the wished start.
            if cons_types[emp, day] == CONS_TYPE_SOFT and works_today:
                wanted = cons_vals[emp, day]
                if start != wanted:
                    soft_distance += abs(start - wanted)

        # Contractual number of working days.
        target_days = target_days_arr[emp]
        if worked_days != target_days:
            contract_cost += (abs(worked_days - target_days) * mismatch_cost_per_day)

        # Spread of start times over the week (population std deviation).
        if worked_days > 1:
            mean_start = start_total / worked_days
            variance = (start_sq_total / worked_days) - (mean_start**2)
            # Guard against tiny negative values from floating-point rounding.
            if variance > 0:
                start_spread += np.sqrt(variance)

        weekend_total += weekend_worked
        weekend_sq_total += weekend_worked**2

    # Fairness of weekend load across employees.
    equity_cost = 0.0
    if n_employees > 1:
        weekend_mean = weekend_total / n_employees
        weekend_var = (weekend_sq_total / n_employees) - (weekend_mean**2)
        if weekend_var > 0:
            equity_cost = np.sqrt(weekend_var) * w_homo * 10.0

    # Coverage versus demand: shortfall is absolute, surplus is relative.
    gap = (coverage - target_demand).flatten()
    demand_flat = target_demand.flatten()
    shortfall = 0.0
    surplus = 0.0
    for k in range(len(gap)):
        delta = gap[k]
        if delta < 0:
            shortfall += abs(delta)
        elif delta > 0:
            demand = demand_flat[k]
            if demand == 0:
                demand = 1
            surplus += (delta / demand)
    shortfall *= w_under
    surplus *= (w_over * 10.0)

    # Aggregate loss.
    total_score = (shortfall +
                   surplus +
                   (start_spread * w_homo) +
                   (soft_distance * w_soft) +
                   contract_cost +
                   equity_cost)
    return total_score, coverage
def calculate_population_fitness_parallel(population, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
    """
    Evaluate the fitness score of every individual in a population.

    Each individual is scored with ``calculate_fitness_numba``; only the
    scalar score is kept and the coverage matrix is discarded.

    The loop iterates over ``prange``.
    NOTE(review): actual multi-threading requires this function to be
    compiled by Numba with parallel execution enabled; no decorator is
    visible in this view -- confirm.

    Args:
        population: sequence/array of individuals, indexed by position.
        masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights, daily_slots: forwarded unchanged to
            ``calculate_fitness_numba``.

    Returns:
        1-D ``float64`` array with one score per individual, in population
        order.
    """
    n_individuals = len(population)
    scores = np.zeros(n_individuals, dtype=np.float64)

    for idx in prange(n_individuals):
        result = calculate_fitness_numba(
            population[idx],
            masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights, daily_slots
        )
        # result is (score, coverage); the coverage is not needed here.
        scores[idx] = result[0]

    return scores
def tournament_selection(population, fitness_scores):
    """
    Select one individual by tournament; the lowest score wins.

    Draws distinct contestants uniformly at random and returns the one with
    the smallest fitness score. The tournament size is read from
    ``cfg.genetic_params['tournament_size']`` (default 5) and is clamped to
    the range ``[1, len(population)]``, so a population smaller than the
    configured size no longer makes the sampling fail.

    Args:
        population: indexable collection of individuals.
        fitness_scores: one score per individual, aligned with
            ``population``; any array-like is accepted.

    Returns:
        ``population[best_idx]`` for the winning contestant (no explicit copy
        is made).

    Raises:
        ValueError: if ``population`` is empty.
    """
    pop_size = len(population)
    if pop_size == 0:
        raise ValueError("tournament_selection requires a non-empty population")

    # Sampling without replacement cannot draw more contestants than exist,
    # and argmin needs at least one contestant.
    requested = cfg.genetic_params.get('tournament_size', 5)
    k = max(1, min(requested, pop_size))

    # Coerce so that plain lists support the fancy indexing below.
    scores = np.asarray(fitness_scores)
    contenders = np.random.choice(pop_size, k, replace=False)
    best_idx = contenders[np.argmin(scores[contenders])]
    return population[best_idx]
def calculate_detailed_score(individual, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
    """
    Compute the fitness of one schedule broken down by penalty term.

    The arithmetic mirrors ``calculate_fitness_numba``; the difference is the
    return value, which exposes every weighted component instead of the
    coverage matrix. Per the original author's note, it is meant for
    extracting the final metrics shown in the UI after the optimisation.

    Args:
        individual: 2-D array indexed ``[employee, day]``; a value >= 0 is a
            start slot, any negative value means the day is not worked.
        masks: 2-D array indexed ``[employee, offset]`` with the per-slot
            contribution of a shift to coverage.
        lengths: per-employee count of meaningful entries in ``masks``; its
            length defines the number of employees.
        target_days_arr: per-employee target number of worked days.
        cons_types: 2-D array ``[employee, day]`` of constraint type codes.
        cons_vals: 2-D array ``[employee, day]``; preferred start slot where
            the constraint type is soft.
        target_demand: demand array subtracted from the ``(7, daily_slots)``
            coverage matrix.
        weights: indexable; ``[0]`` under-coverage, ``[1]`` over-coverage,
            ``[2]`` homogeneity/equity, ``[3]`` soft preferences.
        daily_slots: number of slots per day.

    Returns:
        1-D array of seven values, in this order: total, under-coverage,
        over-coverage, homogeneity, soft preferences, contract, equity.
    """
    n_employees = len(lengths)
    n_days = 7
    coverage = np.zeros((n_days, daily_slots), dtype=np.int64)

    w_under = weights[0]
    w_over = weights[1]
    w_homo = weights[2]
    w_soft = weights[3]
    # One missing/extra working day costs as much as two uncovered slots.
    mismatch_cost_per_day = w_under * 2.0

    start_spread = 0.0
    soft_distance = 0.0
    contract_cost = 0.0
    weekend_total = 0.0
    weekend_sq_total = 0.0

    for emp in range(n_employees):
        shift_len = lengths[emp]
        presence = masks[emp, :shift_len]

        worked_days = 0
        weekend_worked = 0
        start_total = 0.0
        start_sq_total = 0.0

        for day in range(n_days):
            start = individual[emp, day]
            works_today = start >= 0

            if works_today:
                worked_days += 1
                start_total += start
                start_sq_total += start**2
                # Day indices 5 and 6 are counted as the weekend.
                if day >= 5:
                    weekend_worked += 1

                # Clip the shift at the end of the day before stamping it.
                stop = min(start + shift_len, daily_slots)
                visible = stop - start
                if visible > 0:
                    coverage[day, start:stop] += presence[:visible]

            # Soft preference: penalise the distance from the wished start.
            if cons_types[emp, day] == CONS_TYPE_SOFT and works_today:
                wanted = cons_vals[emp, day]
                if start != wanted:
                    soft_distance += abs(start - wanted)

        # Contractual number of working days.
        target_days = target_days_arr[emp]
        if worked_days != target_days:
            contract_cost += (abs(worked_days - target_days) * mismatch_cost_per_day)

        # Spread of start times over the week (population std deviation).
        if worked_days > 1:
            mean_start = start_total / worked_days
            variance = (start_sq_total / worked_days) - (mean_start**2)
            # Guard against tiny negative values from floating-point rounding.
            if variance > 0:
                start_spread += np.sqrt(variance)

        weekend_total += weekend_worked
        weekend_sq_total += weekend_worked**2

    # Fairness of weekend load across employees.
    equity_cost = 0.0
    if n_employees > 1:
        weekend_mean = weekend_total / n_employees
        weekend_var = (weekend_sq_total / n_employees) - (weekend_mean**2)
        if weekend_var > 0:
            equity_cost = np.sqrt(weekend_var) * w_homo * 10.0

    # Coverage versus demand: shortfall is absolute, surplus is relative.
    gap = (coverage - target_demand).flatten()
    demand_flat = target_demand.flatten()
    shortfall = 0.0
    surplus = 0.0
    for k in range(len(gap)):
        delta = gap[k]
        if delta < 0:
            shortfall += abs(delta)
        elif delta > 0:
            demand = demand_flat[k]
            if demand == 0:
                demand = 1
            surplus += (delta / demand)
    shortfall *= w_under
    surplus *= (w_over * 10.0)

    homogeneity_cost = start_spread * w_homo
    soft_cost = soft_distance * w_soft
    total = shortfall + surplus + homogeneity_cost + soft_cost + contract_cost + equity_cost

    # Exploded loss components, consumed by the charts.
    return np.array([total, shortfall, surplus, homogeneity_cost, soft_cost, contract_cost, equity_cost])