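# Repository page header captured together with the file (not program code):
#   avatar caption: "GaetanoParente's picture"
#   commit message: "removed unused imports and hardened user-facing usage"
#   commit: 9cbbfac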
import os
import re
from pathlib import Path
from rdflib import Graph, Literal, RDF, Namespace
from rdflib.namespace import SKOS
from pyshacl import validate
class SemanticValidator:
    """Validate extracted entities and triples against SHACL shapes.

    On construction the ontology files found under ``ontology_dir`` are parsed
    into one rdflib graph (``self.ont_graph``) and, if the shapes file exists,
    the SHACL shapes are parsed into ``self.shacl_graph``.
    ``filter_valid_triples`` then splits a batch of triples into conforming
    and non-conforming ones, running pyshacl with ``inference='owlrl'``.
    """

    # Lower-cased predicate spellings that are mapped to ``rdf:type``.
    _TYPE_PREDICATES = ("rdf:type", "a", "type", "rdf_type")

    def __init__(self, ontology_dir="./ontology", shapes_file="./ontology/shapes/auto_constraints.ttl"):
        """Load the ontology graph and the SHACL shapes graph.

        Args:
            ontology_dir: Directory expected to contain an ``arco`` folder
                with ``*.owl`` files and, optionally,
                ``cidoc-crm/cidoc-crm.owl``. Paths that do not exist are
                skipped.
            shapes_file: Turtle file holding the SHACL shapes. When it does
                not exist, ``self.shacl_graph`` is set to ``None`` and
                ``filter_valid_triples`` performs no validation.
        """
        self.shapes_file = shapes_file

        # Namespace mapping: prefix -> rdflib Namespace.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "a-cd": Namespace("https://w3id.org/arco/ontology/context-description/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "crm": Namespace("http://www.cidoc-crm.org/cidoc-crm/"),
            "ex": Namespace("http://activadigital.it/ontology/"),
        }

        print("🛡️ Inizializzazione Semantic Validator (OWL RL)...")

        # Bulk-load the whole ontology in memory for the reasoner.
        self.ont_graph = Graph()
        ontology_root = Path(ontology_dir)

        arco_path = ontology_root / "arco"
        if arco_path.exists():
            for owl_file in arco_path.glob("*.owl"):
                self.ont_graph.parse(str(owl_file), format="xml")

        cidoc_path = ontology_root / "cidoc-crm" / "cidoc-crm.owl"
        if cidoc_path.exists():
            self.ont_graph.parse(str(cidoc_path), format="xml")

        print(f"✅ Ontologia completa caricata nel reasoner ({len(self.ont_graph)} triple).")

        if os.path.exists(self.shapes_file):
            self.shacl_graph = Graph()
            self.shacl_graph.parse(self.shapes_file, format="turtle")
            print("🛡️ SHACL Auto-Constraints caricati.")
        else:
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None

    @staticmethod
    def _as_text(value):
        """Return ``value`` as a string, mapping ``None`` to the empty string."""
        return "" if value is None else str(value)

    def _get_uri(self, text_val):
        """Map a label or a ``prefix:name`` value to a URI.

        A value of the form ``prefix:name`` whose prefix is a key of
        ``self.namespaces`` is resolved inside that namespace, with ``name``
        used verbatim. Any other value is minted in the ``ex`` namespace after
        replacing spaces with underscores and dropping every character outside
        ``[a-zA-Z0-9_]``; if nothing is left, ``UnknownEntity`` is used.

        Args:
            text_val: Label or prefixed name. ``None`` and non-string values
                are coerced to text instead of raising ``TypeError``.

        Returns:
            The result of indexing the selected namespace with the local name.
        """
        text = self._as_text(text_val)

        # Values starting with "http" are never split on ":"; they fall
        # through to the sanitising branch below.
        if ":" in text and not text.startswith("http"):
            prefix, name = text.split(":", 1)
            if prefix in self.namespaces:
                return self.namespaces[prefix][name]

        clean_name = re.sub(r'[^a-zA-Z0-9_]', '', text.replace(" ", "_"))
        if not clean_name:
            clean_name = "UnknownEntity"
        return self.namespaces["ex"][clean_name]

    def _json_to_rdf(self, entities, triples):
        """Build an rdflib graph from extracted entities and triples.

        Args:
            entities: Iterable of entities (or ``None``/empty). A dict
                contributes its ``"label"`` value; any other object
                contributes ``str(entity)``. Dicts without a ``"label"`` key
                (or with a ``None`` label) are skipped.
            triples: Iterable of objects exposing ``subject``, ``predicate``
                and ``object`` attributes (or ``None``/empty).

        Returns:
            A graph in which every entity, subject and non-type object carries
            an Italian-tagged ``skos:prefLabel``, and every triple is asserted
            either as ``rdf:type`` or with a predicate URI from ``_get_uri``.
        """
        g = Graph()
        for prefix, ns in self.namespaces.items():
            g.bind(prefix, ns)
        g.bind("skos", SKOS)

        for ent in entities or ():
            raw_label = ent.get("label") if isinstance(ent, dict) else str(ent)
            if raw_label is None:
                # No label to attach: skip instead of failing the whole batch.
                continue
            label = self._as_text(raw_label)
            g.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for t in triples or ():
            subject_text = self._as_text(t.subject)
            object_text = self._as_text(t.object)
            predicate_text = self._as_text(t.predicate)

            subj_uri = self._get_uri(subject_text)
            obj_uri = self._get_uri(object_text)
            g.add((subj_uri, SKOS.prefLabel, Literal(subject_text, lang="it")))

            if predicate_text.lower() in self._TYPE_PREDICATES:
                # Typing statements: the object is a class, so it gets no label.
                g.add((subj_uri, RDF.type, obj_uri))
            else:
                g.add((subj_uri, self._get_uri(predicate_text), obj_uri))
                g.add((obj_uri, SKOS.prefLabel, Literal(object_text, lang="it")))

        return g

    def _validate(self, data_graph):
        """Run pyshacl on ``data_graph`` with the loaded shapes and ontology.

        Returns:
            The ``(conforms, report_graph, report_text)`` tuple from pyshacl.
        """
        return validate(
            data_graph,
            shacl_graph=self.shacl_graph,
            ont_graph=self.ont_graph,
            inference='owlrl',
        )

    def filter_valid_triples(self, entities, triples):
        """Run the blocking validation (OWL RL) and split the triples.

        The whole batch is validated first. Only if it does not conform is
        each triple re-validated on its own graph (together with all entity
        labels) to find the offending ones. According to the original
        docstring, valid triples are meant to be stored in Neo4j and invalid
        ones sent to Mongo; that routing happens outside this class.

        NOTE(review): a violation that only arises from a combination of
        triples is not attributed to any single triple. In that case every
        triple is returned as valid, while the batch report is still returned
        as the third element.

        Args:
            entities: Entities passed through to ``_json_to_rdf``.
            triples: Triples to validate.

        Returns:
            A tuple ``(valid_triples, invalid_triples, message)``.
            ``invalid_triples`` is a list of dicts with keys ``"triple"``
            (``model_dump()`` output when available, otherwise the triple
            itself) and ``"violation_report"`` (pyshacl report text).
            ``message`` is ``"No Validation"`` when shapes or triples are
            missing, ``"All valid"`` when the batch conforms, otherwise the
            batch report text.
        """
        if not self.shacl_graph or not triples:
            return triples, [], "No Validation"

        # 1. Validate the whole batch in one go: the fast path.
        batch_graph = self._json_to_rdf(entities, triples)
        conforms, _report_graph, report_text = self._validate(batch_graph)
        if conforms:
            return triples, [], "All valid"

        print("⚠️ Rilevate violazioni SHACL nel blocco. Isolamento colpevoli...")

        # 2. On failure, isolate the non-conforming triples one by one.
        valid_triples = []
        invalid_triples = []
        for t in triples:
            single_graph = self._json_to_rdf(entities, [t])
            t_conforms, _, t_report = self._validate(single_graph)
            if t_conforms:
                valid_triples.append(t)
            else:
                invalid_triples.append({
                    "triple": t.model_dump() if hasattr(t, 'model_dump') else t,
                    "violation_report": t_report,
                })

        return valid_triples, invalid_triples, report_text