import os
import re
from pathlib import Path

from pyshacl import validate
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from rdflib.namespace import SKOS


class SemanticValidator:
    """Validate extracted entities and triples against SHACL shapes.

    On construction the validator loads the ontology files found under
    ``ontology_dir`` into one graph and, when present, the SHACL shapes file.
    Batches are converted to a temporary RDF graph and checked with
    ``pyshacl.validate`` using OWL RL inference.
    """

    # Predicate spellings treated as an ``rdf:type`` assertion
    # (compared after stripping and lower-casing).
    _TYPE_PREDICATES = frozenset({"rdf:type", "a", "type", "rdf_type"})

    def __init__(self, ontology_dir="./ontology", shapes_file="./ontology/shapes/auto_constraints.ttl"):
        """Load the ontology graph and the optional SHACL shapes graph.

        Args:
            ontology_dir: Directory expected to contain an ``arco`` folder with
                ``*.owl`` files and ``cidoc-crm/cidoc-crm.owl``. Missing
                paths are skipped.
            shapes_file: Path to the Turtle file with the SHACL shapes. When it
                does not exist, ``self.shacl_graph`` is ``None`` and
                ``filter_valid_triples`` performs no validation.
        """
        self.shapes_file = shapes_file

        # Prefix -> namespace table used to expand "prefix:name" values.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "a-cd": Namespace("https://w3id.org/arco/ontology/context-description/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "crm": Namespace("http://www.cidoc-crm.org/cidoc-crm/"),
            "ex": Namespace("http://activadigital.it/ontology/"),
        }

        print("🛡️ Inizializzazione Semantic Validator (OWL RL)...")

        # Single graph holding every ontology file that could be found.
        self.ont_graph = Graph()

        arco_path = Path(ontology_dir) / "arco"
        if arco_path.exists():
            for owl_file in arco_path.glob("*.owl"):
                self.ont_graph.parse(str(owl_file), format="xml")

        cidoc_path = Path(ontology_dir) / "cidoc-crm" / "cidoc-crm.owl"
        if cidoc_path.exists():
            self.ont_graph.parse(str(cidoc_path), format="xml")

        print(f"✅ Ontologia completa caricata nel reasoner ({len(self.ont_graph)} triple).")

        if os.path.exists(self.shapes_file):
            self.shacl_graph = Graph()
            self.shacl_graph.parse(self.shapes_file, format="turtle")
            print("🛡️ SHACL Auto-Constraints caricati.")
        else:
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None

    def _get_uri(self, text_val):
        """Turn a free-text or prefixed value into an RDF identifier.

        Resolution order:

        1. An absolute ``http://`` / ``https://`` IRI without whitespace is
           returned unchanged as a ``URIRef``.
        2. ``prefix:name`` with a prefix registered in ``self.namespaces`` and
           a non-empty name is expanded in that namespace.
        3. Anything else is sanitised (spaces become ``_``, every character
           outside ``[a-zA-Z0-9_]`` is dropped) and minted in the ``ex``
           namespace; an empty result becomes ``UnknownEntity``.

        Args:
            text_val: Value to convert. ``None`` is treated as empty text and
                non-string values are converted with ``str()``.

        Returns:
            The identifier produced by the matching rule above.
        """
        text = "" if text_val is None else str(text_val).strip()

        # Keep real IRIs intact instead of sanitising them into a meaningless
        # "ex:httpsw3idorg..." node.
        if text.startswith(("http://", "https://")) and not any(ch.isspace() for ch in text):
            return URIRef(text)

        if ":" in text:
            prefix, _, name = text.partition(":")
            prefix, name = prefix.strip(), name.strip()
            namespace = self.namespaces.get(prefix)
            # An empty local name ("arco:") would resolve to the namespace IRI
            # itself, so it is handled by the fallback below instead.
            if namespace is not None and name:
                return namespace[name]

        clean_name = re.sub(r"[^a-zA-Z0-9_]", "", text.replace(" ", "_"))
        return self.namespaces["ex"][clean_name or "UnknownEntity"]

    def _json_to_rdf(self, entities, triples):
        """Build a temporary RDF graph from extracted entities and triples.

        Args:
            entities: Iterable of entities (or ``None``). A ``dict`` must carry
                a ``"label"`` key; any other value is labelled with ``str()``.
            triples: Iterable of objects (or ``None``) exposing ``subject``,
                ``predicate`` and ``object`` attributes.

        Returns:
            A ``Graph`` with the known prefixes bound, an Italian-tagged
            ``skos:prefLabel`` for every entity and subject, and one statement
            per triple. Objects of non-type statements are labelled as well.
        """
        g = Graph()
        for prefix, ns in self.namespaces.items():
            g.bind(prefix, ns)
        g.bind("skos", SKOS)

        for ent in entities or ():
            label = ent["label"] if isinstance(ent, dict) else str(ent)
            g.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for t in triples or ():
            subj_uri = self._get_uri(t.subject)
            obj_uri = self._get_uri(t.object)
            g.add((subj_uri, SKOS.prefLabel, Literal(t.subject, lang="it")))

            if t.predicate.strip().lower() in self._TYPE_PREDICATES:
                # The object is a class here, so it gets no label of its own.
                g.add((subj_uri, RDF.type, obj_uri))
            else:
                g.add((subj_uri, self._get_uri(t.predicate), obj_uri))
                g.add((obj_uri, SKOS.prefLabel, Literal(t.object, lang="it")))
        return g

    def _run_shacl(self, data_graph):
        """Validate ``data_graph`` and return ``(conforms, report_text)``."""
        conforms, _report_graph, report_text = validate(
            data_graph,
            shacl_graph=self.shacl_graph,
            ont_graph=self.ont_graph,
            inference="owlrl",
        )
        return conforms, report_text

    def filter_valid_triples(self, entities, triples):
        """Split ``triples`` into SHACL-conforming and violating ones.

        The whole batch is validated first. Only when it does not conform is
        each triple validated again on its own (together with all entities) to
        isolate the offending ones.

        Args:
            entities: Entities accepted by ``_json_to_rdf``.
            triples: Triples accepted by ``_json_to_rdf``.

        Returns:
            A tuple ``(valid_triples, invalid_triples, report)``:

            * ``valid_triples`` - triples that conform; the input ``triples``
              object itself when nothing was validated or the batch conforms.
            * ``invalid_triples`` - list of dicts with keys ``"triple"`` (the
              ``model_dump()`` of the triple when available, else the triple)
              and ``"violation_report"`` (report text for that triple).
            * ``report`` - ``"No Validation"``, ``"All valid"`` or the report
              text of the failed batch validation.
        """
        # Nothing to do without shapes or without triples.
        if not self.shacl_graph or not triples:
            return triples, [], "No Validation"

        conforms, report_text = self._run_shacl(self._json_to_rdf(entities, triples))
        if conforms:
            return triples, [], "All valid"

        print("⚠️ Rilevate violazioni SHACL nel blocco. Isolamento colpevoli...")

        valid_triples = []
        invalid_triples = []

        # NOTE(review): a violation that only appears when several triples are
        # combined is not attributed to any triple here; in that case every
        # triple is returned as valid together with the batch report.
        for t in triples:
            t_conforms, t_report = self._run_shacl(self._json_to_rdf(entities, [t]))
            if t_conforms:
                valid_triples.append(t)
            else:
                invalid_triples.append({
                    "triple": t.model_dump() if hasattr(t, "model_dump") else t,
                    "violation_report": t_report,
                })

        return valid_triples, invalid_triples, report_text