from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn import os import time import hashlib from contextlib import asynccontextmanager import warnings import logging from src.ingestion.semantic_splitter import ActivaSemanticSplitter from src.extraction.extractor import NeuroSymbolicExtractor from src.validation.validator import SemanticValidator from src.graph.graph_loader import KnowledgeGraphPersister from src.graph.entity_resolver import EntityResolver from pymongo import MongoClient warnings.filterwarnings("ignore", category=FutureWarning) warnings.filterwarnings("ignore", category=DeprecationWarning) logging.getLogger("transformers").setLevel(logging.ERROR) # --- GESTORE DEGLI STATI GLOBALI --- # Usiamo un dizionario globale per tenere in RAM i pesi dei modelli. ml_models = {} @asynccontextmanager async def lifespan(app: FastAPI): # Nel mondo FastAPI il lifespan è il modo più pulito per fare il setup. # Mi permette di caricare i modelli di embedding e l'LLM all'avvio del worker, una sola volta. print("⏳ Inizializzazione modelli (SentenceTransformers e Llama3) nel Lifespan...") ml_models["splitter"] = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2") ml_models["extractor"] = NeuroSymbolicExtractor(index_path="ontology/domain_index.json") ml_models["persister"] = KnowledgeGraphPersister() ml_models["resolver"] = EntityResolver(neo4j_driver=ml_models["persister"].driver, similarity_threshold=0.85) ml_models["validator"] = SemanticValidator() print("✅ Modelli caricati e pronti a ricevere richieste!") # Setup connessione MongoDB per i log degli scarti mongo_ur = os.getenv("MONGO_URI") mongo_user = os.getenv("MONGO_USER") mongo_pass = os.getenv("MONGO_PASS") if mongo_ur and mongo_user and mongo_pass: try: client = MongoClient(mongo_ur, username=mongo_user, password=mongo_pass) # Creo il database "semantic_discovery" e la collection "rejected_triples" ml_models["mongo_db"] = client["semantic_discovery"]["rejected_triples"] print("✅ Connesso a MongoDB per lo storage delle allucinazioni LLM.") except Exception as e: print(f"⚠️ Errore connessione MongoDB: {e}") ml_models["mongo_db"] = None else: print("⚠️ Credenziali MongoDB mancanti. Gli scarti non verranno tracciati.") ml_models["mongo_db"] = None yield # Qui l'API inizia ad ascoltare le chiamate in ingresso # Chiusura pulita delle connessioni. Evita query appese su Neo4j quando killiamo il container. print("🛑 Spegnimento in corso... chiusura connessioni e pulizia memoria.") if "persister" in ml_models and ml_models["persister"]: ml_models["persister"].close() ml_models.clear() app = FastAPI( title="Automated Semantic Discovery API", description="Endpoint per l'ingestion testuale e l'estrazione neuro-simbolica", version="1.0", lifespan=lifespan ) class DiscoveryRequest(BaseModel): documentText: str class GraphEdge(BaseModel): start_node_id: str start_node_label: str start_node_type: str relationship_type: str end_node_label: str end_node_type: str evidence: str reasoning: str class DiscoveryResponse(BaseModel): status: str message: str execution_time_seconds: float chunks_processed: int triples_extracted: int shacl_valid: bool graph_data: list[GraphEdge] @app.post("/api/discover", response_model=DiscoveryResponse) def run_discovery(payload: DiscoveryRequest): start_time = time.time() raw_text = payload.documentText if not raw_text or not raw_text.strip(): raise HTTPException(status_code=400, detail="Il testo fornito è vuoto.") # Recupero le istanze splitter = ml_models["splitter"] extractor = ml_models["extractor"] validator = ml_models["validator"] resolver = ml_models["resolver"] persister = ml_models["persister"] # --- FASE 1: INGESTION --- # Taglio il testo in modo semantico per non sforare la context window dell'LLM chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90) # --- FASE 2: EXTRACTION --- # Invocazione del motore neuro-simbolico per ogni blocco di testo all_triples = [] all_entities = [] for i, chunk in enumerate(chunks): chunk_id = f"api_req_chunk_{i+1}" extraction_result = extractor.extract(chunk, source_id=chunk_id) if extraction_result: if extraction_result.triples: all_triples.extend(extraction_result.triples) if hasattr(extraction_result, 'entities') and extraction_result.entities: all_entities.extend(extraction_result.entities) if i < len(chunks) - 1: print(f"⏳ Pacing per Groq API: attesa 20s per non sforare i 30K TPM...") time.sleep(20) if not all_triples: return { "status": "success", "message": "Nessuna entità trovata.", "graph_data": [] } # --- FASE 2.1: SYMBOLIC RESOLUTION --- # Deduplica in RAM e linking verso Wikidata e Neo4j (Entity Resolution) entities_to_save = [] try: all_entities, all_triples, entities_to_save = resolver.resolve_entities(all_entities, all_triples) except Exception as e: print(f"⚠️ Errore nel resolver (skip): {e}") # --- FASE 2.2: VALIDATION --- # Prima di salvare nel DB, verifico con SHACL # se l'LLM ha generato allucinazioni o violato i vincoli dell'ontologia. valid_triples, invalid_triples, report = validator.filter_valid_triples(entities_to_save, all_triples) if invalid_triples: print(f"\n❌ [SHACL FAILED] Scartate {len(invalid_triples)} triple per violazione di Domain/Range.") # Salvataggio asincrono degli scarti su MongoDB (DLQ) if ml_models.get("mongo_db") is not None: try: # Aggiungo un timestamp per rintracciabilità for doc in invalid_triples: doc["timestamp"] = time.time() ml_models["mongo_db"].insert_many(invalid_triples) print("💾 Triple invalide archiviate su MongoDB.") except Exception as e: print(f"⚠️ Errore scrittura su Mongo: {e}") if len(valid_triples) == len(all_triples) and all_triples: print("\n✅ [SHACL SUCCESS] Tutte le triple rispettano rigorosamente l'ontologia.") # --- FASE 3: PERSISTENCE (Neo4j) --- try: # Cruciale: passiamo SOLO le valid_triples al database a grafo persister.save_entities_and_triples(entities_to_save, valid_triples) except Exception as e: print(f"⚠️ Errore salvataggio Neo4j: {e}") # Preparazione payload di risposta graph_data = [] for t in valid_triples: # Pydantic ci garantisce che i campi esistano subj_str = str(t.subject) obj_str = str(t.object) # Formattazione della relazione (es. "a-loc:isLocatedIn" -> "A_LOC_ISLOCATEDIN") # in coerenza con la convenzione Neo4j gestita dal loader pred_str = str(t.predicate).replace(":", "_").replace("-", "_").upper() # Genero un ID stabile per facilitare il rendering dei nodi lato client node_id = hashlib.md5(subj_str.encode('utf-8')).hexdigest() graph_data.append(GraphEdge( start_node_id=node_id, start_node_label=subj_str, start_node_type=str(t.subject_type), relationship_type=pred_str, end_node_label=obj_str, end_node_type=str(t.object_type), evidence=str(t.evidence), reasoning=str(t.reasoning) )) return DiscoveryResponse( status="success", message="Estrazione semantica completata", execution_time_seconds=round(time.time() - start_time, 2), chunks_processed=len(chunks), triples_extracted=len(graph_data), shacl_valid=len(invalid_triples) == 0, # True se nessuna tripla è stata scartata graph_data=graph_data ) if __name__ == "__main__": uvicorn.run("api:app", host="0.0.0.0", port=5000, reload=True)