File size: 8,351 Bytes
cc3f780
 
 
 
 
 
c1b1880
9acef2c
 
cc3f780
 
 
 
 
 
b70d82f
cc3f780
9acef2c
 
 
 
 
c1b1880
 
 
 
 
 
 
 
 
 
 
 
b70d82f
c1b1880
 
 
 
 
 
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c1b1880
 
 
 
 
 
 
 
cc3f780
 
 
c1b1880
 
cc3f780
 
 
 
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc3f780
 
 
 
 
 
 
c1b1880
 
 
 
 
 
 
cc3f780
c1b1880
cc3f780
 
 
c1b1880
cc3f780
 
 
 
 
 
 
 
 
 
 
9fb3deb
 
 
 
cc3f780
 
 
 
 
c1b1880
cc3f780
 
 
c1b1880
cc3f780
 
 
 
 
 
 
c1b1880
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc3f780
 
 
b70d82f
 
cc3f780
 
 
c1b1880
cc3f780
b70d82f
 
 
 
 
cc3f780
b70d82f
 
 
cc3f780
c1b1880
cc3f780
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc3f780
 
c1b1880
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import os
import time
import hashlib
from contextlib import asynccontextmanager
import warnings
import logging

from src.ingestion.semantic_splitter import ActivaSemanticSplitter
from src.extraction.extractor import NeuroSymbolicExtractor
from src.validation.validator import SemanticValidator
from src.graph.graph_loader import KnowledgeGraphPersister
from src.graph.entity_resolver import EntityResolver
from pymongo import MongoClient

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

logging.getLogger("transformers").setLevel(logging.ERROR)

# --- GESTORE DEGLI STATI GLOBALI ---
# Usiamo un dizionario globale per tenere in RAM i pesi dei modelli. 
ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Nel mondo FastAPI il lifespan è il modo più pulito per fare il setup. 
    # Mi permette di caricare i modelli di embedding e l'LLM all'avvio del worker, una sola volta.
    print("⏳ Inizializzazione modelli (SentenceTransformers e Llama3) nel Lifespan...")
    
    ml_models["splitter"] = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2")
    
    ml_models["extractor"] = NeuroSymbolicExtractor(index_path="ontology/domain_index.json")
    
    ml_models["persister"] = KnowledgeGraphPersister()
    ml_models["resolver"] = EntityResolver(neo4j_driver=ml_models["persister"].driver, similarity_threshold=0.85)
    ml_models["validator"] = SemanticValidator()
    
    print("✅ Modelli caricati e pronti a ricevere richieste!")
    
    # Setup connessione MongoDB per i log degli scarti
    mongo_ur = os.getenv("MONGO_URI") 
    mongo_user = os.getenv("MONGO_USER")
    mongo_pass = os.getenv("MONGO_PASS")
    
    if mongo_ur and mongo_user and mongo_pass:
        try:
            client = MongoClient(mongo_ur, username=mongo_user, password=mongo_pass)
            # Creo il database "semantic_discovery" e la collection "rejected_triples"
            ml_models["mongo_db"] = client["semantic_discovery"]["rejected_triples"]
            print("✅ Connesso a MongoDB per lo storage delle allucinazioni LLM.")
        except Exception as e:
            print(f"⚠️ Errore connessione MongoDB: {e}")
            ml_models["mongo_db"] = None
    else:
        print("⚠️ Credenziali MongoDB mancanti. Gli scarti non verranno tracciati.")
        ml_models["mongo_db"] = None

    yield # Qui l'API inizia ad ascoltare le chiamate in ingresso
    
    # Chiusura pulita delle connessioni. Evita query appese su Neo4j quando killiamo il container.
    print("🛑 Spegnimento in corso... chiusura connessioni e pulizia memoria.")
    if "persister" in ml_models and ml_models["persister"]:
        ml_models["persister"].close() 
    ml_models.clear()

app = FastAPI(
    title="Automated Semantic Discovery API", 
    description="Endpoint per l'ingestion testuale e l'estrazione neuro-simbolica",
    version="1.0",
    lifespan=lifespan
)

class DiscoveryRequest(BaseModel):
    documentText: str

class GraphEdge(BaseModel):
    start_node_id: str
    start_node_label: str
    start_node_type: str
    relationship_type: str
    end_node_label: str
    end_node_type: str
    evidence: str
    reasoning: str

class DiscoveryResponse(BaseModel):
    status: str
    message: str
    execution_time_seconds: float
    chunks_processed: int
    triples_extracted: int
    shacl_valid: bool
    graph_data: list[GraphEdge]

@app.post("/api/discover", response_model=DiscoveryResponse)
def run_discovery(payload: DiscoveryRequest):
    start_time = time.time()
    raw_text = payload.documentText

    if not raw_text or not raw_text.strip():
        raise HTTPException(status_code=400, detail="Il testo fornito è vuoto.")

    # Recupero le istanze
    splitter = ml_models["splitter"]
    extractor = ml_models["extractor"]
    validator = ml_models["validator"]
    resolver = ml_models["resolver"]
    persister = ml_models["persister"]

    # --- FASE 1: INGESTION ---
    # Taglio il testo in modo semantico per non sforare la context window dell'LLM
    chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90)
    
    # --- FASE 2: EXTRACTION ---
    # Invocazione del motore neuro-simbolico per ogni blocco di testo
    all_triples = []
    all_entities = []
    for i, chunk in enumerate(chunks):
        chunk_id = f"api_req_chunk_{i+1}"
        extraction_result = extractor.extract(chunk, source_id=chunk_id)
        
        if extraction_result:
            if extraction_result.triples:
                all_triples.extend(extraction_result.triples)
            if hasattr(extraction_result, 'entities') and extraction_result.entities:
                all_entities.extend(extraction_result.entities)
        
        if i < len(chunks) - 1:
            print(f"⏳ Pacing per Groq API: attesa 20s per non sforare i 30K TPM...")
            time.sleep(20)

    if not all_triples:
        return {
            "status": "success", 
            "message": "Nessuna entità trovata.", 
            "graph_data": []
        }

    # --- FASE 2.1: SYMBOLIC RESOLUTION ---
    # Deduplica in RAM e linking verso Wikidata e Neo4j (Entity Resolution)
    entities_to_save = []
    try:
        all_entities, all_triples, entities_to_save = resolver.resolve_entities(all_entities, all_triples)
    except Exception as e:
        print(f"⚠️ Errore nel resolver (skip): {e}")

    # --- FASE 2.2: VALIDATION ---
    # Prima di salvare nel DB, verifico con SHACL 
    # se l'LLM ha generato allucinazioni o violato i vincoli dell'ontologia.
    valid_triples, invalid_triples, report = validator.filter_valid_triples(entities_to_save, all_triples)
    
    if invalid_triples:
        print(f"\n❌ [SHACL FAILED] Scartate {len(invalid_triples)} triple per violazione di Domain/Range.")
        
        # Salvataggio asincrono degli scarti su MongoDB (DLQ)
        if ml_models.get("mongo_db") is not None:
            try:
                # Aggiungo un timestamp per rintracciabilità
                for doc in invalid_triples:
                    doc["timestamp"] = time.time()
                ml_models["mongo_db"].insert_many(invalid_triples)
                print("💾 Triple invalide archiviate su MongoDB.")
            except Exception as e:
                print(f"⚠️ Errore scrittura su Mongo: {e}")
    
    if len(valid_triples) == len(all_triples) and all_triples:
        print("\n✅ [SHACL SUCCESS] Tutte le triple rispettano rigorosamente l'ontologia.")

    # --- FASE 3: PERSISTENCE (Neo4j) ---
    try:
        # Cruciale: passiamo SOLO le valid_triples al database a grafo
        persister.save_entities_and_triples(entities_to_save, valid_triples)
    except Exception as e:
        print(f"⚠️ Errore salvataggio Neo4j: {e}")

    # Preparazione payload di risposta
    graph_data = []

    for t in valid_triples:
        # Pydantic ci garantisce che i campi esistano
        subj_str = str(t.subject)
        obj_str = str(t.object)
        
        # Formattazione della relazione (es. "a-loc:isLocatedIn" -> "A_LOC_ISLOCATEDIN")
        # in coerenza con la convenzione Neo4j gestita dal loader
        pred_str = str(t.predicate).replace(":", "_").replace("-", "_").upper() 

        # Genero un ID stabile per facilitare il rendering dei nodi lato client
        node_id = hashlib.md5(subj_str.encode('utf-8')).hexdigest()

        graph_data.append(GraphEdge(
            start_node_id=node_id,
            start_node_label=subj_str,
            start_node_type=str(t.subject_type),
            relationship_type=pred_str,
            end_node_label=obj_str,
            end_node_type=str(t.object_type),
            evidence=str(t.evidence),
            reasoning=str(t.reasoning)
        ))

    return DiscoveryResponse(
        status="success",
        message="Estrazione semantica completata",
        execution_time_seconds=round(time.time() - start_time, 2),
        chunks_processed=len(chunks),
        triples_extracted=len(graph_data),
        shacl_valid=len(invalid_triples) == 0, # True se nessuna tripla è stata scartata
        graph_data=graph_data 
    )

if __name__ == "__main__":
    uvicorn.run("api:app", host="0.0.0.0", port=5000, reload=True)