GaetanoParente commited on
Commit
8c4201b
·
verified ·
1 Parent(s): a28147d

Update src/graph/graph_loader.py

Browse files
Files changed (1) hide show
  1. src/graph/graph_loader.py +76 -69
src/graph/graph_loader.py CHANGED
@@ -10,14 +10,18 @@ class KnowledgeGraphPersister:
10
  """
11
  Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza.
12
  """
13
- uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
14
  user = os.getenv("NEO4J_USER", "neo4j")
15
- password = os.getenv("NEO4J_PASSWORD", "activa_semantic_lab")
16
 
17
- try:
18
  self.driver = GraphDatabase.driver(uri, auth=(user, password))
19
  self.driver.verify_connectivity()
20
- print(f"✅ Connesso a Neo4j ({uri}) successfully.")
 
 
 
 
21
  except Exception as e:
22
  print(f"❌ Errore critico connessione Neo4j: {e}")
23
  self.driver = None
@@ -26,90 +30,93 @@ class KnowledgeGraphPersister:
26
  if self.driver:
27
  self.driver.close()
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  def sanitize_name(self, name):
30
  """
31
- Normalizza i nomi per creare URI coerenti (Canonicalization base).
32
  """
33
  if not name: return "Unknown"
34
- # Rimuove caratteri speciali e spazi extra, mantiene coerenza maiuscole/minuscole
 
35
  return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
36
 
 
 
 
 
 
 
 
 
37
  def save_triples(self, triples):
38
  """
39
- Salva le triple in BATCH (ottimizzazione performance).
40
- Usa UNWIND per processare liste di dati in un'unica transazione.
41
  """
42
- if not self.driver:
43
- print("⚠️ Driver non connesso. Impossibile salvare.")
44
  return
45
 
46
- if not triples:
47
- return
48
 
49
- print(f"💾 Salvataggio BATCH di {len(triples)} triple su Neo4j...")
 
 
50
 
51
- # 1. Prepariamo i dati come lista di dizionari (Payload leggero)
52
- batch_data = []
53
  for t in triples:
54
- batch_data.append({
 
 
55
  "subj_uri": self.sanitize_name(t.subject),
56
  "subj_label": t.subject,
57
- "pred": t.predicate, # Nota: Il predicato dinamico richiede attenzione in Cypher
58
  "obj_uri": self.sanitize_name(t.object),
59
  "obj_label": t.object,
60
- "conf": t.confidence,
61
- "src": t.source
62
- })
 
63
 
64
- # 2. Query Batch Ottimizzata
65
- # Nota: In Cypher non si può parametrizzare il TIPO di relazione (es. :RELAZIONE).
66
- # Per performance pura con relazioni dinamiche, usiamo APOC o un approccio ibrido.
67
- # Qui usiamo un approccio sicuro iterando nel driver ma con transazione unica,
68
- # oppure raggruppiamo per tipo di relazione.
69
-
70
- # Approccio Migliore per MVP: Transazione singola
71
  with self.driver.session() as session:
72
- try:
73
- session.execute_write(self._batch_write_tx, batch_data)
74
- print("✅ Batch completato.")
75
- except Exception as e:
76
- print(f"⚠️ Errore durante il salvataggio batch: {e}")
 
77
 
78
- @staticmethod
79
- def _batch_write_tx(tx, batch_data):
80
- """Funzione transazionale interna."""
81
- for item in batch_data:
82
- # Usiamo MERGE per evitare duplicati
83
- # Usiamo apoc.create.relationship se disponibile per predicati dinamici,
84
- # altrimenti usiamo string formatting controllata (safe perché interna).
85
-
86
- # Sanitizzazione predicato per evitare injection (solo caratteri sicuri)
87
- safe_pred = "".join(x for x in item['pred'] if x.isalnum() or x in "_:")
88
- if not safe_pred: safe_pred = "RELATED_TO"
89
-
90
- query = (
91
- f"MERGE (s:Resource {{uri: $subj_uri}}) "
92
- f"ON CREATE SET s.label = $subj_label "
93
- f"MERGE (o:Resource {{uri: $obj_uri}}) "
94
- f"ON CREATE SET o.label = $obj_label "
95
- f"MERGE (s)-[r:`{safe_pred}`]->(o) "
96
- f"SET r.confidence = $conf, r.source = $src"
97
- )
98
-
99
- tx.run(query, item)
100
 
101
- # --- TEST ISOLATO ---
102
- if __name__ == "__main__":
103
- # Creiamo un mock per testare senza dipendenze esterne
104
- from collections import namedtuple
105
- MockTriple = namedtuple("MockTriple", ["subject", "predicate", "object", "confidence", "source"])
106
-
107
- triples = [
108
- MockTriple("Batch Node 1", "TEST_BATCH", "Batch Node 2", 0.99, "test_doc_1"),
109
- MockTriple("Batch Node 2", "IS_RELATED_TO", "Batch Node 3", 0.85, "test_doc_1")
110
- ]
111
-
112
- # Assicurati di avere le variabili d'ambiente o fallback attivi
113
- persister = KnowledgeGraphPersister()
114
- persister.save_triples(triples)
115
- persister.close()
 
 
 
 
 
 
 
10
  """
11
  Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza.
12
  """
13
+ uri = os.getenv("NEO4J_URI", "")
14
  user = os.getenv("NEO4J_USER", "neo4j")
15
+ password = os.getenv("NEO4J_PASSWORD", "")
16
 
17
+ try:
18
  self.driver = GraphDatabase.driver(uri, auth=(user, password))
19
  self.driver.verify_connectivity()
20
+ print(f"✅ Connesso a Neo4j ({uri}).")
21
+
22
+ # Creazione indici all'avvio (Fondamentale per la velocità dei MERGE)
23
+ self._create_constraints()
24
+
25
  except Exception as e:
26
  print(f"❌ Errore critico connessione Neo4j: {e}")
27
  self.driver = None
 
30
  if self.driver:
31
  self.driver.close()
32
 
33
+ def _create_constraints(self):
34
+ """
35
+ Crea un vincolo di unicità sulla proprietà URI.
36
+ Senza questo, MERGE diventa lentissimo (Full Table Scan).
37
+ """
38
+ if not self.driver: return
39
+ query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
40
+ with self.driver.session() as session:
41
+ try:
42
+ session.run(query)
43
+ print("⚡ Vincoli/Indici Neo4j verificati.")
44
+ except Exception as e:
45
+ print(f"⚠️ Warning creazione indici: {e}")
46
+
47
  def sanitize_name(self, name):
48
  """
49
+ Canonicalization base.
50
  """
51
  if not name: return "Unknown"
52
+ # Rimuove spazi extra e normalizza.
53
+ # Nota: In produzione usare slugify o urllib.parse.quote per URI robusti
54
  return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
55
 
56
+ def sanitize_predicate(self, pred):
57
+ """
58
+ Pulisce il predicato per evitare Cypher Injection, dato che non può essere parametrizzato.
59
+ """
60
+ # Accetta solo caratteri alfanumerici e underscore. Upper case per convenzione Neo4j.
61
+ clean = "".join(x for x in pred if x.isalnum() or x == "_")
62
+ return clean.upper() if clean else "RELATED_TO"
63
+
64
  def save_triples(self, triples):
65
  """
66
+ Salva le triple usando VERO Batching (UNWIND).
67
+ Raggruppa le triple per predicato per aggirare il limite di parametrizzazione delle relazioni.
68
  """
69
+ if not self.driver or not triples:
 
70
  return
71
 
72
+ print(f"💾 Preparazione Batch di {len(triples)} triple...")
 
73
 
74
+ # 1. Raggruppamento per Predicato
75
+ # Struttura: { "LOCATED_IN": [ {subj:..., obj:..., ...}, ... ], "HAS_TYPE": [...] }
76
+ batched_by_pred = defaultdict(list)
77
 
 
 
78
  for t in triples:
79
+ safe_pred = self.sanitize_predicate(t.predicate)
80
+
81
+ item = {
82
  "subj_uri": self.sanitize_name(t.subject),
83
  "subj_label": t.subject,
 
84
  "obj_uri": self.sanitize_name(t.object),
85
  "obj_label": t.object,
86
+ "conf": float(t.confidence), # Assicura float nativo
87
+ "src": t.source or "unknown"
88
+ }
89
+ batched_by_pred[safe_pred].append(item)
90
 
91
+ # 2. Esecuzione Transazioni (Una per tipo di relazione)
 
 
 
 
 
 
92
  with self.driver.session() as session:
93
+ for pred, data_list in batched_by_pred.items():
94
+ try:
95
+ session.execute_write(self._unwind_write_tx, pred, data_list)
96
+ print(f" -> Inserite {len(data_list)} relazioni :{pred}")
97
+ except Exception as e:
98
+ print(f"⚠️ Errore batch per relazione :{pred} -> {e}")
99
 
100
+ print("✅ Salvataggio completato.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
 
102
+ @staticmethod
103
+ def _unwind_write_tx(tx, predicate, batch_data):
104
+ """
105
+ Usa UNWIND per inserire migliaia di righe in un colpo solo.
106
+ Molto più performante su rete cloud.
107
+ """
108
+ # La query è dinamica SOLO sul tipo di relazione (sanitizzato prima),
109
+ # tutto il resto passa come parametro lista ($batch).
110
+ query = (
111
+ f"UNWIND $batch AS row "
112
+ f"MERGE (s:Resource {{uri: row.subj_uri}}) "
113
+ f"ON CREATE SET s.label = row.subj_label "
114
+ f"MERGE (o:Resource {{uri: row.obj_uri}}) "
115
+ f"ON CREATE SET o.label = row.obj_label "
116
+ f"MERGE (s)-[r:`{predicate}`]->(o) "
117
+ f"SET r.confidence = row.conf, "
118
+ f" r.source = row.src, "
119
+ f" r.last_updated = datetime()"
120
+ )
121
+
122
+ tx.run(query, batch=batch_data)