import numpy as np
from numba import njit, prange
from src.config import cfg
from src.problems.my_problem import convert_employees_to_numpy
from src.engine.selection import tournament_selection, calculate_population_fitness_parallel, calculate_fitness_numba, calculate_detailed_score, get_valid_start_slots_numba, CODE_OFF
from src.engine.crossover import crossover
from src.engine.mutation import mutate
from src.utils.health import calculate_diversity_snapshot
@njit(cache=True)
def _generate_single_random_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr):
    """
    Generate one individual with fully random shift assignments.

    For each employee, a random valid start slot is drawn for every day, then
    worked days are randomly dropped until the contractual target is met.

    Parameters:
        num_emps: number of employees (rows of the roster).
        num_days: number of days in the planning horizon (columns).
        lengths: per-employee shift length, in slots.
        target_days_arr: per-employee contractual number of working days.
        cons_types, cons_vals: per-(employee, day) constraint type/value codes
            consumed by get_valid_start_slots_numba.
        closing_slots: per-day closing slot index.
        is_closed_arr: per-day 0/1 flag for closed days.

    Returns:
        int64 roster of shape (num_emps, num_days); CODE_OFF marks a day off,
        a non-negative value is the chosen start slot.
    """
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    for i in range(num_emps):
        shift_len = lengths[i]
        # FIX: buffer was hard-coded to 7 entries; size it to num_days so the
        # routine stays correct if the horizon is ever widened beyond a week.
        days_worked_indices = np.empty(num_days, dtype=np.int64)
        dw_ptr = 0
        # Random assignment among the valid slots of each day.
        for d in range(num_days):
            ctype = cons_types[i, d]
            cval = cons_vals[i, d]
            c_slot = closing_slots[d]
            closed = is_closed_arr[d]
            valid = get_valid_start_slots_numba(d, shift_len, ctype, cval, c_slot, closed)
            if len(valid) > 0:
                chosen = valid[np.random.randint(0, len(valid))]
                roster[i, d] = chosen
                # Negative codes (e.g. CODE_OFF) do not count as worked days.
                if chosen >= 0:
                    days_worked_indices[dw_ptr] = d
                    dw_ptr += 1
        # Drop randomly-chosen surplus days until the contractual target holds.
        tgt = target_days_arr[i]
        while dw_ptr > tgt:
            idx_in_buffer = np.random.randint(0, dw_ptr)
            day_to_remove = days_worked_indices[idx_in_buffer]
            roster[i, day_to_remove] = CODE_OFF
            # Swap-with-last removal keeps the buffer compact in O(1).
            days_worked_indices[idx_in_buffer] = days_worked_indices[dw_ptr - 1]
            dw_ptr -= 1
    return roster
@njit(cache=True)
def _generate_single_heuristic_individual(num_emps, num_days, lengths, target_days_arr, cons_types, cons_vals, closing_slots, is_closed_arr, target_demand, randomness):
    """
    Generate one individual with a greedy heuristic driven by residual demand.

    Employees are processed in random order; each picks its `target_days_arr[i]`
    most-needed days and, per day, the valid start slot covering the most
    residual demand. The residual matrix is decremented as slots are taken, so
    later employees see updated needs.

    Parameters:
        num_emps, num_days: roster dimensions (employees x days).
        lengths: per-employee shift length, in slots.
        target_days_arr: per-employee contractual number of working days.
        cons_types, cons_vals: per-(employee, day) constraint codes for
            get_valid_start_slots_numba.
        closing_slots, is_closed_arr: per-day closing slot / closed flag.
        target_demand: (num_days, num_slots) demand matrix — assumed
            non-negative; TODO confirm against caller.
        randomness: >0 adds Gaussian noise to day scoring for diversity.

    Returns:
        int64 roster of shape (num_emps, num_days); CODE_OFF marks a day off.
    """
    num_slots = target_demand.shape[1]
    roster = np.full((num_emps, num_days), CODE_OFF, dtype=np.int64)
    # Local copy of the demand so residuals can be decremented in place.
    residual = target_demand.copy().astype(np.float64)
    emp_order = np.random.permutation(num_emps)
    for i in emp_order:
        shift_len = lengths[i]
        tgt = target_days_arr[i]
        # Score each day by its total remaining (uncovered) demand.
        daily_needs = np.zeros(num_days, dtype=np.float64)
        for d in range(num_days):
            s = 0.0
            for k in range(num_slots):
                s += residual[d, k]
            daily_needs[d] = s
        if randomness > 0:
            # Noise scaled to the mean need keeps perturbation proportional.
            noise = np.random.randn(num_days) * (np.mean(daily_needs) * randomness)
            daily_needs += noise
        sorted_days = np.argsort(daily_needs)
        # argsort is ascending, so the last `tgt` entries are the neediest days.
        start_idx = max(0, num_days - tgt)
        preferred_days = sorted_days[start_idx:]
        # Assign the best-scoring slot on each chosen day.
        for day in preferred_days:
            ctype = cons_types[i, day]
            cval = cons_vals[i, day]
            c_slot = closing_slots[day]
            closed = is_closed_arr[day]
            valid = get_valid_start_slots_numba(day, shift_len, ctype, cval, c_slot, closed)
            if len(valid) == 0:
                roster[i, day] = CODE_OFF
                continue
            # Limit scoring to 5 sampled candidates to cap per-day overhead.
            n_cand = len(valid)
            limit = 5
            if n_cand <= limit:
                candidates = valid
            else:
                # Sampling WITH replacement — duplicates are possible but harmless.
                candidates = np.empty(limit, dtype=np.int64)
                for k in range(limit):
                    candidates[k] = valid[np.random.randint(0, n_cand)]
            best_slot = candidates[0]
            best_score = -1e9
            for slot in candidates:
                if slot < 0: continue
                end = min(slot + shift_len, num_slots)
                # Score = residual demand covered by the slot's span.
                score = 0.0
                for k in range(slot, end):
                    score += residual[day, k]
                if score > best_score:
                    best_score = score
                    best_slot = slot
            # NOTE(review): if every candidate is negative, best_slot stays
            # candidates[0] (an off-code), which is stored as a day off.
            roster[i, day] = best_slot
            if best_slot >= 0:
                # Consume the covered demand, clamped at zero.
                end = min(best_slot + shift_len, num_slots)
                for k in range(best_slot, end):
                    val = residual[day, k] - 1.0
                    if val < 0: val = 0.0
                    residual[day, k] = val
    return roster
@njit(parallel=True, cache=True)
def initialize_population_fast_wrapper(pop_size, heuristic_limit, numpy_data, target_demand, randomness, closing_slots, is_closed_arr):
    """
    Parallel JIT wrapper that builds the whole initial population.

    The first `heuristic_limit` individuals come from the greedy generator,
    the rest are fully random. Each individual is built independently, so the
    outer loop runs under prange.
    """
    _, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    n_employees = len(lengths)
    horizon = 7
    population = np.zeros((pop_size, n_employees, horizon), dtype=np.int64)
    for idx in prange(pop_size):
        if idx >= heuristic_limit:
            population[idx] = _generate_single_random_individual(
                n_employees, horizon, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr
            )
        else:
            population[idx] = _generate_single_heuristic_individual(
                n_employees, horizon, lengths, target_days_arr, cons_types, cons_vals,
                closing_slots, is_closed_arr, target_demand, randomness
            )
    return population
def initialize_population_numba(pop_size, numpy_data, target_demand, cfg):
    """
    Build the initial population, splitting it between greedy and random
    individuals according to `heuristic_rate` from the config.
    """
    rate = cfg.genetic_params.get('heuristic_rate', 0.8)
    noise = cfg.genetic_params.get('heuristic_noise', 0.2)
    n_greedy = int(pop_size * rate)
    print(f"[*] Inizializzazione popolazione: {n_greedy} greedy, {pop_size - n_greedy} random")
    # Numba cannot consume the custom cfg object, so day-level settings are
    # flattened into explicit int64 arrays before crossing the JIT boundary.
    closing = np.array([cfg.get_closing_slot(day) for day in range(7)], dtype=np.int64)
    closed_flags = np.array([1 if cfg.is_day_closed(day) else 0 for day in range(7)], dtype=np.int64)
    return initialize_population_fast_wrapper(
        pop_size, n_greedy, numpy_data, target_demand, noise, closing, closed_flags
    )
def precompute_slots_cache_numba(numpy_data):
    """Precompute the per-employee, per-day valid-slot cache used by mutation."""
    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    cache = []
    for emp in range(len(lengths)):
        duration = lengths[emp]
        # One entry per weekday, each holding that day's valid start slots.
        per_day = [
            get_valid_start_slots_numba(
                day,
                duration,
                cons_types[emp, day],
                cons_vals[emp, day],
                cfg.get_closing_slot(day),
                cfg.is_day_closed(day),
            )
            for day in range(7)
        ]
        cache.append(per_day)
    return cache
def _solution_from_details(schedule, details):
    """Pack a schedule and its detailed score breakdown into a result dict."""
    return {
        "schedule": schedule,
        "total_score": details[0],
        "understaffing": details[1],
        "overstaffing": details[2],
        "homogeneity": details[3],
        "soft_preferences": details[4],
        "contract": details[5],
        "equity": details[6]
    }


def run_genetic_algorithm(employees, target_demand, progress_callback=None):
    """
    Run the full genetic algorithm over the weekly roster.

    Parameters:
        employees: project employee objects, converted to NumPy tensors.
        target_demand: (days, slots) demand matrix to cover.
        progress_callback: optional callable(gen, generations, best_score,
            diversity) invoked every 5 generations.

    Returns:
        (top_solutions, diversity_history) — up to 5 unique best solutions as
        dicts, and the sampled diversity values collected during evolution.
    """
    print("[*] Conversione dati in tensori NumPy...")
    numpy_data = convert_employees_to_numpy(employees)
    if numpy_data[0] is None:
        # FIX: keep the return arity consistent with the success path, which
        # returns (top_solutions, diversity_history); a bare [] broke callers
        # that unpack two values.
        return [], []
    print("[*] Generazione cache degli slot...")
    slots_cache = precompute_slots_cache_numba(numpy_data)
    pop_size = cfg.genetic_params['population_size']
    generations = cfg.genetic_params['generations']
    population = initialize_population_numba(pop_size, numpy_data, target_demand, cfg)
    # Weight vector order matches calculate_fitness_numba; the trailing 0.0 is
    # a reserved slot — TODO confirm its meaning against the fitness kernel.
    weights_arr = np.array([
        cfg.weights['understaffing'],
        cfg.weights['overstaffing'],
        cfg.weights['homogeneity'],
        cfg.weights['soft_preference'],
        0.0
    ], dtype=np.float64)
    daily_slots_scalar = int(cfg.daily_slots)
    elitism_rate = cfg.genetic_params.get('elitism_rate', 0.02)
    masks, lengths, target_days_arr, cons_types, cons_vals = numpy_data
    best_score = float('inf')
    best_coverage = None
    best_overall = None
    diversity_history = []
    current_diversity = 0.0
    print(f"[*] Avvio evoluzione: {generations} generazioni, size {len(population)}")
    for gen in range(generations):
        # 1. Fitness evaluation (lower score is better).
        scores = calculate_population_fitness_parallel(
            population,
            masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        min_curr = np.min(scores)
        best_idx = np.argmin(scores)
        if min_curr < best_score:
            best_score = min_curr
            best_overall = population[best_idx].copy()
            # Recompute once to extract the coverage matrix of the new best.
            _, best_coverage = calculate_fitness_numba(
                population[best_idx], masks, lengths, target_days_arr, cons_types, cons_vals,
                target_demand, weights_arr, daily_slots_scalar
            )
        # Diversity is sampled only every 5 generations to limit overhead.
        if gen % 5 == 0 or gen == generations - 1:
            sample_rate = 0.20
            dynamic_sample = int(len(population) * sample_rate)
            # Clamp the sample size to balance statistical significance
            # against Numba overhead.
            sample_size = max(20, min(dynamic_sample, 200))
            sample_size = min(sample_size, len(population))
            indices = np.random.choice(len(population), sample_size, replace=False)
            pop_sample = population[indices]
            current_diversity = calculate_diversity_snapshot(pop_sample)
            diversity_history.append(current_diversity)
            if progress_callback:
                progress_callback(gen + 1, generations, best_score, current_diversity)
        # 2. Per-day deficit probabilities steering guided mutations.
        if best_coverage is None:
            current_gap = target_demand
        else:
            current_gap = target_demand - best_coverage
        daily_deficit = np.sum(np.maximum(current_gap, 0), axis=1)
        total_deficit = np.sum(daily_deficit)
        if total_deficit > 0:
            daily_deficit_probs = daily_deficit / total_deficit
        else:
            # No deficit left: fall back to a uniform day distribution.
            daily_deficit_probs = np.ones(7) / 7.0
        # 3. Next generation: elitism, then tournament + crossover + mutation.
        new_pop = []
        n_elites = max(2, int(len(population) * elitism_rate))
        elite_indices = np.argsort(scores)[:n_elites]
        for idx in elite_indices:
            new_pop.append(population[idx].copy())
        while len(new_pop) < len(population):
            p1 = tournament_selection(population, scores)
            p2 = tournament_selection(population, scores)
            c1, c2 = crossover(p1, p2)
            new_pop.append(mutate(c1, slots_cache, daily_deficit_probs))
            if len(new_pop) < len(population):
                new_pop.append(mutate(c2, slots_cache, daily_deficit_probs))
        population = np.array(new_pop)
    print("[*] Estrazione top 5 soluzioni uniche...")
    final_scores = calculate_population_fitness_parallel(
        population, masks, lengths, target_days_arr, cons_types, cons_vals,
        target_demand, weights_arr, daily_slots_scalar
    )
    sorted_indices = np.argsort(final_scores)
    top_solutions = []
    seen_hashes = set()
    for idx in sorted_indices:
        ind = population[idx]
        # tobytes() gives an exact, hashable fingerprint for deduplication.
        ind_hash = ind.tobytes()
        if ind_hash in seen_hashes: continue
        seen_hashes.add(ind_hash)
        details = calculate_detailed_score(
            ind, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        top_solutions.append(_solution_from_details(ind.copy(), details))
        if len(top_solutions) >= 5: break
    # Fallback when no unique solutions were collected from the final pool.
    if not top_solutions and best_overall is not None:
        details = calculate_detailed_score(
            best_overall, masks, lengths, target_days_arr, cons_types, cons_vals,
            target_demand, weights_arr, daily_slots_scalar
        )
        top_solutions.append(_solution_from_details(best_overall, details))
    return top_solutions, diversity_history