File size: 8,116 Bytes
a968971
c4f394e
a968971
 
 
c1b1880
 
a968971
 
 
c1b1880
cc3f780
 
 
a968971
ed88e58
a968971
 
8c4201b
d2bdec9
c1b1880
 
8c4201b
d2bdec9
a968971
 
 
 
 
c1b1880
a968971
 
 
8c4201b
 
c1b1880
 
 
8c4201b
c1b1880
 
cc3f780
 
 
 
 
 
 
 
8c4201b
 
 
cc3f780
 
 
 
 
 
 
8c4201b
cc3f780
8c4201b
a968971
c1b1880
a968971
 
 
8c4201b
c1b1880
 
 
d2bdec9
 
 
8c4201b
d2bdec9
c1b1880
8c4201b
 
fe271ee
8c4201b
a968971
 
8c4201b
a968971
8c4201b
a968971
 
8c4201b
 
 
a968971
 
9fb3deb
a968971
 
9fb3deb
 
 
 
8c4201b
 
a968971
 
8c4201b
 
 
 
 
 
a968971
8c4201b
a968971
cc3f780
 
 
c1b1880
 
cc3f780
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c1b1880
 
cc3f780
 
 
 
 
 
 
 
 
 
8c4201b
 
c1b1880
 
 
 
 
 
 
 
 
9fb3deb
c1b1880
 
 
 
 
 
 
 
 
 
 
 
 
9fb3deb
 
 
 
c1b1880
9fb3deb
 
c1b1880
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import os
from collections import defaultdict
from neo4j import GraphDatabase
from dotenv import load_dotenv

# Carico le env vars. Su HF Spaces pesca in automatico dai secrets.
load_dotenv() 

class KnowledgeGraphPersister:
    def __init__(self):
        # Setup della connessione a Neo4j. 
        uri = os.getenv("NEO4J_URI")
        user = os.getenv("NEO4J_USER")
        password = os.getenv("NEO4J_PASSWORD")
        
        try:
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
            self.driver.verify_connectivity()
            print(f"✅ Connesso a Neo4j ({uri}).")
            
            # Chiamo subito la creazione degli indici. Se partiamo a fare ingestion massiva 
            # senza constraint, il DB collassa al primo blocco di MERGE.
            self._create_constraints()
            
        except Exception as e:
            print(f"❌ Errore critico connessione Neo4j: {e}")
            self.driver = None

    def close(self):
        # Chiudo pulito il driver (chiamato nel lifecycle shutdown dell'API)
        if self.driver:
            self.driver.close()

    def _create_constraints(self):
        if not self.driver: return
        
        # Senza questo vincolo UNIQUE, l'istruzione MERGE fa un Full Table Scan ogni volta.
        # Fondamentale per mantenere le transazioni < 10ms anche con migliaia di nodi.
        query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
        
        # Indice vettoriale nativo per le ricerche di similarità (dimensionato a 384 per matchare all-MiniLM)
        query_vector = """
        CREATE VECTOR INDEX entity_embeddings IF NOT EXISTS 
        FOR (n:Resource) ON (n.embedding) 
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384, 
            `vector.similarity_function`: 'cosine'
        }}
        """
        with self.driver.session() as session:
            try:
                session.run(query)
                print("⚡ Vincolo di unicità verificato.")
            except Exception as e:
                print(f"⚠️ Warning vincolo unicità: {e}")
                
            try:
                session.run(query_vector)
                print("⚡ Vector Index verificato.")
            except Exception as e:
                print(f"⚠️ Warning vector index: {e}")

    def sanitize_name(self, name):
        # Canonicalizzazione molto base: sostituisco spazi inutili e tolgo gli apici che spaccano le query Cypher.
        if not name: return "Unknown"
        return name.strip().replace(" ", "_").replace("'", "").replace('"', "")

    def sanitize_predicate(self, pred):
        # Cruciale per evitare Cypher Injection. In Cypher NON si può parametrizzare 
        # il tipo di relazione in un MERGE (es. non puoi fare -[r:$pred]-). Devo per forza 
        # iniettarlo nella stringa della query, quindi lo normalizzo in modo drastico.
        if not pred: return "RELATED_TO"
        
        pred = pred.replace(":", "_").replace("-", "_").replace(" ", "_")
        clean = "".join(x for x in pred if x.isalnum() or x == "_")
        
        # Convenzione Neo4j: le relationships sono sempre in UPPERCASE
        return clean.upper() if clean else "RELATED_TO"

    def  save_triples(self, triples):
        if not self.driver or not triples:
            return

        print(f"💾 Preparazione Batch di {len(triples)} triple...")

        batched_by_pred = defaultdict(list)
        
        for t in triples:
            safe_pred = self.sanitize_predicate(t.predicate)
            
            item = {
                "subj_uri": self.sanitize_name(t.subject),
                "subj_label": t.subject,
                "subj_type": getattr(t, 'subject_type', '').replace(":", "_").replace("-", "_"),
                "obj_uri": self.sanitize_name(t.object),
                "obj_label": t.object,
                "obj_type": getattr(t, 'object_type', '').replace(":", "_").replace("-", "_"),
                "evidence": getattr(t, 'evidence', 'N/A'),
                "reasoning": getattr(t, 'reasoning', 'N/A'),
                "src": getattr(t, 'source', 'unknown') or 'unknown'
            }
            batched_by_pred[safe_pred].append(item)

        with self.driver.session() as session:
            for pred, data_list in batched_by_pred.items():
                try:
                    session.execute_write(self._unwind_write_tx, pred, data_list)
                    print(f"   -> Inserite {len(data_list)} relazioni :{pred}")
                except Exception as e:
                    print(f"⚠️ Errore batch per relazione :{pred} -> {e}")

        print("✅ Salvataggio completato.")

    def save_entities_and_triples(self, entities_to_save, triples):
        if not self.driver: return

        # Ingestion a 2 step: prima butto dentro i nodi isolati con tutti i loro payload
        # (embedding vettoriali e link a Wikidata), poi in un secondo momento ci aggancio sopra le relazioni.
        if entities_to_save:
            print(f"💾 Salvataggio di {len(entities_to_save)} nodi singoli con vettori...")
            
            node_batch = []
            for item in entities_to_save:
                item["uri"] = self.sanitize_name(item["label"])
                node_batch.append(item)
                
            with self.driver.session() as session:
                session.execute_write(self._unwind_write_nodes, node_batch)

        if triples:
            self.save_triples(triples)

    @staticmethod
    def _unwind_write_nodes(tx, batch_data):
        # L'UNWIND è l'unico modo per fare VERO batching massivo in Neo4j senza distruggere la RAM.
        # Passo un intero array JSON ($batch) e Cypher lo "srotola" inserendo migliaia di nodi al volo.
        query = (
            "UNWIND $batch AS row "
            "MERGE (n:Resource {uri: row.uri}) "
            "ON CREATE SET n.label = row.label, "
            "              n.embedding = row.embedding, "
            "              n.wikidata_sameAs = row.wikidata_sameAs, "
            "              n.last_updated = datetime() "
        )
        tx.run(query, batch=batch_data)

    @staticmethod
    def _unwind_write_tx(tx, predicate, batch_data):
        # Qui avviene la vera traduzione dal mondo RDF a quello Labeled Property Graph (LPG).
        if predicate in ["RDF_TYPE", "TYPE", "A", "CORE_HASTYPE"]:
            # Se l'LLM ha generato una tripla di classificazione ontologica, NON creo un nodo astratto inutile. 
            # Uso APOC per convertire l'oggetto della tripla in una vera Label sul nodo di partenza.
            query = (
                "UNWIND $batch AS row "
                "MERGE (s:Resource {uri: row.subj_uri}) "
                "ON CREATE SET s.label = row.subj_label, s.last_updated = datetime() "
                "WITH s, row "
                "SET s:$( [replace(row.obj_label, ':', '_')] ) "
                "RETURN count(node)"
            )
            tx.run(query, batch=batch_data)
            
        else:
            # Per tutte le altre relazioni semantiche classiche (es. si_trova_in, ha_autore)
            # eseguo un merge standard tra le due entità.
            query = (
                f"UNWIND $batch AS row "
                f"MERGE (s:Resource {{uri: row.subj_uri}}) "
                f"ON CREATE SET s.label = row.subj_label "
                f"MERGE (o:Resource {{uri: row.obj_uri}}) "
                f"ON CREATE SET o.label = row.obj_label "
                f"WITH s, o, row, "
                f"     CASE WHEN row.subj_type <> '' THEN [row.subj_type] ELSE [] END AS s_labels, "
                f"     CASE WHEN row.obj_type <> '' THEN [row.obj_type] ELSE [] END AS o_labels "
                f"SET s:$(s_labels), o:$(o_labels) "
                f"MERGE (s)-[r:`{predicate}`]->(o) "
                f"SET r.evidence = row.evidence, "
                f"    r.reasoning = row.reasoning, "
                f"    r.source = row.src, "
                f"    r.last_updated = datetime()"
            )
            tx.run(query, batch=batch_data)