# GeneticWFM — src/engine/evolution.py
# Genetic-algorithm engine: population initialization, evolution loop,
# and extraction of the top unique roster solutions.
import numpy as np
from numba import njit, prange
from src.config import cfg
from src.problems.my_problem import convert_employees_to_numpy
from src.engine.selection import tournament_selection, calculate_population_fitness_parallel, calculate_fitness_numba, calculate_detailed_score, get_valid_start_slots_numba, CODE_OFF
from src.engine.crossover import crossover
from src.engine.mutation import mutate
from src.utils.health import calculate_diversity_snapshot
@njit(cache=True)
def _generate_single_random_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr):
    """
    Generate one individual with fully random shift assignments.

    For every employee, a random valid start slot is drawn for each day;
    worked days in excess of the contractual target are then randomly
    dropped back to CODE_OFF.

    Returns a (num_emps, num_days) int64 roster matrix.
    """
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    for i in range(num_emps):
        shift_len = lengths[i]
        # Buffer of worked-day indices, sized to the horizon.
        # BUG FIX: was hardcoded to 7, which would overflow (out-of-bounds
        # writes under njit) if num_days ever exceeds one week.
        days_worked_indices = np.empty(num_days, dtype=np.int64)
        dw_ptr = 0
        # Random assignment among the valid start slots
        for d in range(num_days):
            ctype = cons_types[i, d]
            cval = cons_vals[i, d]
            c_slot = closing_slots[d]
            closed = is_closed_arr[d]
            valid = get_valid_start_slots_numba(d, shift_len, ctype, cval, c_slot, closed)
            if len(valid) > 0:
                chosen = valid[np.random.randint(0, len(valid))]
                roster[i, d] = chosen
                if chosen >= 0:
                    days_worked_indices[dw_ptr] = d
                    dw_ptr += 1
        # Drop surplus worked days down to the contractual target
        tgt = target_days_arr[i]
        while dw_ptr > tgt:
            idx_in_buffer = np.random.randint(0, dw_ptr)
            day_to_remove = days_worked_indices[idx_in_buffer]
            roster[i, day_to_remove] = CODE_OFF
            # Swap-remove: overwrite the slot with the last entry and shrink
            days_worked_indices[idx_in_buffer] = days_worked_indices[dw_ptr - 1]
            dw_ptr -= 1
    return roster
@njit(cache=True)
def _generate_single_heuristic_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr, target_demand, randomness):
    """
    Build one individual greedily, steering each assignment toward the
    slots with the highest remaining (residual) demand.
    """
    num_slots = target_demand.shape[1]
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    # Local copy so the residual demand can be decremented as we assign
    remaining = target_demand.copy().astype(np.float64)
    shuffled_emps = np.random.permutation(num_emps)
    for emp in shuffled_emps:
        shift_len = lengths[emp]
        days_target = target_days_arr[emp]
        # Aggregate the remaining demand per day to rank operational need
        day_need = np.zeros(num_days, dtype=np.float64)
        for d in range(num_days):
            total = 0.0
            for k in range(num_slots):
                total += remaining[d, k]
            day_need[d] = total
        if randomness > 0:
            perturbation = np.random.randn(num_days) * (np.mean(day_need) * randomness)
            day_need += perturbation
        ranked = np.argsort(day_need)
        cut = max(0, num_days - days_target)
        chosen_days = ranked[cut:]
        # Pick the best-scoring start slot on each selected day
        for day in chosen_days:
            ctype = cons_types[emp, day]
            cval = cons_vals[emp, day]
            c_slot = closing_slots[day]
            closed = is_closed_arr[day]
            valid = get_valid_start_slots_numba(day, shift_len, ctype, cval, c_slot, closed)
            if len(valid) == 0:
                roster[emp, day] = CODE_OFF
                continue
            # Sample at most 5 candidates to cap the scoring overhead
            n_valid = len(valid)
            cap = 5
            if n_valid <= cap:
                pool = valid
            else:
                pool = np.empty(cap, dtype=np.int64)
                for k in range(cap):
                    pool[k] = valid[np.random.randint(0, n_valid)]
            winner = pool[0]
            winner_score = -1e9
            for slot in pool:
                if slot < 0:
                    continue
                stop = min(slot + shift_len, num_slots)
                cover = 0.0
                for k in range(slot, stop):
                    cover += remaining[day, k]
                if cover > winner_score:
                    winner_score = cover
                    winner = slot
            roster[emp, day] = winner
            if winner >= 0:
                # Consume the covered demand, clamping at zero
                stop = min(winner + shift_len, num_slots)
                for k in range(winner, stop):
                    updated = remaining[day, k] - 1.0
                    if updated < 0:
                        updated = 0.0
                    remaining[day, k] = updated
    return roster
@njit(parallel=True, cache=True)
def initialize_population_fast_wrapper(pop_size, heuristic_limit, numpy_data, target_demand, randomness, closing_slots, is_closed_arr):
    """
    Parallel JIT wrapper that builds the whole population in one pass.

    Individuals with index below `heuristic_limit` come from the greedy
    demand-driven builder; the rest are fully random.
    """
    _, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    num_emps = len(lengths)
    num_days = 7
    population = np.zeros((pop_size, num_emps, num_days), dtype=np.int64)
    for idx in prange(pop_size):
        if idx < heuristic_limit:
            population[idx] = _generate_single_heuristic_individual(
                num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr, target_demand, randomness
            )
        else:
            population[idx] = _generate_single_random_individual(
                num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr
            )
    return population
def initialize_population_numba(pop_size, numpy_data, target_demand, cfg):
    """
    Build the initial population, split between greedy and random
    individuals according to `heuristic_rate`, then delegate to the
    parallel JIT wrapper.
    """
    params = cfg.genetic_params
    heuristic_rate = params.get('heuristic_rate', 0.8)
    heuristic_noise = params.get('heuristic_noise', 0.2)
    num_heuristic = int(pop_size * heuristic_rate)
    print(f"[*] Inizializzazione popolazione: {num_heuristic} greedy, {pop_size - num_heuristic} random")
    # Numba cannot consume the custom config object, so flatten the
    # per-day calendar info into plain int64 arrays first.
    closing_slots = np.empty(7, dtype=np.int64)
    is_closed_arr = np.empty(7, dtype=np.int64)
    for d in range(7):
        closing_slots[d] = cfg.get_closing_slot(d)
        is_closed_arr[d] = 1 if cfg.is_day_closed(d) else 0
    return initialize_population_fast_wrapper(
        pop_size, num_heuristic, numpy_data, target_demand, heuristic_noise, closing_slots, is_closed_arr
    )
def precompute_slots_cache_numba(numpy_data):
    """Precompute, per employee and weekday, the valid start slots so
    mutation operators can look them up instead of recomputing them."""
    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    cache = []
    for i, shift_len in enumerate(lengths):
        # One list of valid-slot arrays per weekday for this employee
        cache.append([
            get_valid_start_slots_numba(
                d, shift_len, cons_types[i, d], cons_vals[i, d],
                cfg.get_closing_slot(d), cfg.is_day_closed(d)
            )
            for d in range(7)
        ])
    return cache
def run_genetic_algorithm(employees, target_demand, progress_callback=None):
print("[*] Conversione dati in tensori NumPy...")
numpy_data = convert_employees_to_numpy(employees)
if numpy_data[0] is None:
return []
print("[*] Generazione cache degli slot...")
slots_cache = precompute_slots_cache_numba(numpy_data)
pop_size = cfg.genetic_params['population_size']
generations = cfg.genetic_params['generations']
population = initialize_population_numba(pop_size, numpy_data, target_demand, cfg)
weights_arr = np.array([
cfg.weights['understaffing'],
cfg.weights['overstaffing'],
cfg.weights['homogeneity'],
cfg.weights['soft_preference'],
0.0
], dtype=np.float64)
daily_slots_scalar = int(cfg.daily_slots)
elitism_rate = cfg.genetic_params.get('elitism_rate', 0.02)
masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
best_score = float('inf')
best_coverage = None
best_overall = None
diversity_history = []
current_diversity = 0.0
print(f"[*] Avvio evoluzione: {generations} generazioni, size {len(population)}")
for gen in range(generations):
# 1. Valutazione Fitness
scores = calculate_population_fitness_parallel(
population,
masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights_arr, daily_slots_scalar
)
min_curr = np.min(scores)
best_idx = np.argmin(scores)
if min_curr < best_score:
best_score = min_curr
best_overall = population[best_idx].copy()
_, best_coverage = calculate_fitness_numba(
population[best_idx], masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights_arr, daily_slots_scalar
)
if gen % 5 == 0 or gen == generations - 1:
sample_rate = 0.20
dynamic_sample = int(len(population) * sample_rate)
# Clamp del sample size per bilanciare significatività statistica e overhead Numba
sample_size = max(20, min(dynamic_sample, 200))
sample_size = min(sample_size, len(population))
indices = np.random.choice(len(population), sample_size, replace=False)
pop_sample = population[indices]
current_diversity = calculate_diversity_snapshot(pop_sample)
diversity_history.append(current_diversity)
if progress_callback:
progress_callback(gen + 1, generations, best_score, current_diversity)
# 2. Setup Deficit per le mutazioni guidate
if best_coverage is None:
current_gap = target_demand
else:
current_gap = target_demand - best_coverage
daily_deficit = np.sum(np.maximum(current_gap, 0), axis=1)
total_deficit = np.sum(daily_deficit)
if total_deficit > 0:
daily_deficit_probs = daily_deficit / total_deficit
else:
daily_deficit_probs = np.ones(7) / 7.0
# 3. Next Gen & Elitismo
new_pop = []
n_elites = max(2, int(len(population) * elitism_rate))
elite_indices = np.argsort(scores)[:n_elites]
for idx in elite_indices:
new_pop.append(population[idx].copy())
while len(new_pop) < len(population):
p1 = tournament_selection(population, scores)
p2 = tournament_selection(population, scores)
c1, c2 = crossover(p1, p2)
new_pop.append(mutate(c1, slots_cache, daily_deficit_probs))
if len(new_pop) < len(population):
new_pop.append(mutate(c2, slots_cache, daily_deficit_probs))
population = np.array(new_pop)
print("[*] Estrazione top 5 soluzioni uniche...")
final_scores = calculate_population_fitness_parallel(
population, masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights_arr, daily_slots_scalar
)
sorted_indices = np.argsort(final_scores)
top_solutions = []
seen_hashes = set()
for idx in sorted_indices:
ind = population[idx]
ind_hash = ind.tobytes()
if ind_hash in seen_hashes: continue
seen_hashes.add(ind_hash)
details = calculate_detailed_score(
ind, masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights_arr, daily_slots_scalar
)
sol_data = {
"schedule": ind.copy(),
"total_score": details[0],
"understaffing": details[1],
"overstaffing": details[2],
"homogeneity": details[3],
"soft_preferences": details[4],
"contract": details[5],
"equity": details[6]
}
top_solutions.append(sol_data)
if len(top_solutions) >= 5: break
# Fallback in caso di mancata convergenza su soluzioni uniche
if not top_solutions and best_overall is not None:
details = calculate_detailed_score(
best_overall, masks, lengths, target_days_arr, cons_types, cons_vals,
target_demand, weights_arr, daily_slots_scalar
)
top_solutions.append({
"schedule": best_overall,
"total_score": details[0],
"understaffing": details[1],
"overstaffing": details[2],
"homogeneity": details[3],
"soft_preferences": details[4],
"contract": details[5],
"equity": details[6]
})
return top_solutions, diversity_history