File size: 8,116 Bytes
a968971 c4f394e a968971 c1b1880 a968971 c1b1880 cc3f780 a968971 ed88e58 a968971 8c4201b d2bdec9 c1b1880 8c4201b d2bdec9 a968971 c1b1880 a968971 8c4201b c1b1880 8c4201b c1b1880 cc3f780 8c4201b cc3f780 8c4201b cc3f780 8c4201b a968971 c1b1880 a968971 8c4201b c1b1880 d2bdec9 8c4201b d2bdec9 c1b1880 8c4201b fe271ee 8c4201b a968971 8c4201b a968971 8c4201b a968971 8c4201b a968971 9fb3deb a968971 9fb3deb 8c4201b a968971 8c4201b a968971 8c4201b a968971 cc3f780 c1b1880 cc3f780 c1b1880 cc3f780 8c4201b c1b1880 9fb3deb c1b1880 9fb3deb c1b1880 9fb3deb c1b1880 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | import os
from collections import defaultdict
from neo4j import GraphDatabase
from dotenv import load_dotenv
# Carico le env vars. Su HF Spaces pesca in automatico dai secrets.
load_dotenv()
class KnowledgeGraphPersister:
def __init__(self):
# Setup della connessione a Neo4j.
uri = os.getenv("NEO4J_URI")
user = os.getenv("NEO4J_USER")
password = os.getenv("NEO4J_PASSWORD")
try:
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
print(f"✅ Connesso a Neo4j ({uri}).")
# Chiamo subito la creazione degli indici. Se partiamo a fare ingestion massiva
# senza constraint, il DB collassa al primo blocco di MERGE.
self._create_constraints()
except Exception as e:
print(f"❌ Errore critico connessione Neo4j: {e}")
self.driver = None
def close(self):
# Chiudo pulito il driver (chiamato nel lifecycle shutdown dell'API)
if self.driver:
self.driver.close()
def _create_constraints(self):
if not self.driver: return
# Senza questo vincolo UNIQUE, l'istruzione MERGE fa un Full Table Scan ogni volta.
# Fondamentale per mantenere le transazioni < 10ms anche con migliaia di nodi.
query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
# Indice vettoriale nativo per le ricerche di similarità (dimensionato a 384 per matchare all-MiniLM)
query_vector = """
CREATE VECTOR INDEX entity_embeddings IF NOT EXISTS
FOR (n:Resource) ON (n.embedding)
OPTIONS {indexConfig: {
`vector.dimensions`: 384,
`vector.similarity_function`: 'cosine'
}}
"""
with self.driver.session() as session:
try:
session.run(query)
print("⚡ Vincolo di unicità verificato.")
except Exception as e:
print(f"⚠️ Warning vincolo unicità: {e}")
try:
session.run(query_vector)
print("⚡ Vector Index verificato.")
except Exception as e:
print(f"⚠️ Warning vector index: {e}")
def sanitize_name(self, name):
# Canonicalizzazione molto base: sostituisco spazi inutili e tolgo gli apici che spaccano le query Cypher.
if not name: return "Unknown"
return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
def sanitize_predicate(self, pred):
# Cruciale per evitare Cypher Injection. In Cypher NON si può parametrizzare
# il tipo di relazione in un MERGE (es. non puoi fare -[r:$pred]-). Devo per forza
# iniettarlo nella stringa della query, quindi lo normalizzo in modo drastico.
if not pred: return "RELATED_TO"
pred = pred.replace(":", "_").replace("-", "_").replace(" ", "_")
clean = "".join(x for x in pred if x.isalnum() or x == "_")
# Convenzione Neo4j: le relationships sono sempre in UPPERCASE
return clean.upper() if clean else "RELATED_TO"
def save_triples(self, triples):
if not self.driver or not triples:
return
print(f"💾 Preparazione Batch di {len(triples)} triple...")
batched_by_pred = defaultdict(list)
for t in triples:
safe_pred = self.sanitize_predicate(t.predicate)
item = {
"subj_uri": self.sanitize_name(t.subject),
"subj_label": t.subject,
"subj_type": getattr(t, 'subject_type', '').replace(":", "_").replace("-", "_"),
"obj_uri": self.sanitize_name(t.object),
"obj_label": t.object,
"obj_type": getattr(t, 'object_type', '').replace(":", "_").replace("-", "_"),
"evidence": getattr(t, 'evidence', 'N/A'),
"reasoning": getattr(t, 'reasoning', 'N/A'),
"src": getattr(t, 'source', 'unknown') or 'unknown'
}
batched_by_pred[safe_pred].append(item)
with self.driver.session() as session:
for pred, data_list in batched_by_pred.items():
try:
session.execute_write(self._unwind_write_tx, pred, data_list)
print(f" -> Inserite {len(data_list)} relazioni :{pred}")
except Exception as e:
print(f"⚠️ Errore batch per relazione :{pred} -> {e}")
print("✅ Salvataggio completato.")
def save_entities_and_triples(self, entities_to_save, triples):
if not self.driver: return
# Ingestion a 2 step: prima butto dentro i nodi isolati con tutti i loro payload
# (embedding vettoriali e link a Wikidata), poi in un secondo momento ci aggancio sopra le relazioni.
if entities_to_save:
print(f"💾 Salvataggio di {len(entities_to_save)} nodi singoli con vettori...")
node_batch = []
for item in entities_to_save:
item["uri"] = self.sanitize_name(item["label"])
node_batch.append(item)
with self.driver.session() as session:
session.execute_write(self._unwind_write_nodes, node_batch)
if triples:
self.save_triples(triples)
@staticmethod
def _unwind_write_nodes(tx, batch_data):
# L'UNWIND è l'unico modo per fare VERO batching massivo in Neo4j senza distruggere la RAM.
# Passo un intero array JSON ($batch) e Cypher lo "srotola" inserendo migliaia di nodi al volo.
query = (
"UNWIND $batch AS row "
"MERGE (n:Resource {uri: row.uri}) "
"ON CREATE SET n.label = row.label, "
" n.embedding = row.embedding, "
" n.wikidata_sameAs = row.wikidata_sameAs, "
" n.last_updated = datetime() "
)
tx.run(query, batch=batch_data)
@staticmethod
def _unwind_write_tx(tx, predicate, batch_data):
# Qui avviene la vera traduzione dal mondo RDF a quello Labeled Property Graph (LPG).
if predicate in ["RDF_TYPE", "TYPE", "A", "CORE_HASTYPE"]:
# Se l'LLM ha generato una tripla di classificazione ontologica, NON creo un nodo astratto inutile.
# Uso APOC per convertire l'oggetto della tripla in una vera Label sul nodo di partenza.
query = (
"UNWIND $batch AS row "
"MERGE (s:Resource {uri: row.subj_uri}) "
"ON CREATE SET s.label = row.subj_label, s.last_updated = datetime() "
"WITH s, row "
"SET s:$( [replace(row.obj_label, ':', '_')] ) "
"RETURN count(node)"
)
tx.run(query, batch=batch_data)
else:
# Per tutte le altre relazioni semantiche classiche (es. si_trova_in, ha_autore)
# eseguo un merge standard tra le due entità.
query = (
f"UNWIND $batch AS row "
f"MERGE (s:Resource {{uri: row.subj_uri}}) "
f"ON CREATE SET s.label = row.subj_label "
f"MERGE (o:Resource {{uri: row.obj_uri}}) "
f"ON CREATE SET o.label = row.obj_label "
f"WITH s, o, row, "
f" CASE WHEN row.subj_type <> '' THEN [row.subj_type] ELSE [] END AS s_labels, "
f" CASE WHEN row.obj_type <> '' THEN [row.obj_type] ELSE [] END AS o_labels "
f"SET s:$(s_labels), o:$(o_labels) "
f"MERGE (s)-[r:`{predicate}`]->(o) "
f"SET r.evidence = row.evidence, "
f" r.reasoning = row.reasoning, "
f" r.source = row.src, "
f" r.last_updated = datetime()"
)
tx.run(query, batch=batch_data) |