import numpy as np
from numba import njit, prange

from src.config import cfg

# Domain constants (integer codes used by the vectorized kernels).
CODE_OFF = -1
CODE_ABSENCE = -2

CONS_TYPE_HARD = 1
CONS_TYPE_SOFT = 2
CONS_TYPE_ABSENCE = 3

# Schedule geometry used by the fitness kernels: the kernels scan 7 day
# columns and count day indices >= 5 as weekend days.
_NUM_DAYS = 7
_FIRST_WEEKEND_DAY = 5


@njit(cache=True)
def get_valid_start_slots_numba(day_idx, shift_len, cons_type, cons_val,
                                closing_slot, is_closed):
    """Return the valid shift start slots as an int64 ndarray.

    Args:
        day_idx: Accepted for interface compatibility; not read by the body.
        shift_len: Shift length, in slots.
        cons_type: Constraint type code (one of the ``CONS_TYPE_*`` values).
        cons_val: Constraint value; for a hard constraint it is the
            required start slot.
        closing_slot: Slot index at which the day closes.
        is_closed: True when the day is closed.

    Returns:
        Always a 1-D int64 array:
        * ``[CODE_ABSENCE]`` for an absence constraint;
        * for a hard constraint, ``[cons_val]`` when the shift fits before
          ``closing_slot`` on an open day, otherwise ``[CODE_OFF]``;
        * otherwise the contiguous range ``0 .. closing_slot - shift_len``
          (inclusive), or an empty array when the day is closed or the
          shift does not fit.
    """
    # 1. Hard constraints (absences and fixed shifts).
    if cons_type == CONS_TYPE_ABSENCE:
        return np.array([CODE_ABSENCE], dtype=np.int64)

    if cons_type == CONS_TYPE_HARD:
        target_slot = cons_val
        # Boundary check: the fixed shift must not run past closing time.
        if is_closed or (target_slot + shift_len > closing_slot):
            return np.array([CODE_OFF], dtype=np.int64)
        return np.array([target_slot], dtype=np.int64)

    # 2. Standard assignment.
    if is_closed:
        return np.empty(0, dtype=np.int64)

    max_start = closing_slot - shift_len
    if max_start < 0:
        return np.empty(0, dtype=np.int64)

    # Every contiguous start slot that lets the shift end by closing time.
    return np.arange(0, max_start + 1, dtype=np.int64)


@njit(cache=True)
def _score_components(individual, masks, lengths, target_days_arr, cons_types,
                      cons_vals, target_demand, weights, daily_slots):
    """Compute every weighted cost term of a schedule plus its coverage.

    Shared kernel behind ``calculate_fitness_numba`` and
    ``calculate_detailed_score`` so the two cannot drift apart.

    Args:
        individual: 2-D integer array indexed ``[employee, day]`` holding the
            start slot of each shift; negative values mean "not working".
        masks: 2-D array; row ``i`` truncated to ``lengths[i]`` is the
            per-slot contribution of employee ``i``'s shift to coverage.
        lengths: Per-employee mask length, in slots.
        target_days_arr: Per-employee target number of worked days.
        cons_types: Constraint type codes indexed ``[employee, day]``.
        cons_vals: Constraint values indexed ``[employee, day]``.
        target_demand: Required coverage indexed ``[day, slot]``.
            NOTE(review): indexed here as shape (7, daily_slots) - confirm
            against callers.
        weights: Indexable with at least four entries, read as
            (under-coverage, over-coverage, homogeneity, soft preference).
        daily_slots: Number of slots in a day.

    Returns:
        Tuple ``(under_score, over_score, cost_homo, cost_soft,
        contract_penalty, equity_penalty, current_coverage)`` where the
        first six items are already weighted and ``current_coverage`` is an
        int64 array of shape (7, daily_slots).
    """
    num_emps = len(lengths)
    num_slots = daily_slots
    current_coverage = np.zeros((_NUM_DAYS, num_slots), dtype=np.int64)

    w_under = weights[0]
    w_over = weights[1]
    w_homo = weights[2]
    w_soft = weights[3]
    penalty_per_day_mismatch = w_under * 2.0

    homogeneity_penalty = 0.0
    soft_pref_penalty = 0.0
    contract_penalty = 0.0
    equity_penalty = 0.0

    sum_wk_work = 0.0
    sum_sq_wk_work = 0.0

    # Scan the shift matrix one employee at a time.
    for i in range(num_emps):
        mask_len = lengths[i]
        real_mask = masks[i, :mask_len]

        days_worked_count = 0
        weekend_days = 0
        sum_slots = 0.0
        sum_sq_slots = 0.0

        for day in range(_NUM_DAYS):
            start = individual[i, day]
            if start < 0:
                # Negative start: no shift on this day, nothing to score.
                continue

            days_worked_count += 1
            sum_slots += start
            sum_sq_slots += start ** 2
            if day >= _FIRST_WEEKEND_DAY:
                weekend_days += 1

            # Project the shift mask onto the coverage matrix, clipping the
            # part that would fall past the last slot of the day.
            end = min(start + mask_len, num_slots)
            real_len = end - start
            if real_len > 0:
                current_coverage[day, start:end] += real_mask[:real_len]

            # Soft constraint: penalize the distance from the preferred
            # start slot (contributes zero when the preference is met).
            if cons_types[i, day] == CONS_TYPE_SOFT:
                soft_pref_penalty += abs(start - cons_vals[i, day])

        # Contract target: worked days versus the employee's target.
        tgt = target_days_arr[i]
        if days_worked_count != tgt:
            contract_penalty += (abs(days_worked_count - tgt)
                                 * penalty_per_day_mismatch)

        # Standard deviation of start slots: penalizes employees whose
        # start times vary across the week.
        if days_worked_count > 1:
            mean = sum_slots / days_worked_count
            var = (sum_sq_slots / days_worked_count) - (mean ** 2)
            # Guard: rounding can make the variance non-positive.
            if var > 0:
                homogeneity_penalty += np.sqrt(var)

        sum_wk_work += weekend_days
        sum_sq_wk_work += weekend_days ** 2

    # Equity: spread of weekend days worked across employees.
    if num_emps > 1:
        mean_wk = sum_wk_work / num_emps
        var_wk = (sum_sq_wk_work / num_emps) - (mean_wk ** 2)
        if var_wk > 0:
            equity_penalty = np.sqrt(var_wk) * w_homo * 10.0

    # Demand versus coverage, accumulated in a single row-major pass (same
    # summation order as iterating the flattened difference, without
    # allocating temporary arrays).
    under_score = 0.0
    over_score = 0.0
    for day in range(_NUM_DAYS):
        for slot in range(num_slots):
            demand = target_demand[day, slot]
            delta = current_coverage[day, slot] - demand
            if delta < 0:
                under_score += abs(delta)
            elif delta > 0:
                # Over-coverage is relative to demand; a zero demand is
                # treated as 1 to avoid dividing by zero.
                if demand == 0:
                    demand = 1
                over_score += (delta / demand)
    under_score *= w_under
    over_score *= (w_over * 10.0)

    cost_homo = homogeneity_penalty * w_homo
    cost_soft = soft_pref_penalty * w_soft

    return (under_score, over_score, cost_homo, cost_soft,
            contract_penalty, equity_penalty, current_coverage)


@njit(cache=True)
def calculate_fitness_numba(individual, masks, lengths, target_days_arr,
                            cons_types, cons_vals, target_demand, weights,
                            daily_slots):
    """Evaluate the fitness (total cost) of one schedule.

    Sums the weighted under-coverage, over-coverage, homogeneity,
    soft-preference, contract and weekend-equity terms. See
    ``_score_components`` for the meaning of each argument.

    Returns:
        Tuple ``(total_score, current_coverage)``: the aggregated cost as a
        float and the int64 coverage matrix of shape (7, daily_slots).
    """
    (under_score, over_score, cost_homo, cost_soft, contract_penalty,
     equity_penalty, current_coverage) = _score_components(
        individual, masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights, daily_slots
    )

    total_score = (under_score + over_score + cost_homo + cost_soft
                   + contract_penalty + equity_penalty)
    return total_score, current_coverage


@njit(parallel=True, cache=True)
def calculate_population_fitness_parallel(population, masks, lengths,
                                          target_days_arr, cons_types,
                                          cons_vals, target_demand, weights,
                                          daily_slots):
    """Evaluate the fitness of every individual in ``population``.

    The loop over individuals uses Numba's ``prange``. Each individual is
    scored with ``calculate_fitness_numba``; its coverage matrix is
    discarded.

    Returns:
        float64 array of length ``len(population)`` with one total score
        per individual.
    """
    pop_size = len(population)
    scores = np.zeros(pop_size, dtype=np.float64)

    for i in prange(pop_size):
        sc, _ = calculate_fitness_numba(
            population[i], masks, lengths, target_days_arr,
            cons_types, cons_vals, target_demand, weights, daily_slots
        )
        scores[i] = sc

    return scores


def tournament_selection(population, fitness_scores):
    """Pick one individual by tournament selection.

    Draws ``tournament_size`` distinct candidates (read from
    ``cfg.genetic_params``, default 5) and returns the one with the lowest
    fitness score.

    Args:
        population: Indexable collection of individuals.
        fitness_scores: Array of scores aligned with ``population``; it is
            indexed with an integer index array.

    Returns:
        The selected element of ``population``.
    """
    pop_size = len(population)
    k = cfg.genetic_params.get('tournament_size', 5)
    # Sampling without replacement raises ValueError when asked for more
    # candidates than exist, so cap the tournament at the population size.
    k = min(k, pop_size)

    indices = np.random.choice(pop_size, k, replace=False)
    best_idx = indices[np.argmin(fitness_scores[indices])]
    return population[best_idx]


@njit(cache=True)
def calculate_detailed_score(individual, masks, lengths, target_days_arr,
                             cons_types, cons_vals, target_demand, weights,
                             daily_slots):
    """Return the total cost of a schedule together with its breakdown.

    Verbose counterpart of ``calculate_fitness_numba``: same computation,
    but every weighted term is reported individually. See
    ``_score_components`` for the meaning of each argument.

    Returns:
        1-D array ``[total, under_score, over_score, cost_homo, cost_soft,
        contract_penalty, equity_penalty]``.
    """
    (under_score, over_score, cost_homo, cost_soft, contract_penalty,
     equity_penalty, _) = _score_components(
        individual, masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights, daily_slots
    )

    total = (under_score + over_score + cost_homo + cost_soft
             + contract_penalty + equity_penalty)

    # Exploded loss terms, in a fixed order.
    return np.array([total, under_score, over_score, cost_homo, cost_soft,
                     contract_penalty, equity_penalty])