GeneticWFM / src /engine /selection.py
GaetanoParente's picture
first commit
9e62f55
import numpy as np
from numba import njit, prange
from src.config import cfg
# Costanti di dominio (mapping per la vettorializzazione)
CODE_OFF = -1
CODE_ABSENCE = -2
CONS_TYPE_HARD = 1
CONS_TYPE_SOFT = 2
CONS_TYPE_ABSENCE = 3
@njit(cache=True)
def get_valid_start_slots_numba(day_idx, shift_len, cons_type, cons_val, closing_slot, is_closed):
"""
Calcolo degli slot di inizio turno validi.
JIT-compiled per massimizzare il throughput. Ritorna strictly un ndarray.
"""
# 1. Vincoli Hard (Assenze e Turni Fissi)
if cons_type == CONS_TYPE_ABSENCE:
return np.array([CODE_ABSENCE], dtype=np.int64)
if cons_type == CONS_TYPE_HARD:
target_slot = cons_val
# Boundary check per evitare sforamenti oltre l'orario di chiusura
if is_closed or (target_slot + shift_len > closing_slot):
return np.array([CODE_OFF], dtype=np.int64)
return np.array([target_slot], dtype=np.int64)
# 2. Assegnazione Standard
if is_closed:
return np.empty(0, dtype=np.int64)
max_start = closing_slot - shift_len
if max_start < 0:
return np.empty(0, dtype=np.int64)
# Generazione array di slot contigui validi
return np.arange(0, max_start + 1, dtype=np.int64)
@njit(cache=True)
def calculate_fitness_numba(individual, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
"""
Motore core di valutazione della fitness.
Aggrega i vincoli operativi (contratti, weekend, preferenze) e vettorializza le penalità.
"""
num_emps = len(lengths)
num_days = 7
num_slots = daily_slots
current_coverage = np.zeros((num_days, num_slots), dtype=np.int64)
homogeneity_penalty = 0.0
soft_pref_penalty = 0.0
contract_penalty = 0.0
equity_penalty = 0.0
W_UNDER = weights[0]
W_OVER = weights[1]
W_HOMO = weights[2]
W_SOFT = weights[3]
PENALTY_PER_DAY_MISMATCH = W_UNDER * 2.0
sum_wk_work = 0.0
sum_sq_wk_work = 0.0
# Scansione matrice turni per estrazione metriche operative
for i in range(num_emps):
mask_len = lengths[i]
real_mask = masks[i, :mask_len]
days_worked_count = 0
weekend_days = 0
sum_slots = 0.0
sum_sq_slots = 0.0
count_slots = 0
for day in range(num_days):
start = individual[i, day]
if start >= 0:
days_worked_count += 1
sum_slots += start
sum_sq_slots += start**2
count_slots += 1
if day >= 5: weekend_days += 1
end = min(start + mask_len, num_slots)
real_len = end - start
# Proiezione del fenotipo (maschera VDT) sulla coverage matrix
if real_len > 0:
current_coverage[day, start:end] += real_mask[:real_len]
# Valutazione desiderata (Soft Constraint)
if cons_types[i, day] == CONS_TYPE_SOFT and start >= 0:
target_slot = cons_vals[i, day]
if start != target_slot:
soft_pref_penalty += abs(start - target_slot)
# Check target contrattuale (mix lavorativi vs riposi)
tgt = target_days_arr[i]
if days_worked_count != tgt:
contract_penalty += (abs(days_worked_count - tgt) * PENALTY_PER_DAY_MISMATCH)
# Calcolo varianza per penalizzare pattern orari troppo frammentati
if count_slots > 1:
mean = sum_slots / count_slots
var = (sum_sq_slots / count_slots) - (mean**2)
if var > 0: homogeneity_penalty += np.sqrt(var)
sum_wk_work += weekend_days
sum_sq_wk_work += weekend_days**2
# Equità aziendale: bilanciamento dei carichi sui weekend tra dipendenti
if num_emps > 1:
mean_wk = sum_wk_work / num_emps
var_wk = (sum_sq_wk_work / num_emps) - (mean_wk**2)
if var_wk > 0:
equity_penalty = np.sqrt(var_wk) * W_HOMO * 10.0
# Calcolo delta rispetto al fabbisogno (Demand vs Coverage)
diff = current_coverage - target_demand
under_score = 0.0
flattened_diff = diff.flatten()
for val in flattened_diff:
if val < 0: under_score += abs(val)
under_score *= W_UNDER
over_score = 0.0
safe_target = target_demand.flatten()
for k in range(len(flattened_diff)):
val = flattened_diff[k]
if val > 0:
tgt_val = safe_target[k]
if tgt_val == 0: tgt_val = 1
over_score += (val / tgt_val)
over_score *= (W_OVER * 10.0)
# Aggregazione Loss Function
total_score = (under_score +
over_score +
(homogeneity_penalty * W_HOMO) +
(soft_pref_penalty * W_SOFT) +
contract_penalty +
equity_penalty)
return total_score, current_coverage
@njit(parallel=True, cache=True)
def calculate_population_fitness_parallel(population, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
"""
Valutazione massiva della popolazione. Sfrutta il multithreading (prange) di Numba.
"""
pop_size = len(population)
scores = np.zeros(pop_size, dtype=np.float64)
for i in prange(pop_size):
sc, _ = calculate_fitness_numba(
population[i],
masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights, daily_slots
)
scores[i] = sc
return scores
def tournament_selection(population, fitness_scores):
"""Selezione a torneo standard."""
k = cfg.genetic_params.get('tournament_size', 5)
indices = np.random.choice(len(population), k, replace=False)
best_idx = indices[np.argmin(fitness_scores[indices])]
return population[best_idx]
@njit(cache=True)
def calculate_detailed_score(individual, masks, lengths, target_days_arr, cons_types, cons_vals, target_demand, weights, daily_slots):
"""
Versione verbosa del calcolo fitness usata post-convergenza
per l'estrazione delle metriche finali di business da mostrare in UI.
"""
num_emps = len(lengths)
num_days = 7
num_slots = daily_slots
current_coverage = np.zeros((num_days, num_slots), dtype=np.int64)
homogeneity_penalty = 0.0
soft_pref_penalty = 0.0
contract_penalty = 0.0
equity_penalty = 0.0
W_UNDER = weights[0]
W_OVER = weights[1]
W_HOMO = weights[2]
W_SOFT = weights[3]
PENALTY_PER_DAY_MISMATCH = W_UNDER * 2.0
sum_wk_work = 0.0
sum_sq_wk_work = 0.0
for i in range(num_emps):
mask_len = lengths[i]
real_mask = masks[i, :mask_len]
days_worked_count = 0
weekend_days = 0
sum_slots = 0.0
sum_sq_slots = 0.0
count_slots = 0
for day in range(num_days):
start = individual[i, day]
if start >= 0:
days_worked_count += 1
sum_slots += start
sum_sq_slots += start**2
count_slots += 1
if day >= 5: weekend_days += 1
end = min(start + mask_len, num_slots)
real_len = end - start
if real_len > 0:
current_coverage[day, start:end] += real_mask[:real_len]
if cons_types[i, day] == CONS_TYPE_SOFT and start >= 0:
target_slot = cons_vals[i, day]
if start != target_slot:
soft_pref_penalty += abs(start - target_slot)
tgt = target_days_arr[i]
if days_worked_count != tgt:
contract_penalty += (abs(days_worked_count - tgt) * PENALTY_PER_DAY_MISMATCH)
if count_slots > 1:
mean = sum_slots / count_slots
var = (sum_sq_slots / count_slots) - (mean**2)
if var > 0: homogeneity_penalty += np.sqrt(var)
sum_wk_work += weekend_days
sum_sq_wk_work += weekend_days**2
if num_emps > 1:
mean_wk = sum_wk_work / num_emps
var_wk = (sum_sq_wk_work / num_emps) - (mean_wk**2)
if var_wk > 0:
equity_penalty = np.sqrt(var_wk) * W_HOMO * 10.0
diff = current_coverage - target_demand
under_score = 0.0
flattened_diff = diff.flatten()
for val in flattened_diff:
if val < 0: under_score += abs(val)
under_score *= W_UNDER
over_score = 0.0
safe_target = target_demand.flatten()
for k in range(len(flattened_diff)):
val = flattened_diff[k]
if val > 0:
tgt_val = safe_target[k]
if tgt_val == 0: tgt_val = 1
over_score += (val / tgt_val)
over_score *= (W_OVER * 10.0)
cost_homo = homogeneity_penalty * W_HOMO
cost_soft = soft_pref_penalty * W_SOFT
total = under_score + over_score + cost_homo + cost_soft + contract_penalty + equity_penalty
# Ritorna l'array esploso delle loss per i grafici
return np.array([total, under_score, over_score, cost_homo, cost_soft, contract_penalty, equity_penalty])