GaetanoParente's picture
rimossi warnings
9acef2c
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import os
import time
import hashlib
from contextlib import asynccontextmanager
import warnings
import logging
from src.ingestion.semantic_splitter import ActivaSemanticSplitter
from src.extraction.extractor import NeuroSymbolicExtractor
from src.validation.validator import SemanticValidator
from src.graph.graph_loader import KnowledgeGraphPersister
from src.graph.entity_resolver import EntityResolver
from pymongo import MongoClient
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("transformers").setLevel(logging.ERROR)
# --- GESTORE DEGLI STATI GLOBALI ---
# Usiamo un dizionario globale per tenere in RAM i pesi dei modelli.
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Nel mondo FastAPI il lifespan è il modo più pulito per fare il setup.
# Mi permette di caricare i modelli di embedding e l'LLM all'avvio del worker, una sola volta.
print("⏳ Inizializzazione modelli (SentenceTransformers e Llama3) nel Lifespan...")
ml_models["splitter"] = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2")
ml_models["extractor"] = NeuroSymbolicExtractor(index_path="ontology/domain_index.json")
ml_models["persister"] = KnowledgeGraphPersister()
ml_models["resolver"] = EntityResolver(neo4j_driver=ml_models["persister"].driver, similarity_threshold=0.85)
ml_models["validator"] = SemanticValidator()
print("✅ Modelli caricati e pronti a ricevere richieste!")
# Setup connessione MongoDB per i log degli scarti
mongo_ur = os.getenv("MONGO_URI")
mongo_user = os.getenv("MONGO_USER")
mongo_pass = os.getenv("MONGO_PASS")
if mongo_ur and mongo_user and mongo_pass:
try:
client = MongoClient(mongo_ur, username=mongo_user, password=mongo_pass)
# Creo il database "semantic_discovery" e la collection "rejected_triples"
ml_models["mongo_db"] = client["semantic_discovery"]["rejected_triples"]
print("✅ Connesso a MongoDB per lo storage delle allucinazioni LLM.")
except Exception as e:
print(f"⚠️ Errore connessione MongoDB: {e}")
ml_models["mongo_db"] = None
else:
print("⚠️ Credenziali MongoDB mancanti. Gli scarti non verranno tracciati.")
ml_models["mongo_db"] = None
yield # Qui l'API inizia ad ascoltare le chiamate in ingresso
# Chiusura pulita delle connessioni. Evita query appese su Neo4j quando killiamo il container.
print("🛑 Spegnimento in corso... chiusura connessioni e pulizia memoria.")
if "persister" in ml_models and ml_models["persister"]:
ml_models["persister"].close()
ml_models.clear()
app = FastAPI(
title="Automated Semantic Discovery API",
description="Endpoint per l'ingestion testuale e l'estrazione neuro-simbolica",
version="1.0",
lifespan=lifespan
)
class DiscoveryRequest(BaseModel):
documentText: str
class GraphEdge(BaseModel):
start_node_id: str
start_node_label: str
start_node_type: str
relationship_type: str
end_node_label: str
end_node_type: str
evidence: str
reasoning: str
class DiscoveryResponse(BaseModel):
status: str
message: str
execution_time_seconds: float
chunks_processed: int
triples_extracted: int
shacl_valid: bool
graph_data: list[GraphEdge]
@app.post("/api/discover", response_model=DiscoveryResponse)
def run_discovery(payload: DiscoveryRequest):
start_time = time.time()
raw_text = payload.documentText
if not raw_text or not raw_text.strip():
raise HTTPException(status_code=400, detail="Il testo fornito è vuoto.")
# Recupero le istanze
splitter = ml_models["splitter"]
extractor = ml_models["extractor"]
validator = ml_models["validator"]
resolver = ml_models["resolver"]
persister = ml_models["persister"]
# --- FASE 1: INGESTION ---
# Taglio il testo in modo semantico per non sforare la context window dell'LLM
chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90)
# --- FASE 2: EXTRACTION ---
# Invocazione del motore neuro-simbolico per ogni blocco di testo
all_triples = []
all_entities = []
for i, chunk in enumerate(chunks):
chunk_id = f"api_req_chunk_{i+1}"
extraction_result = extractor.extract(chunk, source_id=chunk_id)
if extraction_result:
if extraction_result.triples:
all_triples.extend(extraction_result.triples)
if hasattr(extraction_result, 'entities') and extraction_result.entities:
all_entities.extend(extraction_result.entities)
if i < len(chunks) - 1:
print(f"⏳ Pacing per Groq API: attesa 20s per non sforare i 30K TPM...")
time.sleep(20)
if not all_triples:
return {
"status": "success",
"message": "Nessuna entità trovata.",
"graph_data": []
}
# --- FASE 2.1: SYMBOLIC RESOLUTION ---
# Deduplica in RAM e linking verso Wikidata e Neo4j (Entity Resolution)
entities_to_save = []
try:
all_entities, all_triples, entities_to_save = resolver.resolve_entities(all_entities, all_triples)
except Exception as e:
print(f"⚠️ Errore nel resolver (skip): {e}")
# --- FASE 2.2: VALIDATION ---
# Prima di salvare nel DB, verifico con SHACL
# se l'LLM ha generato allucinazioni o violato i vincoli dell'ontologia.
valid_triples, invalid_triples, report = validator.filter_valid_triples(entities_to_save, all_triples)
if invalid_triples:
print(f"\n❌ [SHACL FAILED] Scartate {len(invalid_triples)} triple per violazione di Domain/Range.")
# Salvataggio asincrono degli scarti su MongoDB (DLQ)
if ml_models.get("mongo_db") is not None:
try:
# Aggiungo un timestamp per rintracciabilità
for doc in invalid_triples:
doc["timestamp"] = time.time()
ml_models["mongo_db"].insert_many(invalid_triples)
print("💾 Triple invalide archiviate su MongoDB.")
except Exception as e:
print(f"⚠️ Errore scrittura su Mongo: {e}")
if len(valid_triples) == len(all_triples) and all_triples:
print("\n✅ [SHACL SUCCESS] Tutte le triple rispettano rigorosamente l'ontologia.")
# --- FASE 3: PERSISTENCE (Neo4j) ---
try:
# Cruciale: passiamo SOLO le valid_triples al database a grafo
persister.save_entities_and_triples(entities_to_save, valid_triples)
except Exception as e:
print(f"⚠️ Errore salvataggio Neo4j: {e}")
# Preparazione payload di risposta
graph_data = []
for t in valid_triples:
# Pydantic ci garantisce che i campi esistano
subj_str = str(t.subject)
obj_str = str(t.object)
# Formattazione della relazione (es. "a-loc:isLocatedIn" -> "A_LOC_ISLOCATEDIN")
# in coerenza con la convenzione Neo4j gestita dal loader
pred_str = str(t.predicate).replace(":", "_").replace("-", "_").upper()
# Genero un ID stabile per facilitare il rendering dei nodi lato client
node_id = hashlib.md5(subj_str.encode('utf-8')).hexdigest()
graph_data.append(GraphEdge(
start_node_id=node_id,
start_node_label=subj_str,
start_node_type=str(t.subject_type),
relationship_type=pred_str,
end_node_label=obj_str,
end_node_type=str(t.object_type),
evidence=str(t.evidence),
reasoning=str(t.reasoning)
))
return DiscoveryResponse(
status="success",
message="Estrazione semantica completata",
execution_time_seconds=round(time.time() - start_time, 2),
chunks_processed=len(chunks),
triples_extracted=len(graph_data),
shacl_valid=len(invalid_triples) == 0, # True se nessuna tripla è stata scartata
graph_data=graph_data
)
if __name__ == "__main__":
uvicorn.run("api:app", host="0.0.0.0", port=5000, reload=True)