Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import datetime | |
| import traceback | |
| import io | |
| from src.config import cfg | |
| from src.utils.helpers import load_employees_from_json, check_hours_balance, minutes_to_time | |
| from src.utils.visualization import get_final_coverage_matrix | |
| from src.utils.health import calculate_ibe, interpret_ibe, analyze_convergence_quality | |
| from src.utils.demand_processing import sanitize_weekly_demand | |
| from src.utils.generator import generate_scenario_files | |
| from src.utils.hf_storage import load_json, save_json, list_activities | |
| from src.problems.my_problem import process_demand | |
| from src.engine.evolution import run_genetic_algorithm | |
| # --- SETUP INTERFACCIA --- | |
| st.set_page_config(page_title="AI Workforce Scheduler", layout="wide", page_icon="🧬") | |
| # ============================================================================== | |
| # 1. ROUTING E GESTIONE WORKSPACE (SIDEBAR) | |
| # ============================================================================== | |
| st.sidebar.title("🏢 Seleziona Attività") | |
| # Recupero dinamicamente la lista dei workspace dal repository remoto (HF Datasets) | |
| available_activities = list_activities() | |
| if not available_activities: | |
| st.warning("⚠️ Nessuna attività trovata nel Dataset remoto.") | |
| st.sidebar.info("Utilizza il Generatore per istanziare un nuovo workspace.") | |
| st.stop() | |
| selected_activity = st.sidebar.selectbox("Attività da gestire:", available_activities) | |
| st.title(f"Gestione Turni: {selected_activity}") | |
| st.markdown("Pianificazione intelligente dei turni di lavoro tramite intelligenza artificiale evolutiva.") | |
| # Gestione dello stato di sessione: se l'utente cambia attività, forzo il ricaricamento | |
| # del Singleton di configurazione e pulisco la cache dei risultati precedenti. | |
| if 'current_activity' not in st.session_state or st.session_state['current_activity'] != selected_activity: | |
| try: | |
| cfg.load_configurations(selected_activity) | |
| st.session_state['current_activity'] = selected_activity | |
| if 'ga_results' in st.session_state: | |
| del st.session_state['ga_results'] | |
| except Exception as e: | |
| st.error(f"Errore caricamento config per {selected_activity}: {e}") | |
| if st.sidebar.button("♻️ Ricarica App"): | |
| st.cache_data.clear() | |
| st.rerun() | |
| # ============================================================================== | |
| # 2. VIEWPORT & TAB ROUTING | |
| # ============================================================================== | |
| tab1, tab2, tab3, tab4, tab5 = st.tabs(["⚙️ Configurazione Attività", "👥 Dipendenti", "📈 Domanda (Fabbisogno)", "🚀 Esecuzione & Risultati", "⚡ Generatore"]) | |
| # ------------------------------------------------------------------------------ | |
| # TAB 1: PARAMETRI DI SISTEMA E BUSINESS (CONFIG L2) | |
| # ------------------------------------------------------------------------------ | |
| with tab1: | |
| st.header("⚙️ Parametri Generali") | |
| config_filename = "activity_config.json" | |
| config_data = load_json(selected_activity, config_filename) | |
| if config_data: | |
| client_settings = config_data.get('client_settings', {}) | |
| curr_slot = client_settings.get('planning_slot_minutes', 30) | |
| # --- 1. SETUP GRIGLIA TEMPORALE --- | |
| st.subheader("🗓️ Calendario Operativo") | |
| st.caption("Definisci la granularità della pianificazione e gli orari di apertura del servizio.") | |
| new_slot = st.selectbox( | |
| "Granularità Pianificazione (minuti)", | |
| options=[15, 30, 60], | |
| index=[15, 30, 60].index(curr_slot) if curr_slot in [15, 30, 60] else 1, | |
| key=f"{selected_activity}_slot_select" | |
| ) | |
| with st.expander("Modifica Orari di Apertura/Chiusura Settimanali"): | |
| st.info("I turni generati dall'algoritmo non supereranno mai i limiti impostati qui. Seleziona 'Chiuso' per impedire la pianificazione in un giorno specifico.") | |
| # Costruisco i form per le regole orarie. Uso un dict temporaneo per raccogliere gli input. | |
| day_names = ["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"] | |
| user_schedule = {} | |
| current_hours = config_data.get('operating_hours', {}) | |
| default_rule = current_hours.get('default', "09:00-18:00") | |
| exceptions = current_hours.get('exceptions', {}) | |
| for i, day_name in enumerate(day_names): | |
| day_idx_str = str(i) | |
| rule = exceptions.get(day_idx_str, default_rule) | |
| is_closed_init = (rule == "CLOSED") | |
| start_init, end_init = datetime.time(8, 0), datetime.time(20, 0) | |
| if not is_closed_init: | |
| try: | |
| s_str, e_str = rule.split('-') | |
| sh, sm = map(int, s_str.split(':')) | |
| eh, em = map(int, e_str.split(':')) | |
| start_init, end_init = datetime.time(sh, sm), datetime.time(eh, em) | |
| except: pass | |
| c1, c2, c3, c4 = st.columns([1, 1, 1, 1]) | |
| c1.markdown(f"**{day_name}**") | |
| unique_key = f"{selected_activity}_day_{i}" | |
| is_closed = c2.checkbox("Chiuso", value=is_closed_init, key=f"{unique_key}_closed") | |
| start_t = c3.time_input("Apertura", value=start_init, key=f"{unique_key}_start", disabled=is_closed, step=900) | |
| end_t = c4.time_input("Chiusura", value=end_init, key=f"{unique_key}_end", disabled=is_closed, step=900) | |
| user_schedule[i] = {"closed": is_closed, "start": start_t, "end": end_t} | |
| st.divider() | |
| # --- 2. PESI DELLA LOSS FUNCTION --- | |
| st.subheader("⚖️ Obiettivi di Business (Pesi)") | |
| st.caption("Istruisci l'algoritmo su cosa è più importante. Valori più alti indicano una priorità maggiore.") | |
| weights = config_data.get('weights', {}) | |
| wk = f"{selected_activity}_weights" | |
| # Espongo i pesi per permettere il fine-tuning degli obiettivi di business direttamente da UI | |
| col_w1, col_w2 = st.columns(2) | |
| with col_w1: | |
| new_under = st.number_input("Peso Understaffing (Evita Buchi)", value=weights.get('understaffing', 1000.0), step=100.0, key=f"{wk}_under") | |
| new_over = st.number_input("Peso Overstaffing (Evita Eccessi)", value=weights.get('overstaffing', 10.0), step=5.0, key=f"{wk}_over") | |
| with col_w2: | |
| new_homo = st.number_input("Peso Equità (Bilancia i Carichi)", value=weights.get('homogeneity', 20.0), step=5.0, key=f"{wk}_homo") | |
| new_soft = st.number_input("Peso Preferenze (Accontenta gli Operatori)", value=weights.get('soft_preference', 50.0), step=5.0, key=f"{wk}_soft") | |
| # --- 3. HYPER-PARAMETRI DEL MOTORE GENETICO --- | |
| st.subheader("🧬 Motore Genetico") | |
| st.caption("Configura le prestazioni dell'intelligenza artificiale.") | |
| gen_params = config_data.get('genetic_params', {}) | |
| c_gen1, c_gen2 = st.columns(2) | |
| new_pop = c_gen1.number_input("Popolazione (Soluzioni per generazione)", value=gen_params.get('population_size', 500), step=100) | |
| new_gen = c_gen2.number_input("Generazioni (Cicli di apprendimento)", value=gen_params.get('generations', 200), step=50) | |
| # Nascondo i parametri avanzati dell'engine JIT in un expander per mantenere la UI pulita | |
| with st.expander("🔧 Parametri Avanzati Algoritmo (Fine Tuning)"): | |
| st.warning("⚠️ Modifica questi valori solo se sai cosa stai facendo. I default sono ottimizzati per scenari BPO standard. Valori errati possono bloccare l'algoritmo.") | |
| ac1, ac2, ac3 = st.columns(3) | |
| p_mut = ac1.slider("Mutation Rate", 0.0, 1.0, gen_params.get('mutation_rate', 0.4)) | |
| p_cross = ac2.slider("Crossover Rate", 0.0, 1.0, gen_params.get('crossover_rate', 0.85)) | |
| p_elite = ac3.number_input("Elitism Rate (Protezione Migliori)", 0.0, 0.5, gen_params.get('elitism_rate', 0.02), step=0.01, format="%.2f") | |
| st.markdown("---") | |
| bc1, bc2, bc3 = st.columns(3) | |
| p_tourn = bc1.number_input("Tournament Size", 2, 20, gen_params.get('tournament_size', 5)) | |
| p_heur = bc2.slider("Heuristic Init Rate (Partenza Intelligente)", 0.0, 1.0, gen_params.get('heuristic_rate', 0.8)) | |
| p_noise = bc3.number_input("Heuristic Noise (Rumore iniziale)", 0.0, 1.0, gen_params.get('heuristic_noise', 0.2), step=0.1) | |
| st.markdown("---") | |
| p_split = st.slider("Guided Mutation Split (Swap Giorno vs Cambio Orario)", 0.0, 1.0, gen_params.get('guided_mutation_split', 0.4)) | |
| st.markdown("---") | |
| st.subheader("🧪 Analisi Pressione Evolutiva (IBE)") | |
| # Calcolo live dell'IBE (il mio indicatore custom) per dare un feedback | |
| # immediato sulla bontà dei parametri genetici scelti dall'utente. | |
| curr_ibe = calculate_ibe( | |
| pop_size=int(new_pop), generations=int(new_gen), | |
| p_cross=p_cross, p_mut=p_mut, p_heur=p_heur, p_elite=p_elite | |
| ) | |
| status_msg, delta_color = interpret_ibe(curr_ibe) | |
| hc1, hc2, hc3 = st.columns([1, 1.5, 1]) | |
| hc1.metric(label="IBE Score", value=f"{curr_ibe:,.0f}".replace(",", "."), delta="Target: 1k-3k", delta_color=delta_color) | |
| if "OTTIMALE" in status_msg: hc2.success(f"**Stato:** {status_msg}") | |
| elif "STALLO" in status_msg: hc2.warning(f"**Stato:** {status_msg}") | |
| else: hc2.error(f"**Stato:** {status_msg}") | |
| with hc3.expander("Cos'è l'IBE?"): | |
| st.caption(""" | |
| **Indice di Bilanciamento Evolutivo (IBE)** | |
| È un indicatore di salute delle impostazioni che hai inserito. | |
| Misura l'equilibrio tra la capacità dell'AI di esplorare nuove soluzioni (Mutazioni) | |
| e la tendenza a sfruttare quelle già trovate (Euristiche/Elitismo). | |
| - **Troppo basso:** L'AI si "accontenta" subito della prima soluzione mediocre trovata. | |
| - **Troppo alto:** L'AI continua a cercare a caso senza mai focalizzarsi su un piano stabile. | |
| """) | |
| # --- 4. PERSISTENZA I/O --- | |
| st.divider() | |
| if st.button("💾 Calcola e Salva Configurazione", type="primary"): | |
| # Costruisco la mappa delle regole | |
| daily_rules_map = {} | |
| min_h, max_h = 24, 0 | |
| for i in range(7): | |
| d = user_schedule[i] | |
| if d['closed']: daily_rules_map[str(i)] = "CLOSED" | |
| else: | |
| s_str, e_str = d['start'].strftime("%H:%M"), d['end'].strftime("%H:%M") | |
| daily_rules_map[str(i)] = f"{s_str}-{e_str}" | |
| if d['start'].hour < min_h: min_h = d['start'].hour | |
| if d['end'].hour > max_h: max_h = d['end'].hour + (1 if d['end'].minute > 0 else 0) | |
| # Trick per ottimizzare il payload JSON: calcolo l'orario più frequente | |
| # e lo imposto come 'default', salvando gli altri giorni come 'exceptions'. | |
| from collections import Counter | |
| vals = list(daily_rules_map.values()) | |
| most_common = Counter(vals).most_common(1)[0][0] | |
| final_hours = {"default": most_common, "exceptions": {}} | |
| for k, v in daily_rules_map.items(): | |
| if v != most_common: final_hours["exceptions"][k] = v | |
| # Aggiornamento dell'oggetto configurazione | |
| if 'client_settings' not in config_data: config_data['client_settings'] = {} | |
| config_data['client_settings']['planning_slot_minutes'] = new_slot | |
| config_data['client_settings']['day_start_hour'] = int(min_h) if min_h < 24 else 8 | |
| config_data['client_settings']['day_end_hour'] = int(max_h) if max_h > 0 else 20 | |
| config_data['operating_hours'] = final_hours | |
| config_data['weights'] = {"understaffing": new_under, "overstaffing": new_over, "homogeneity": new_homo, "soft_preference": new_soft} | |
| config_data['genetic_params'] = { | |
| "population_size": int(new_pop), "generations": int(new_gen), | |
| "mutation_rate": p_mut, "crossover_rate": p_cross, "elitism_rate": p_elite, | |
| "tournament_size": int(p_tourn), "heuristic_rate": p_heur, | |
| "heuristic_noise": p_noise, "guided_mutation_split": p_split | |
| } | |
| # Push sul cloud | |
| save_json(selected_activity, config_filename, config_data) | |
| cfg.load_configurations(selected_activity) | |
| st.success("✅ Configurazione salvata e sincronizzata con successo.") | |
| else: | |
| st.error("Payload di configurazione mancante dal Dataset.") | |
| # ------------------------------------------------------------------------------ | |
| # TAB 2: ANAGRAFICA E VINCOLI (HR MASTER DATA) | |
| # ------------------------------------------------------------------------------ | |
| with tab2: | |
| emp_filename = "employees.json" | |
| emp_data = load_json(selected_activity, emp_filename) | |
| col_header, col_pie = st.columns([3, 1]) | |
| with col_header: | |
| st.header("👥 Gestione Personale") | |
| st.caption("Gestisci i profili contrattuali, le regole di fairness settimanale e inserisci ferie, permessi o vincoli di orario.") | |
| if emp_data is not None: | |
| with col_pie: | |
| # Rendering del mix contrattuale. Uso matplotlib con patch alpha=0.0 | |
| # per avere uno sfondo trasparente che si adatti al tema di Streamlit. | |
| df_stats = pd.DataFrame(emp_data) | |
| if not df_stats.empty and 'contract' in df_stats.columns: | |
| counts = df_stats['contract'].value_counts() | |
| fig_pie, ax_pie = plt.subplots(figsize=(1.5, 1.5)) | |
| colors = ['#3498db', '#e74c3c', '#f1c40f', '#9b59b6'] | |
| wedges, texts, autotexts = ax_pie.pie( | |
| counts, autopct='%1.0f%%', startangle=90, colors=colors[:len(counts)], | |
| textprops={'fontsize': 5, 'weight': 'bold', 'color': 'white'}, pctdistance=0.7 | |
| ) | |
| ax_pie.axis('equal') | |
| fig_pie.patch.set_alpha(0.0) | |
| leg = ax_pie.legend( | |
| wedges, counts.index, title="Contratti", loc="center left", | |
| bbox_to_anchor=(1, 0, 0.5, 1), fontsize=5, title_fontsize=6, frameon=False, labelcolor='white' | |
| ) | |
| leg.get_title().set_color("white") | |
| leg.get_title().set_fontweight("bold") | |
| buf_pie = io.BytesIO() | |
| fig_pie.savefig(buf_pie, format="png", bbox_inches="tight", transparent=True, dpi=150) | |
| buf_pie.seek(0) | |
| st.image(buf_pie) | |
| st.divider() | |
| col_list, col_editor = st.columns([1, 2]) | |
| with col_list: | |
| st.subheader("Anagrafica Risorse") | |
| emp_ids = [e['id'] for e in emp_data] | |
| selected_id = st.selectbox("Seleziona il Dipendente da ispezionare:", emp_ids, index=0 if emp_ids else None) | |
| st.divider() | |
| if st.button("💾 Salva Modifiche Anagrafica", type="primary"): | |
| save_json(selected_activity, emp_filename, emp_data) | |
| st.success("Anagrafica aggiornata in Cloud.") | |
| # Costruisco la preview tabellare | |
| summary = [] | |
| for e in emp_data: | |
| mix = e.get('shift_mix', {"WORK": 5, "OFF": 2}) | |
| summary.append({"ID": e['id'], "Contratto": e['contract'], "Mix": f"{mix.get('WORK',5)}W/{mix.get('OFF',2)}O"}) | |
| st.dataframe(summary, hide_index=True, width='stretch') | |
| with col_editor: | |
| # Editor del singolo dipendente: permette di iniettare override | |
| # hard/soft/absence direttamente sull'oggetto prima del salvataggio. | |
| if selected_id: | |
| emp_record = next((e for e in emp_data if e['id'] == selected_id), None) | |
| if emp_record: | |
| st.subheader(f"✏️ Proprietà: {emp_record['id']}") | |
| c1, c2 = st.columns(2) | |
| emp_record['id'] = c1.text_input("ID Dipendente", value=emp_record['id']) | |
| ct = emp_record.get('contract', 'FT40') | |
| ct_idx = ["FT40", "PT30", "PT20"].index(ct) if ct in ["FT40", "PT30", "PT20"] else 0 | |
| emp_record['contract'] = c2.selectbox("Tipologia Contratto", ["FT40", "PT30", "PT20"], index=ct_idx) | |
| cc1, cc2 = st.columns(2) | |
| emp_record['work_hours'] = cc1.number_input("Ore Lavorative Giornaliere", value=float(emp_record.get('work_hours', 8.0))) | |
| emp_record['break_duration'] = cc2.number_input("Minuti di Pausa/Pranzo", value=int(emp_record.get('break_duration', 0))) | |
| st.markdown("---") | |
| st.subheader("📅 Regole di Fairness Settimanale") | |
| st.caption("Imposta quanti giorni questa risorsa deve lavorare rispetto a quanti giorni deve riposare nella settimana.") | |
| curr_mix = emp_record.get('shift_mix', {"WORK": 5, "OFF": 2}) | |
| cm1, cm2 = st.columns(2) | |
| w_days = cm1.number_input("Target Giorni Lavorativi", min_value=1, max_value=7, value=int(curr_mix.get("WORK", 5))) | |
| o_days = cm2.number_input("Target Giorni di Riposo", min_value=0, max_value=6, value=int(curr_mix.get("OFF", 2))) | |
| emp_record['shift_mix'] = {"WORK": w_days, "OFF": o_days} | |
| st.info(f"L'algoritmo cercherà in tutti i modi di programmare esattamente {w_days} giorni di lavoro e {o_days} di riposo. Le violazioni verranno penalizzate nel calcolo finale.") | |
| st.markdown("---") | |
| st.subheader("🔒 Vincoli Operativi (Assenze e Permessi)") | |
| st.caption("Aggiungi ferie, malattie o turni fissi inamovibili.") | |
| constraints = emp_record.get('constraints', {}) | |
| day_map = {0: "Lunedì", 1: "Martedì", 2: "Mercoledì", 3: "Giovedì", 4: "Venerdì", 5: "Sabato", 6: "Domenica"} | |
| if constraints: | |
| cons_view = [] | |
| for d, r in constraints.items(): | |
| cons_view.append({"Giorno": day_map.get(int(d), d), "Tipo": r['type'].upper(), "Valore/Motivo": r.get('start_time', r.get('reason',''))}) | |
| st.table(pd.DataFrame(cons_view)) | |
| to_del = st.selectbox("Seleziona Giorno da sbloccare", options=list(constraints.keys()), format_func=lambda x: day_map.get(int(x), x)) | |
| if st.button("🗑️ Rimuovi Vincolo"): | |
| del emp_record['constraints'][to_del] | |
| st.rerun() | |
| with st.expander("➕ Aggiungi una nuova regola per questo dipendente"): | |
| ac1, ac2 = st.columns(2) | |
| add_d = ac1.selectbox("Giorno della settimana", range(7), format_func=lambda x: day_map[x]) | |
| add_t = ac2.selectbox("Tipologia di regola", ["absence", "hard", "soft"], format_func=lambda x: "Assenza" if x=="absence" else ("Turno Obbligato (Hard)" if x=="hard" else "Preferenza Oraria (Soft)")) | |
| new_rule = {"type": add_t} | |
| if add_t == "absence": | |
| new_rule["reason"] = st.selectbox("Motivo Assenza", ["FERIE", "MALATTIA", "PERMESSO"]) | |
| else: | |
| t_val = st.time_input("Orario di inzio desiderato").strftime("%H:%M") | |
| new_rule["start_time"] = t_val | |
| if st.button("Conferma Inserimento"): | |
| if 'constraints' not in emp_record: emp_record['constraints'] = {} | |
| emp_record['constraints'][str(add_d)] = new_rule | |
| st.rerun() | |
| else: | |
| st.error("Payload anagrafico mancante o corrotto.") | |
| # ------------------------------------------------------------------------------ | |
| # TAB 3: DEMAND TIME-SERIES (FABBISOGNO) | |
| # ------------------------------------------------------------------------------ | |
| with tab3: | |
| st.header("📈 Time-Series Fabbisogno Operativo") | |
| st.caption("Visualizza e modifica la curva di traffico o il numero di operatori richiesti per ogni frazione oraria.") | |
| demand_filename = "demand.json" | |
| conf = load_json(selected_activity, "activity_config.json") | |
| raw_demand = load_json(selected_activity, demand_filename) | |
| if conf: | |
| sett = conf.get('client_settings', {}) | |
| start_h = sett.get('day_start_hour', 8) | |
| end_h = sett.get('day_end_hour', 20) | |
| slot_min = sett.get('planning_slot_minutes', 30) | |
| current_conf_for_alignment = {'client_settings': {'day_start_hour': start_h, 'day_end_hour': end_h, 'planning_slot_minutes': slot_min}} | |
| total_min = (end_h - start_h) * 60 | |
| num_slots = int(total_min / slot_min) | |
| # Allineamento dinamico della time-series: gestisco i cambi di granularità oraria | |
| # troncando o paddando la matrice tramite l'helper apposito. | |
| if raw_demand: | |
| sanitized_list = sanitize_weekly_demand(raw_demand, current_conf_for_alignment) | |
| target_demand = np.array(sanitized_list) | |
| else: | |
| target_demand = np.ones((7, num_slots), dtype=int) * 5 | |
| days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] | |
| time_labels = [] | |
| curr_m = start_h * 60 | |
| end_m = end_h * 60 | |
| while curr_m < end_m: | |
| h = int(curr_m // 60) | |
| m = int(curr_m % 60) | |
| time_labels.append(f"{h:02d}:{m:02d}") | |
| curr_m += slot_min | |
| x = np.arange(len(time_labels)) | |
| day_idx = st.selectbox("Ispeziona Giorno:", range(7), format_func=lambda x: days[x]) | |
| # Rendering vettoriale del profilo di carico | |
| fig, ax = plt.subplots(figsize=(10, 3)) | |
| daily_curve = target_demand[day_idx] | |
| ax.plot(x, daily_curve, color='#e74c3c', linestyle='--', marker='o', markersize=3, label="Target Staff (Richiesto)") | |
| ax.fill_between(x, 0, daily_curve, color='#e74c3c', alpha=0.1) | |
| step_x = max(1, len(x) // 15) | |
| ax.set_xticks(x[::step_x]) | |
| ax.set_xticklabels(time_labels[::step_x], rotation=45, fontsize=8) | |
| ax.set_title(f"Profilo di Carico: {days[day_idx]}") | |
| ax.grid(True, linestyle='--', alpha=0.3) | |
| ax.legend() | |
| buf_demand = io.BytesIO() | |
| fig.savefig(buf_demand, format="png", bbox_inches="tight", transparent=False, dpi=150) | |
| buf_demand.seek(0) | |
| st.image(buf_demand) | |
| plt.close(fig) | |
| st.divider() | |
| # Uso il data_editor nativo di Streamlit per permettere l'override manuale | |
| # della demand curva direttamente in UI, molto comodo per i planner. | |
| st.subheader("✏️ Override Manuale (Modifica Volumi)") | |
| st.caption("Fai doppio clic su una cella della tabella per alterare manualmente il numero di operatori richiesti.") | |
| df_demand = pd.DataFrame(target_demand, index=days, columns=time_labels) | |
| edited_df = st.data_editor(df_demand, width='stretch', height=300) | |
| if st.button("💾 Salva Modifiche Curva"): | |
| final_json_structure = [] | |
| for i, row in enumerate(edited_df.values): | |
| row_list = [f"Giorno_{i}"] + row.tolist() | |
| final_json_structure.append(row_list) | |
| save_json(selected_activity, "demand.json", final_json_structure) | |
| st.success("✅ Fabbisogno aggiornato e sincronizzato.") | |
| st.rerun() | |
| else: | |
| st.warning("Impossibile effettuare il render: payload mancante.") | |
| # ------------------------------------------------------------------------------ | |
| # TAB 4: MOTORE DI OTTIMIZZAZIONE | |
| # ------------------------------------------------------------------------------ | |
| with tab4: | |
| st.header("🚀 Motore di Ottimizzazione") | |
| st.caption("Avvia l'AI per calcolare l'incastro dei turni migliore in base ai parametri che hai inserito.") | |
| col_run, col_stat = st.columns([1, 2]) | |
| with col_run: | |
| run_btn = st.button("✨ AVVIA IL CALCOLO DEI TURNI", type="primary") | |
| if run_btn: | |
| prog_bar = st.progress(0) | |
| status_text = st.empty() | |
| # Callback passata all'engine genetico per aggiornare l'interfaccia | |
| # asincronamente durante i pesanti cicli for loop su Numba. | |
| def ui_callback(gen, tot, score, div): | |
| prog_bar.progress(gen / tot) | |
| status_text.markdown(f"🧬 Elaborazione in corso... Generazione: **{gen}/{tot}** | Punteggio Penalità: **{score:.0f}** | Esplorazione: **{div:.2f}%**" ) | |
| with st.spinner("Compilazione codice macchina (JIT) e calcolo in corso. Potrebbe volerci qualche minuto..."): | |
| try: | |
| current_act = st.session_state.get('current_activity') | |
| if not current_act: raise ValueError("Nessun contesto operativo attivo.") | |
| employees = load_employees_from_json(current_act) | |
| raw_d = load_json(selected_activity, "demand.json") | |
| target = process_demand(raw_d) | |
| # Applichiamo le regole di business (le chiusure impostate in L2 config) | |
| # azzerando forzatamente la domanda oraria per non far schedulare turni. | |
| for d in range(7): | |
| closing_slot = cfg.get_closing_slot(d) | |
| if cfg.is_day_closed(d) or closing_slot == 0: | |
| target[d, :] = 0 | |
| else: | |
| if closing_slot < target.shape[1]: | |
| target[d, closing_slot:] = 0 | |
| hours_diff = check_hours_balance(employees, target) | |
| if hours_diff >= 0: | |
| st.success(f"✅ Controllo Preliminare Superato: Lo staff disponibile copre matematicamente le ore richieste (Surplus: {hours_diff:.1f}h).") | |
| else: | |
| st.error(f"⚠️ Attenzione - Sotto-dimensionamento Strutturale: Hai chiesto più ore di quelle contrattualizzate. Verranno generati dei buchi inevitabili (Deficit: {abs(hours_diff):.1f}h).") | |
| # Esecuzione del kernel genetico core | |
| top_solutions, div_history = run_genetic_algorithm(employees, target, progress_callback=ui_callback) | |
| final_pop_sample = np.array([sol['schedule'] for sol in top_solutions]) | |
| final_diversity = div_history[-1] if div_history else 0.0 | |
| # Caching dell'output generato nell'oggetto di sessione. | |
| # Evita di perdere i risultati (o triggerare ricalcoli) se cambio tab. | |
| st.session_state['ga_results'] = { | |
| 'top_solutions': top_solutions, | |
| 'diversity_score': final_diversity, | |
| 'diversity_history': div_history, | |
| 'selected_idx': 0, | |
| 'employees': employees, | |
| 'target': target | |
| } | |
| best_s = top_solutions[0]['total_score'] | |
| status_text.success(f"Ottimizzazione conclusa con successo! Miglior punteggio di penalità raggiunto: {best_s:.0f}") | |
| except Exception as e: | |
| st.error(f"Errore di sistema durante il calcolo: {e}") | |
| traceback.print_exc() | |
| # Blocco di visualizzazione post-run | |
| if 'ga_results' in st.session_state: | |
| res = st.session_state['ga_results'] | |
| solutions = res['top_solutions'] | |
| div_hist = res.get('diversity_history', [res.get('diversity_score', 0.0)]) | |
| final_div = div_hist[-1] | |
| # Diagnostica algoritmica automatizzata (Controlla il drop-rate della diversità) | |
| msg, color_code = analyze_convergence_quality(div_hist) | |
| st.markdown("### 🩺 Diagnostica e Validazione Scientifica") | |
| kpi1, kpi2 = st.columns([1, 3]) | |
| kpi1.metric("Diversità Genetica Finale", f"{final_div:.2f}%") | |
| if color_code == "success": kpi2.success(msg) | |
| elif color_code == "normal": kpi2.info(msg) | |
| else: kpi2.error(msg) | |
| with st.expander("Cos'è la Diversità e come interpretarla?"): | |
| st.caption(""" | |
| La **Diversità** indica quante soluzioni "diverse" l'algoritmo stava ancora testando alla fine del processo. | |
| - **Se è >40%:** L'AI non è riuscita a trovare un pattern vincente e ha continuato a sparare a caso (aumenta le Generazioni o diminuisci la Mutazione). | |
| - **Se crolla subito a <5% (Convergenza Prematura):** L'AI si è "incastrata" su una soluzione mediocre e ha smesso di cercare (aumenta la Mutazione). | |
| - **Se scende gradualmente (Matura):** È lo stato ideale. L'AI ha esplorato bene e poi ha "stretto" verso la soluzione perfetta. | |
| """) | |
| if div_hist: | |
| st.subheader("📉 Profilo Dinamico dell'Apprendimento") | |
| fig_div, ax_div = plt.subplots(figsize=(10, 3)) | |
| x_axis = [i * 5 for i in range(len(div_hist))] | |
| ax_div.plot(x_axis, div_hist, color='#2980b9', linewidth=2, label='Varianza di Popolazione (%)') | |
| ax_div.axhspan(0, 5, color='#e74c3c', alpha=0.1, label='Rischio Collasso (<5%)') | |
| ax_div.axhspan(40, 100, color='#e67e22', alpha=0.1, label='Rischio Divergenza (>40%)') | |
| ax_div.axhspan(5, 40, color='#2ecc71', alpha=0.1, label='Fascia Ottimale') | |
| ax_div.set_ylabel("Hamming Dist (%)") | |
| ax_div.set_xlabel("Epoche di Addestramento") | |
| ax_div.set_ylim(0, max(50, max(div_hist) + 5)) | |
| ax_div.grid(True, linestyle='--', alpha=0.5) | |
| ax_div.legend(loc='upper right', fontsize='small') | |
| buf_div = io.BytesIO() | |
| fig_div.savefig(buf_div, format="png", bbox_inches="tight", transparent=False, dpi=150) | |
| buf_div.seek(0) | |
| st.image(buf_div) | |
| plt.close(fig_div) | |
| with st.expander("Come leggere questo grafico?"): | |
| st.caption(""" | |
| Questo grafico racconta visivamente il lavoro dell'algoritmo: | |
| 1. **Fase Iniziale (Esplorazione):** Il grafico deve partire alto (fuori dal rosso basso). L'AI sta provando incastri creativi. | |
| 2. **Discesa (Sfruttamento):** La curva deve scendere dolcemente verso il basso. | |
| 3. **Atterraggio:** La curva dovrebbe stabilizzarsi nella **Fascia Verde Ottimale**. Se vedi crolli verticali improvvisi all'inizio, c'è un problema di configurazione nei parametri genetici. | |
| """) | |
| st.divider() | |
| st.subheader("🏆 Esplorazione delle Migliori Soluzioni Trovate") | |
| st.caption("L'algoritmo ti propone le varianti più performanti. Lo SCORE TOTALE è la somma delle penalità (più è basso, meglio è).") | |
| comp_data = [] | |
| for i, s in enumerate(solutions): | |
| comp_data.append({ | |
| "Candidato": f"Soluzione #{i+1}", | |
| "SCORE TOTALE (Penalità)": int(s['total_score']), | |
| "❌ Understaffing (Buchi)": int(s['understaffing']), | |
| "⚖️ Inequità Weekend": int(s['equity']), | |
| "⚡ Overstaffing (Eccessi)": int(s['overstaffing']), | |
| "🎨 Pref. Ignorate": int(s['soft_preferences']), | |
| "📝 Mix Contratti Violato": int(s['contract']) | |
| }) | |
| df_comp = pd.DataFrame(comp_data) | |
| st.dataframe( | |
| df_comp.style.background_gradient(cmap="RdYlGn_r", subset=["SCORE TOTALE (Penalità)", "❌ Understaffing (Buchi)"]), | |
| width='stretch', | |
| hide_index=True | |
| ) | |
| sel_opt = st.radio("Seleziona quale piano turni visualizzare in dettaglio:", | |
| options=range(len(solutions)), | |
| format_func=lambda x: f"Apri Dettaglio Soluzione #{x+1}", | |
| horizontal=True, | |
| index=st.session_state.get('selected_idx', 0)) | |
| st.session_state['ga_results']['selected_idx'] = sel_opt | |
| chosen_sol = solutions[sel_opt] | |
| best_sched = chosen_sol['schedule'] | |
| emps = res['employees'] | |
| tgt = res['target'] | |
| st.subheader(f"Dashboard Copertura: Soluzione #{sel_opt+1}") | |
| st.caption("Confronto visivo tra le persone richieste (linea rossa tratteggiata) e le persone messe a turno (area blu).") | |
| d_tabs = st.tabs(["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"]) | |
| # Proietto il genoma elaborato (best_sched) sulla matrice di copertura per le charts | |
| cov_mat = get_final_coverage_matrix(best_sched, emps) | |
| for i, t in enumerate(d_tabs): | |
| with t: | |
| fig, ax = plt.subplots(figsize=(8, 2)) | |
| x = range(cfg.daily_slots) | |
| ax.fill_between(x, cov_mat[i], alpha=0.3) | |
| ax.plot(x, cov_mat[i], label="Staff Schedulato") | |
| ax.plot(x, tgt[i], 'r--', label="Staff Richiesto (Target)") | |
| tick_step = 4 | |
| lbls = [minutes_to_time(k * cfg.system_slot_minutes) for k in x[::tick_step]] | |
| ax.set_xticks(list(x)[::tick_step]) | |
| ax.set_xticklabels(lbls, rotation=0, fontsize=4) | |
| ax.legend() | |
| buf_day = io.BytesIO() | |
| fig.savefig(buf_day, format="png", bbox_inches="tight", transparent=False, dpi=150) | |
| buf_day.seek(0) | |
| st.image(buf_day) | |
| plt.close(fig) | |
| st.subheader("Tabellone Turni (Export)") | |
| data_rows = [] | |
| days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] | |
| for idx, e in enumerate(emps): | |
| row = {"Dipendente": e['id']} | |
| for d in range(7): | |
| s = best_sched[idx, d] | |
| if s == -1: txt = "OFF" | |
| elif s == -2: txt = "ABS" | |
| else: | |
| start = s * cfg.system_slot_minutes | |
| end = start + (e['shift_len'] * cfg.system_slot_minutes) | |
| txt = f"{minutes_to_time(start)}-{minutes_to_time(end)}" | |
| row[days[d]] = txt | |
| data_rows.append(row) | |
| st.dataframe(pd.DataFrame(data_rows), width='stretch') | |
| st.markdown("---") | |
| st.subheader("🔬 Ispezione Micro-Turno (Maschere VDT e Pause)") | |
| st.caption("Verifica la corretta allocazione delle pause VDT all'interno dello spezzato del singolo operatore.") | |
| # Micro-rendering della maschera binaria per l'ispezione visiva dei sub-slot | |
| c1, c2 = st.columns(2) | |
| sel_emp = c1.selectbox("Seleziona Operatore", [e['id'] for e in emps]) | |
| sel_day = c2.selectbox("Seleziona Giorno", range(7), format_func=lambda x: days[x]) | |
| e_idx = next(i for i,e in enumerate(emps) if e['id'] == sel_emp) | |
| s_start = best_sched[e_idx, sel_day] | |
| if s_start >= 0: | |
| mask = emps[e_idx]['mask'] | |
| html = "" | |
| for k, bit in enumerate(mask): | |
| t_str = minutes_to_time((s_start + k) * cfg.system_slot_minutes) | |
| col = "#4CAF50" if bit else "#FF5252" | |
| html += f"<div style='display:inline-block;width:35px;background:{col};color:white;font-size:10px;text-align:center;margin:1px;'>{t_str}</div>" | |
| st.markdown(html, unsafe_allow_html=True) | |
| st.caption("**Legenda:** [Verde] = Operatività a Terminale | [Rosso] = Pausa/Pranzo") | |
| else: | |
| st.info("Status per il giorno selezionato: RIPOSO o ASSENTE.") | |
| # ------------------------------------------------------------------------------ | |
| # TAB 5: BOOTSTRAPPING DI MOCK SCENARIOS (GENERATORE) | |
| # ------------------------------------------------------------------------------ | |
| with tab5: | |
| st.header("⚡ Generatore Ambienti di Test (Mock Scenarios)") | |
| st.markdown(""" | |
| Crea rapidamente nuovi scenari completi per testare come il motore AI reagisce a diverse | |
| composizioni della forza lavoro (es. alta rigidità vs alta flessibilità). | |
| """) | |
| col_gen_L, col_gen_R = st.columns([1, 2]) | |
| with col_gen_L: | |
| st.subheader("1. Setup Spazio Dati") | |
| new_scenario_name = st.text_input("Nome del nuovo Scenario", value="Nuovo_Test_BPO") | |
| new_emp_count = st.number_input("Numero Dipendenti Fittizi", min_value=1, max_value=2000, value=300, step=10) | |
| st.markdown("---") | |
| st.subheader("📊 Modello Matematico del Fabbisogno") | |
| st.caption("Scegli una distribuzione che simuli fedelmente il traffico del servizio.") | |
| curve_options = { | |
| "Doppia Campana (Tipico BPO Voice)": "double_bell", | |
| "Campana Centrale (Es. Delivery/Pausa Pranzo)": "single_bell_center", | |
| "Picco Mattutino (Es. Helpdesk IT)": "morning_peak", | |
| "Piatto Costante (Es. Backoffice/Data Entry)": "steady_high" | |
| } | |
| selected_curve_label = st.selectbox("Seleziona Modello di Carico:", options=list(curve_options.keys()), index=0) | |
| curve_key = curve_options[selected_curve_label] | |
| st.caption("Anteprima Forma:") | |
| # Anteprima visiva matematica della distribuzione scelta | |
| preview_x = np.linspace(8, 22, 50) | |
| if curve_key == "double_bell": | |
| preview_y = np.exp(-((preview_x - 11)**2)/4) + np.exp(-((preview_x - 16)**2)/4) | |
| elif curve_key == "single_bell_center": | |
| preview_y = np.exp(-((preview_x - 13)**2)/9) | |
| elif curve_key == "morning_peak": | |
| preview_y = np.exp(-((preview_x - 9.5)**2)/5) | |
| else: | |
| preview_y = np.ones_like(preview_x) * 0.8 | |
| fig_curve_prev, ax_cp = plt.subplots(figsize=(4, 1.5)) | |
| ax_cp.plot(preview_x, preview_y, color='#2ecc71', lw=2) | |
| ax_cp.fill_between(preview_x, preview_y, color='#2ecc71', alpha=0.2) | |
| ax_cp.set_yticks([]) | |
| ax_cp.set_xticks([8, 12, 16, 20]) | |
| ax_cp.set_xlim(8, 22) | |
| fig_curve_prev.patch.set_alpha(0.0) | |
| ax_cp.patch.set_alpha(0.0) | |
| buf_curve = io.BytesIO() | |
| fig_curve_prev.savefig(buf_curve, format="png", bbox_inches="tight", transparent=False, dpi=150) | |
| buf_curve.seek(0) | |
| st.image(buf_curve) | |
| plt.close(fig_curve_prev) | |
| with col_gen_R: | |
| st.subheader("2. Strategia HR (Mix Contrattuale)") | |
| st.caption("Simula il livello di flessibilità del personale.") | |
| pct_ft40 = st.slider("🔵 Full Time (8h - Alta rigidità)", 0, 100, 60) | |
| pct_pt30 = st.slider("🟡 Part Time (6h - Media flessibilità)", 0, 100, 20) | |
| remaining = max(0, 100 - (pct_ft40 + pct_pt30)) | |
| pct_pt20 = st.slider("🔴 Part Time (4h - Alta flessibilità)", 0, 100, remaining) | |
| total_mix = pct_ft40 + pct_pt30 + pct_pt20 | |
| if total_mix != 100: | |
| st.warning(f"⚠️ La somma deve essere 100%. Attuale: {total_mix}%.") | |
| else: | |
| st.success("✅ Composizione valida.") | |
| fig_preview, ax_prev = plt.subplots(figsize=(3, 1.5)) | |
| data_prev, labels_prev, colors_prev = [pct_ft40, pct_pt30, pct_pt20], ['FT 8h', 'PT 6h', 'PT 4h'], ['#3498db', '#f1c40f', '#e74c3c'] | |
| d_clean, l_clean, c_clean = zip(*[(d, l, c) for d, l, c in zip(data_prev, labels_prev, colors_prev) if d > 0]) | |
| wedges, texts, autotexts = ax_prev.pie(d_clean, labels=None, colors=c_clean, autopct='%1.0f%%', textprops={'color':"white", 'fontsize': 8, 'weight': 'bold'}, pctdistance=0.5) | |
| ax_prev.axis('equal') | |
| fig_preview.patch.set_alpha(0.0) | |
| leg = ax_prev.legend(wedges, l_clean, loc="center left", bbox_to_anchor=(1, 0, 0.5, 1), frameon=False, labelcolor='white', fontsize=7) | |
| buf = io.BytesIO() | |
| fig_preview.savefig(buf, format="png", bbox_inches="tight", transparent=True, dpi=150) | |
| buf.seek(0) | |
| st.image(buf) | |
| st.divider() | |
| btn_col, _ = st.columns([1, 3]) | |
| if btn_col.button("🚀 Inizializza Scenario su HF", type="primary", disabled=(total_mix != 100)): | |
| if not new_scenario_name.strip(): | |
| st.error("Nome scenario non valido.") | |
| else: | |
| with st.spinner("Creazione dati fittizi e upload in corso..."): | |
| mix_dict = {'FT40': pct_ft40, 'PT30': pct_pt30, 'PT20': pct_pt20} | |
| success, msg = generate_scenario_files(new_scenario_name, new_emp_count, mix_dict, curve_key) | |
| if success: | |
| st.success(f"{msg}") | |
| st.info("🔄 Clicca su 'Ricarica App' nella barra laterale sinistra per gestire questo nuovo scenario.") | |
| else: | |
| st.error(f"Errore di sistema: {msg}") |