import os
import re
from pathlib import Path

from rdflib import Graph, Literal, RDF, Namespace, URIRef
from rdflib.namespace import SKOS
from pyshacl import validate


class SemanticValidator:
    """Validate extracted entities/triples against SHACL shapes.

    On construction the ontology files found under ``ontology_dir`` are
    parsed into a single in-memory graph, and the SHACL shapes file (if it
    exists) is parsed into a second graph.  ``filter_valid_triples`` then
    converts JSON-like entities/triples to RDF and runs ``pyshacl.validate``
    with ``inference='owlrl'`` against those two graphs.
    """

    # Predicate spellings (compared lower-cased) that are mapped to rdf:type.
    _TYPE_PREDICATES = frozenset({"rdf:type", "a", "type", "rdf_type"})

    # A complete http(s) IRI without whitespace or the characters rdflib
    # rejects in a URIRef.  Such values are kept verbatim.
    _ABSOLUTE_IRI_RE = re.compile(r'https?://[^\s<>"{}|\\^`]+')

    # Anything that is not a (Unicode) letter, digit or underscore.
    _NON_WORD_RE = re.compile(r"\W")

    def __init__(self, ontology_dir="./ontology", shapes_file="./ontology/shapes/auto_constraints.ttl"):
        """Load the ontology and the SHACL shapes into memory.

        Args:
            ontology_dir: Directory expected to contain an ``arco/`` folder
                with ``*.owl`` files and, optionally,
                ``cidoc-crm/cidoc-crm.owl``.  Missing paths are skipped.
            shapes_file: Turtle file holding the SHACL shapes.  When it does
                not exist, ``self.shacl_graph`` is ``None`` and validation
                is disabled.
        """
        self.shapes_file = shapes_file

        # Prefix -> namespace mapping used to expand CURIEs such as
        # "arco:Something"; "ex" is the fallback namespace for free text.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "a-cd": Namespace("https://w3id.org/arco/ontology/context-description/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "crm": Namespace("http://www.cidoc-crm.org/cidoc-crm/"),
            "ex": Namespace("http://activadigital.it/ontology/"),
        }

        print("🛡️ Inizializzazione Semantic Validator (OWL RL)...")

        # Bulk-load the ontology into memory; it is later passed to pyshacl
        # as ``ont_graph`` for the OWL RL inference step.
        self.ont_graph = Graph()

        arco_path = Path(ontology_dir) / "arco"
        if arco_path.exists():
            for owl_file in arco_path.glob("*.owl"):
                self.ont_graph.parse(str(owl_file), format="xml")

        cidoc_path = Path(ontology_dir) / "cidoc-crm" / "cidoc-crm.owl"
        if cidoc_path.exists():
            self.ont_graph.parse(str(cidoc_path), format="xml")

        print(f"✅ Ontologia completa caricata nel reasoner ({len(self.ont_graph)} triple).")

        if os.path.exists(self.shapes_file):
            self.shacl_graph = Graph()
            self.shacl_graph.parse(self.shapes_file, format="turtle")
            print("🛡️ SHACL Auto-Constraints caricati.")
        else:
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None

    def _get_uri(self, text_val):
        """Turn a textual identifier into an rdflib URI.

        Resolution order:

        1. A well-formed absolute ``http://`` / ``https://`` IRI is returned
           unchanged as a ``URIRef`` (previously it was stripped of its
           punctuation and re-minted in the ``ex`` namespace, so it could
           never match an ontology term).
        2. ``prefix:name`` with a prefix present in ``self.namespaces`` is
           expanded through that namespace.
        3. Anything else is sanitised (spaces -> ``_``, every other
           non-word character removed) and minted in the ``ex`` namespace;
           an empty result becomes ``UnknownEntity``.

        Args:
            text_val: The label, CURIE or IRI to convert.

        Returns:
            The ``URIRef`` for ``text_val``.
        """
        if self._ABSOLUTE_IRI_RE.fullmatch(text_val):
            return URIRef(text_val)

        if ":" in text_val and not text_val.startswith("http"):
            prefix, name = text_val.split(":", 1)
            if prefix in self.namespaces:
                return self.namespaces[prefix][name]

        # Unicode-aware clean-up: accented letters (e.g. "Città") are kept
        # so that distinct labels do not collapse onto the same URI.
        clean_name = self._NON_WORD_RE.sub("", text_val.replace(" ", "_"))
        if not clean_name:
            clean_name = "UnknownEntity"
        return self.namespaces["ex"][clean_name]

    def _json_to_rdf(self, entities, triples):
        """Build an RDF graph from JSON-like entities and triples.

        Args:
            entities: Iterable of dicts with a ``"label"`` key, or of any
                other objects (converted with ``str``).  May be empty/None.
            triples: Iterable of objects exposing ``subject``, ``predicate``
                and ``object`` string attributes.  May be empty/None.

        Returns:
            A ``Graph`` with all known prefixes bound, a ``skos:prefLabel``
            (language ``it``) for every entity, subject and non-type
            object, and one statement per input triple.
        """
        g = Graph()
        for prefix, ns in self.namespaces.items():
            g.bind(prefix, ns)
        g.bind("skos", SKOS)

        for ent in entities or []:
            label = ent["label"] if isinstance(ent, dict) else str(ent)
            g.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for t in triples or []:
            subj_uri = self._get_uri(t.subject)
            g.add((subj_uri, SKOS.prefLabel, Literal(t.subject, lang="it")))

            obj_uri = self._get_uri(t.object)
            if t.predicate.lower() in self._TYPE_PREDICATES:
                # Typing statement: the object is a class, so it gets no label.
                g.add((subj_uri, RDF.type, obj_uri))
            else:
                g.add((subj_uri, self._get_uri(t.predicate), obj_uri))
                g.add((obj_uri, SKOS.prefLabel, Literal(t.object, lang="it")))

        return g

    def filter_valid_triples(self, entities, triples):
        """Run the blocking SHACL validation (with OWL RL inference).

        The whole batch is validated once; only if it does not conform is
        each triple re-validated on its own (together with all entities) to
        isolate the offending ones.

        Args:
            entities: Entities passed through to ``_json_to_rdf``.
            triples: Triples to validate.

        Returns:
            A 3-tuple ``(valid_triples, invalid_triples, report)``:

            * ``valid_triples`` - the input triples that conform (per the
              original author's note, the ones to store in Neo4j);
            * ``invalid_triples`` - list of dicts with keys ``"triple"``
              (``model_dump()`` output when available, else the triple
              itself) and ``"violation_report"`` (intended for Mongo);
            * ``report`` - ``"No Validation"`` when validation is skipped,
              ``"All valid"`` when the batch conforms, otherwise the
              pyshacl report text of the batch run.
        """
        # Skipped when shapes are unavailable (shacl_graph is None) or there
        # is nothing to check.  NOTE(review): the truthiness test presumably
        # also skips an *empty* shapes graph - confirm this is intended.
        if not self.shacl_graph or not triples:
            return triples, [], "No Validation"

        # 1. Validate the whole batch in one pass for maximum speed.
        batch_graph = self._json_to_rdf(entities, triples)
        conforms, _, report_text = validate(
            batch_graph,
            shacl_graph=self.shacl_graph,
            ont_graph=self.ont_graph,
            inference='owlrl',
        )

        if conforms:
            return triples, [], "All valid"

        print("⚠️ Rilevate violazioni SHACL nel blocco. Isolamento colpevoli...")

        # 2. On failure, isolate the non-conforming triples one by one.
        # NOTE(review): a violation that only arises from a combination of
        # triples is not attributed to any single triple here, so all of
        # them may still be returned as valid - confirm this is acceptable.
        valid_triples = []
        invalid_triples = []

        for t in triples:
            single_graph = self._json_to_rdf(entities, [t])
            t_conforms, _, t_report = validate(
                single_graph,
                shacl_graph=self.shacl_graph,
                ont_graph=self.ont_graph,
                inference='owlrl',
            )

            if t_conforms:
                valid_triples.append(t)
            else:
                invalid_triples.append({
                    "triple": t.model_dump() if hasattr(t, 'model_dump') else t,
                    "violation_report": t_report,
                })

        return valid_triples, invalid_triples, report_text