File size: 5,163 Bytes
a968971 9fb3deb b70d82f 9cbbfac a968971 9fb3deb b70d82f a968971 b70d82f 2e93420 b70d82f 2e93420 b70d82f c1b1880 2e93420 b70d82f a968971 b70d82f a968971 c1b1880 a968971 2e93420 9fb3deb 2e93420 cc3f780 a968971 2e93420 a968971 cc3f780 2e93420 cc3f780 2e93420 cc3f780 a968971 c1b1880 2e93420 cc3f780 2e93420 a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | import os
import re
from pathlib import Path
from rdflib import Graph, Literal, RDF, Namespace
from rdflib.namespace import SKOS
from pyshacl import validate
class SemanticValidator:
    """Validate extracted entities and triples with SHACL shapes and OWL RL inference.

    On construction the ontology files found under ``ontology_dir`` are loaded
    into one in-memory graph that is handed to pySHACL as ``ont_graph``, and
    the SHACL shapes are loaded from ``shapes_file``. When the shapes file is
    missing, ``shacl_graph`` is ``None`` and validation is skipped.
    """

    # Predicate spellings (compared case-insensitively) treated as ``rdf:type``.
    _TYPE_PREDICATES = frozenset({"rdf:type", "a", "type", "rdf_type"})
    # Everything outside this set is dropped when minting a local name in ``ex``.
    _INVALID_LOCAL_NAME_CHARS = re.compile(r"[^a-zA-Z0-9_]")
    # Characters that make an absolute IRI unusable as an RDF term.
    _FORBIDDEN_IRI_CHARS = frozenset('<>"{}|\\^`')

    def __init__(self, ontology_dir="./ontology", shapes_file="./ontology/shapes/auto_constraints.ttl"):
        """Load the ontology graph and, if present, the SHACL shapes graph.

        Args:
            ontology_dir: Directory searched for ``arco/*.owl`` and
                ``cidoc-crm/cidoc-crm.owl``; both are parsed as RDF/XML.
                Missing paths are skipped silently.
            shapes_file: Turtle file holding the SHACL shapes.
        """
        self.shapes_file = shapes_file

        # Prefix -> namespace mapping used to expand prefixed names.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "a-cd": Namespace("https://w3id.org/arco/ontology/context-description/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "crm": Namespace("http://www.cidoc-crm.org/cidoc-crm/"),
            "ex": Namespace("http://activadigital.it/ontology/"),
        }

        print("🛡️ Inizializzazione Semantic Validator (OWL RL)...")

        # Load the whole ontology into memory so the reasoner can use it.
        self.ont_graph = Graph()
        arco_path = Path(ontology_dir) / "arco"
        if arco_path.exists():
            for owl_file in arco_path.glob("*.owl"):
                self.ont_graph.parse(str(owl_file), format="xml")

        cidoc_path = Path(ontology_dir) / "cidoc-crm" / "cidoc-crm.owl"
        if cidoc_path.exists():
            self.ont_graph.parse(str(cidoc_path), format="xml")

        print(f"✅ Ontologia completa caricata nel reasoner ({len(self.ont_graph)} triple).")

        if os.path.exists(self.shapes_file):
            self.shacl_graph = Graph()
            self.shacl_graph.parse(self.shapes_file, format="turtle")
            print("🛡️ SHACL Auto-Constraints caricati.")
        else:
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None

    def _get_uri(self, text_val):
        """Turn a textual term into an RDF URI.

        Resolution order:

        1. A well-formed absolute ``http://`` / ``https://`` IRI is returned
           unchanged as a ``URIRef``.
        2. ``prefix:name`` with a prefix known to ``self.namespaces`` is
           expanded in that namespace.
        3. Anything else is sanitised (spaces become ``_``, every character
           outside ``[a-zA-Z0-9_]`` is dropped) and placed in the ``ex``
           namespace; an empty result becomes ``UnknownEntity``.

        Args:
            text_val: Label, prefixed name or absolute IRI.

        Returns:
            The URI term for ``text_val``.
        """
        if text_val.startswith(("http://", "https://")) and not any(
            ch.isspace() or ch in self._FORBIDDEN_IRI_CHARS for ch in text_val
        ):
            # Absolute IRIs must keep their identity: sanitising them would
            # mint an unrelated ``ex:`` resource that no ontology class or
            # SHACL target can match. Imported here because the module-level
            # rdflib import does not include URIRef.
            from rdflib import URIRef

            return URIRef(text_val)

        if ":" in text_val and not text_val.startswith("http"):
            prefix, name = text_val.split(":", 1)
            if prefix in self.namespaces:
                return self.namespaces[prefix][name]

        # Unknown prefix or free-text label: mint a local name under ``ex``.
        clean_name = self._INVALID_LOCAL_NAME_CHARS.sub("", text_val.replace(" ", "_"))
        if not clean_name:
            clean_name = "UnknownEntity"
        return self.namespaces["ex"][clean_name]

    def _json_to_rdf(self, entities, triples):
        """Build an RDF graph from extracted entities and triples.

        Args:
            entities: Iterable of entities; a dict contributes its ``"label"``
                value, anything else is converted with ``str``. May be empty
                or ``None``.
            triples: Iterable of objects exposing ``subject``, ``predicate``
                and ``object`` string attributes. May be empty or ``None``.

        Returns:
            A graph with every known prefix (plus ``skos``) bound, one
            Italian-tagged ``skos:prefLabel`` per entity and per triple
            subject, and one statement per triple. Objects of non-type
            statements also receive a ``skos:prefLabel``.
        """
        g = Graph()
        for prefix, ns in self.namespaces.items():
            g.bind(prefix, ns)
        g.bind("skos", SKOS)

        for ent in entities or ():
            label = ent["label"] if isinstance(ent, dict) else str(ent)
            g.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for t in triples or ():
            subj_uri = self._get_uri(t.subject)
            g.add((subj_uri, SKOS.prefLabel, Literal(t.subject, lang="it")))
            obj_uri = self._get_uri(t.object)

            if t.predicate.lower() in self._TYPE_PREDICATES:
                # Class membership: the object is a class, so it gets no label.
                g.add((subj_uri, RDF.type, obj_uri))
            else:
                g.add((subj_uri, self._get_uri(t.predicate), obj_uri))
                g.add((obj_uri, SKOS.prefLabel, Literal(t.object, lang="it")))

        return g

    def _run_validation(self, data_graph):
        """Run pySHACL on ``data_graph`` with the loaded shapes, ontology and OWL RL inference."""
        return validate(
            data_graph,
            shacl_graph=self.shacl_graph,
            ont_graph=self.ont_graph,
            inference='owlrl'
        )

    def filter_valid_triples(self, entities, triples):
        """Run the blocking validation (SHACL with OWL RL inference).

        Returns the valid triples, meant to be saved to Neo4j, and the invalid
        ones, meant to be sent to Mongo.

        Args:
            entities: Entities accompanying the batch (see ``_json_to_rdf``).
            triples: Candidate triples to validate.

        Returns:
            A ``(valid_triples, invalid_triples, report)`` tuple. When no
            shapes are loaded or ``triples`` is empty, the input is returned
            unchanged with the report ``"No Validation"``. When the whole
            batch conforms the report is ``"All valid"``. Otherwise the report
            is the batch-level pySHACL text and each invalid entry is a dict
            with ``"triple"`` and ``"violation_report"`` keys.
        """
        if not self.shacl_graph or not triples:
            return triples, [], "No Validation"

        # 1. Validate the whole batch in one pass: the fast path.
        batch_graph = self._json_to_rdf(entities, triples)
        conforms, _report_graph, report_text = self._run_validation(batch_graph)
        if conforms:
            return triples, [], "All valid"

        print("⚠️ Rilevate violazioni SHACL nel blocco. Isolamento colpevoli...")

        # 2. The batch failed: re-validate each triple alone (together with all
        #    entities) to isolate the offenders.
        # NOTE(review): a violation that only appears when several triples are
        # combined is not attributed to any single triple here, so such a
        # batch can come back with every triple marked valid.
        valid_triples = []
        invalid_triples = []
        for t in triples:
            single_graph = self._json_to_rdf(entities, [t])
            t_conforms, _, t_report = self._run_validation(single_graph)
            if t_conforms:
                valid_triples.append(t)
            else:
                invalid_triples.append({
                    "triple": t.model_dump() if hasattr(t, 'model_dump') else t,
                    "violation_report": t_report
                })

        return valid_triples, invalid_triples, report_text