File size: 8,351 Bytes
cc3f780 c1b1880 9acef2c cc3f780 b70d82f cc3f780 9acef2c c1b1880 b70d82f c1b1880 b70d82f c1b1880 cc3f780 c1b1880 cc3f780 b70d82f cc3f780 c1b1880 cc3f780 c1b1880 cc3f780 c1b1880 cc3f780 9fb3deb cc3f780 c1b1880 cc3f780 c1b1880 cc3f780 c1b1880 b70d82f cc3f780 b70d82f cc3f780 c1b1880 cc3f780 b70d82f cc3f780 b70d82f cc3f780 c1b1880 cc3f780 b70d82f cc3f780 c1b1880 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import os
import time
import hashlib
from contextlib import asynccontextmanager
import warnings
import logging
from src.ingestion.semantic_splitter import ActivaSemanticSplitter
from src.extraction.extractor import NeuroSymbolicExtractor
from src.validation.validator import SemanticValidator
from src.graph.graph_loader import KnowledgeGraphPersister
from src.graph.entity_resolver import EntityResolver
from pymongo import MongoClient
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("transformers").setLevel(logging.ERROR)
# --- GESTORE DEGLI STATI GLOBALI ---
# Usiamo un dizionario globale per tenere in RAM i pesi dei modelli.
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Nel mondo FastAPI il lifespan è il modo più pulito per fare il setup.
# Mi permette di caricare i modelli di embedding e l'LLM all'avvio del worker, una sola volta.
print("⏳ Inizializzazione modelli (SentenceTransformers e Llama3) nel Lifespan...")
ml_models["splitter"] = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2")
ml_models["extractor"] = NeuroSymbolicExtractor(index_path="ontology/domain_index.json")
ml_models["persister"] = KnowledgeGraphPersister()
ml_models["resolver"] = EntityResolver(neo4j_driver=ml_models["persister"].driver, similarity_threshold=0.85)
ml_models["validator"] = SemanticValidator()
print("✅ Modelli caricati e pronti a ricevere richieste!")
# Setup connessione MongoDB per i log degli scarti
mongo_ur = os.getenv("MONGO_URI")
mongo_user = os.getenv("MONGO_USER")
mongo_pass = os.getenv("MONGO_PASS")
if mongo_ur and mongo_user and mongo_pass:
try:
client = MongoClient(mongo_ur, username=mongo_user, password=mongo_pass)
# Creo il database "semantic_discovery" e la collection "rejected_triples"
ml_models["mongo_db"] = client["semantic_discovery"]["rejected_triples"]
print("✅ Connesso a MongoDB per lo storage delle allucinazioni LLM.")
except Exception as e:
print(f"⚠️ Errore connessione MongoDB: {e}")
ml_models["mongo_db"] = None
else:
print("⚠️ Credenziali MongoDB mancanti. Gli scarti non verranno tracciati.")
ml_models["mongo_db"] = None
yield # Qui l'API inizia ad ascoltare le chiamate in ingresso
# Chiusura pulita delle connessioni. Evita query appese su Neo4j quando killiamo il container.
print("🛑 Spegnimento in corso... chiusura connessioni e pulizia memoria.")
if "persister" in ml_models and ml_models["persister"]:
ml_models["persister"].close()
ml_models.clear()
app = FastAPI(
title="Automated Semantic Discovery API",
description="Endpoint per l'ingestion testuale e l'estrazione neuro-simbolica",
version="1.0",
lifespan=lifespan
)
class DiscoveryRequest(BaseModel):
documentText: str
class GraphEdge(BaseModel):
start_node_id: str
start_node_label: str
start_node_type: str
relationship_type: str
end_node_label: str
end_node_type: str
evidence: str
reasoning: str
class DiscoveryResponse(BaseModel):
status: str
message: str
execution_time_seconds: float
chunks_processed: int
triples_extracted: int
shacl_valid: bool
graph_data: list[GraphEdge]
@app.post("/api/discover", response_model=DiscoveryResponse)
def run_discovery(payload: DiscoveryRequest):
start_time = time.time()
raw_text = payload.documentText
if not raw_text or not raw_text.strip():
raise HTTPException(status_code=400, detail="Il testo fornito è vuoto.")
# Recupero le istanze
splitter = ml_models["splitter"]
extractor = ml_models["extractor"]
validator = ml_models["validator"]
resolver = ml_models["resolver"]
persister = ml_models["persister"]
# --- FASE 1: INGESTION ---
# Taglio il testo in modo semantico per non sforare la context window dell'LLM
chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90)
# --- FASE 2: EXTRACTION ---
# Invocazione del motore neuro-simbolico per ogni blocco di testo
all_triples = []
all_entities = []
for i, chunk in enumerate(chunks):
chunk_id = f"api_req_chunk_{i+1}"
extraction_result = extractor.extract(chunk, source_id=chunk_id)
if extraction_result:
if extraction_result.triples:
all_triples.extend(extraction_result.triples)
if hasattr(extraction_result, 'entities') and extraction_result.entities:
all_entities.extend(extraction_result.entities)
if i < len(chunks) - 1:
print(f"⏳ Pacing per Groq API: attesa 20s per non sforare i 30K TPM...")
time.sleep(20)
if not all_triples:
return {
"status": "success",
"message": "Nessuna entità trovata.",
"graph_data": []
}
# --- FASE 2.1: SYMBOLIC RESOLUTION ---
# Deduplica in RAM e linking verso Wikidata e Neo4j (Entity Resolution)
entities_to_save = []
try:
all_entities, all_triples, entities_to_save = resolver.resolve_entities(all_entities, all_triples)
except Exception as e:
print(f"⚠️ Errore nel resolver (skip): {e}")
# --- FASE 2.2: VALIDATION ---
# Prima di salvare nel DB, verifico con SHACL
# se l'LLM ha generato allucinazioni o violato i vincoli dell'ontologia.
valid_triples, invalid_triples, report = validator.filter_valid_triples(entities_to_save, all_triples)
if invalid_triples:
print(f"\n❌ [SHACL FAILED] Scartate {len(invalid_triples)} triple per violazione di Domain/Range.")
# Salvataggio asincrono degli scarti su MongoDB (DLQ)
if ml_models.get("mongo_db") is not None:
try:
# Aggiungo un timestamp per rintracciabilità
for doc in invalid_triples:
doc["timestamp"] = time.time()
ml_models["mongo_db"].insert_many(invalid_triples)
print("💾 Triple invalide archiviate su MongoDB.")
except Exception as e:
print(f"⚠️ Errore scrittura su Mongo: {e}")
if len(valid_triples) == len(all_triples) and all_triples:
print("\n✅ [SHACL SUCCESS] Tutte le triple rispettano rigorosamente l'ontologia.")
# --- FASE 3: PERSISTENCE (Neo4j) ---
try:
# Cruciale: passiamo SOLO le valid_triples al database a grafo
persister.save_entities_and_triples(entities_to_save, valid_triples)
except Exception as e:
print(f"⚠️ Errore salvataggio Neo4j: {e}")
# Preparazione payload di risposta
graph_data = []
for t in valid_triples:
# Pydantic ci garantisce che i campi esistano
subj_str = str(t.subject)
obj_str = str(t.object)
# Formattazione della relazione (es. "a-loc:isLocatedIn" -> "A_LOC_ISLOCATEDIN")
# in coerenza con la convenzione Neo4j gestita dal loader
pred_str = str(t.predicate).replace(":", "_").replace("-", "_").upper()
# Genero un ID stabile per facilitare il rendering dei nodi lato client
node_id = hashlib.md5(subj_str.encode('utf-8')).hexdigest()
graph_data.append(GraphEdge(
start_node_id=node_id,
start_node_label=subj_str,
start_node_type=str(t.subject_type),
relationship_type=pred_str,
end_node_label=obj_str,
end_node_type=str(t.object_type),
evidence=str(t.evidence),
reasoning=str(t.reasoning)
))
return DiscoveryResponse(
status="success",
message="Estrazione semantica completata",
execution_time_seconds=round(time.time() - start_time, 2),
chunks_processed=len(chunks),
triples_extracted=len(graph_data),
shacl_valid=len(invalid_triples) == 0, # True se nessuna tripla è stata scartata
graph_data=graph_data
)
if __name__ == "__main__":
uvicorn.run("api:app", host="0.0.0.0", port=5000, reload=True) |