"""Build a JSON domain index and auto-generated SHACL shapes from OWL files.

The script loads every ``*.owl`` file found under a directory into a single
rdflib graph, extracts named classes and object/datatype properties, and
writes:

* a JSON index (classes, properties grouped by domain class including
  inherited ones, and label/description texts for embedding), and
* a Turtle file with SHACL node shapes derived from single-valued
  ``rdfs:domain`` / ``rdfs:range`` declarations.
"""

import os
import json
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Optional

from rdflib import Graph, URIRef, BNode, RDF, RDFS, OWL

# --- NAMESPACE MAP ---
# Namespace IRI -> prefix used when abbreviating IRIs to prefixed names.
ARCO_NAMESPACES = {
    "https://w3id.org/arco/ontology/arco/": "arco",
    "https://w3id.org/arco/ontology/core/": "core",
    "https://w3id.org/arco/ontology/location/": "a-loc",
    "https://w3id.org/arco/ontology/context-description/": "a-cd",
    "https://w3id.org/arco/ontology/denotative-description/": "a-dd",
    "https://w3id.org/arco/ontology/cultural-event/": "a-ce",
    "https://w3id.org/arco/ontology/catalogue/": "a-cat",
    "http://dati.beniculturali.it/cis/": "cis",
    "https://w3id.org/italia/onto/l0/": "l0",
    "https://w3id.org/italia/onto/CLV/": "clv",
    "https://w3id.org/italia/onto/TI/": "ti",
    "https://w3id.org/italia/onto/RO/": "ro",
    "https://w3id.org/italia/onto/SM/": "sm",
    "https://w3id.org/italia/onto/MU/": "mu",
    "http://www.cidoc-crm.org/cidoc-crm/": "crm",
    "http://www.w3.org/2002/07/owl#": "owl",
    "http://www.w3.org/2000/01/rdf-schema#": "rdfs",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#": "rdf",
    "http://www.w3.org/2001/XMLSchema#": "xsd",
    "http://www.w3.org/2004/02/skos/core#": "skos"
}

# Standard SHACL vocabulary namespace.
SHACL_NAMESPACE = "http://www.w3.org/ns/shacl#"

# Namespace for the generated shape identifiers (the ``ex:`` prefix).
# NOTE(review): placeholder value -- the original namespace IRI was missing
# from the emitted ``@prefix ex:`` line; replace with the project's own IRI.
SHAPES_NAMESPACE = "http://example.org/shapes#"

# Language tag preferred when several labels/comments are available.
PREFERRED_LANGUAGE = "it"


def uri_to_qname(uri: URIRef) -> Optional[str]:
    """Abbreviate *uri* to a ``prefix:local`` name using ARCO_NAMESPACES.

    The longest matching namespace wins. IRIs outside every known namespace
    are reduced to the text after the last ``#`` (or, failing that, after the
    last ``/``), without a prefix.

    Returns:
        The abbreviated name, or ``None`` for a falsy value or a blank node.
    """
    if not uri or isinstance(uri, BNode):
        return None

    uri_str = str(uri)
    best_match = max(
        (ns for ns in ARCO_NAMESPACES if uri_str.startswith(ns)),
        key=len,
        default="",
    )
    if best_match:
        local_name = uri_str[len(best_match):].lstrip('#')
        return f"{ARCO_NAMESPACES[best_match]}:{local_name}"

    if '#' in uri_str:
        return uri_str.split('#')[-1]
    return uri_str.split('/')[-1]


def get_union_classes(g: Graph, bnode: BNode) -> List[str]:
    """Return the named members of the ``owl:unionOf`` list attached to *bnode*.

    Walks the RDF collection via ``rdf:first`` / ``rdf:rest``. Members that
    are not IRIs (e.g. nested anonymous class expressions) are skipped.

    Returns:
        Abbreviated names of the union members; empty if *bnode* carries no
        ``owl:unionOf``.
    """
    classes = []
    visited = set()
    current = g.value(bnode, OWL.unionOf)
    # ``visited`` stops the walk on a malformed, cyclic rdf:rest chain.
    while current and current != RDF.nil and current not in visited:
        visited.add(current)
        item = g.value(current, RDF.first)
        if isinstance(item, URIRef):
            classes.append(uri_to_qname(item))
        current = g.value(current, RDF.rest)
    return [c for c in classes if c]


def _preferred_text(g: Graph, subject, predicate, fallback: str) -> str:
    """Return the *predicate* text of *subject*, preferring PREFERRED_LANGUAGE.

    Falls back to any available value, then to *fallback*.
    """
    any_value = g.value(subject, predicate)
    text = str(any_value) if any_value else fallback
    for candidate in g.objects(subject, predicate):
        # getattr: the object may be an IRI/blank node with no language.
        if getattr(candidate, "language", None) == PREFERRED_LANGUAGE:
            text = str(candidate)
    return text


def _resolve_class_refs(g: Graph, node) -> List[Optional[str]]:
    """Turn a domain/range node into a list of abbreviated class names.

    A named node yields a single entry; a blank node is expanded through
    its ``owl:unionOf`` list; anything else (including ``None``) yields [].
    """
    if isinstance(node, URIRef):
        return [uri_to_qname(node)]
    if isinstance(node, BNode):
        return get_union_classes(g, node)
    return []


def _load_ontologies(ontology_dir: str) -> Graph:
    """Parse every ``*.owl`` file under *ontology_dir* (recursively) as RDF/XML.

    Files that fail to parse are reported and skipped so that one broken
    ontology does not abort the whole build.
    """
    g = Graph()
    for file_path in sorted(Path(ontology_dir).glob('**/*.owl')):
        try:
            g.parse(file_path, format="xml")
            print(f" -> Caricato: {file_path.name}")
        except Exception as e:  # deliberate best-effort: report and go on
            print(f" ⚠️ Errore parsing {file_path.name}: {e}")
    return g


def _index_classes(g: Graph) -> Dict[str, dict]:
    """Collect label, description, named parents and prefix of each named class."""
    classes_dict = {}
    for s in g.subjects(RDF.type, OWL.Class):
        if isinstance(s, BNode):
            continue
        qname = uri_to_qname(s)
        if not qname:
            # Nothing usable to key the class on.
            continue

        parents = [
            uri_to_qname(p)
            for p in g.objects(s, RDFS.subClassOf)
            if isinstance(p, URIRef)
        ]
        classes_dict[qname] = {
            "label": _preferred_text(g, s, RDFS.label, qname),
            "description": _preferred_text(g, s, RDFS.comment, ""),
            "parents": parents,
            "namespace": qname.split(":")[0] if ":" in qname else "unknown"
        }
    return classes_dict


def _index_properties(g: Graph) -> List[dict]:
    """Collect id, label, domains and ranges of object and datatype properties."""
    properties_list = []
    seen_ids = set()
    for prop_type in [OWL.ObjectProperty, OWL.DatatypeProperty]:
        for s in g.subjects(RDF.type, prop_type):
            if isinstance(s, BNode):
                continue
            qname = uri_to_qname(s)
            # A property declared with both types must be indexed only once.
            if not qname or qname in seen_ids:
                continue
            seen_ids.add(qname)

            properties_list.append({
                "id": qname,
                # Same language preference as for class labels.
                "label": _preferred_text(g, s, RDFS.label, qname),
                "domains": _resolve_class_refs(g, g.value(s, RDFS.domain)),
                "ranges": _resolve_class_refs(g, g.value(s, RDFS.range))
            })
    return properties_list


def _collect_properties(class_qname, classes_dict, properties_by_domain,
                        visited=None) -> List[dict]:
    """Return the properties of *class_qname* plus those of its ancestors.

    Direct properties come first; an inherited property is added only if no
    property with the same id is already present. *visited* guards against
    cycles and repeated ancestors in the subclass hierarchy.
    """
    if visited is None:
        visited = set()
    if class_qname in visited:
        return []
    visited.add(class_qname)

    props = list(properties_by_domain.get(class_qname, []))
    known_ids = {p["id"] for p in props}
    for parent in classes_dict.get(class_qname, {}).get("parents", []):
        for p in _collect_properties(parent, classes_dict,
                                     properties_by_domain, visited):
            if p["id"] not in known_ids:
                known_ids.add(p["id"])
                props.append(p)
    return props


def _ensure_parent_dir(file_path: str) -> None:
    """Create the directory that will contain *file_path*, if it has one."""
    parent = os.path.dirname(file_path)
    # dirname is "" for a bare filename; os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _write_shacl(properties_list: List[dict], output_shacl: str) -> int:
    """Write domain/range node shapes to *output_shacl*; return the shape count.

    A shape is emitted only when the property has exactly one domain (or
    range) and both the property and the class are prefixed names, since
    unprefixed names cannot be written as valid Turtle terms here.
    """
    lines = [
        f"@prefix sh: <{SHACL_NAMESPACE}> .\n",
        f"@prefix ex: <{SHAPES_NAMESPACE}> .\n",
    ]
    lines.extend(
        f"@prefix {prefix}: <{ns_uri}> .\n"
        for ns_uri, prefix in ARCO_NAMESPACES.items()
    )
    lines.append("\n")

    shape_count = 0
    for prop in properties_list:
        prop_id = prop["id"]
        if not prop_id or ":" not in prop_id:
            continue
        safe_id = prop_id.replace(":", "_").replace("-", "_")

        if len(prop["domains"]) == 1:
            dom = prop["domains"][0]
            if dom and ":" in dom:
                lines.append(f"ex:{safe_id}_DomainShape a sh:NodeShape ;\n")
                lines.append(f" sh:targetSubjectsOf {prop_id} ;\n")
                lines.append(f" sh:class {dom} .\n\n")
                shape_count += 1

        if len(prop["ranges"]) == 1:
            rng = prop["ranges"][0]
            if rng and ":" in rng:
                lines.append(f"ex:{safe_id}_RangeShape a sh:NodeShape ;\n")
                lines.append(f" sh:targetObjectsOf {prop_id} ;\n")
                if rng == "rdfs:Literal":
                    # rdfs:Literal is the class of all literals, never the
                    # datatype of one, so sh:datatype could not be satisfied.
                    lines.append(" sh:nodeKind sh:Literal .\n\n")
                elif rng.startswith("xsd:"):
                    lines.append(f" sh:datatype {rng} .\n\n")
                else:
                    lines.append(f" sh:class {rng} .\n\n")
                shape_count += 1

    _ensure_parent_dir(output_shacl)
    with open(output_shacl, 'w', encoding='utf-8') as f:
        f.writelines(lines)
    return shape_count


def build_domain_index_and_shacl(ontology_dir: str, output_json: str,
                                 output_shacl: str):
    """Build the domain index JSON and the SHACL constraints file.

    Args:
        ontology_dir: Directory searched recursively for ``*.owl`` files
            (parsed as RDF/XML).
        output_json: Destination of the JSON index with the keys
            ``classes``, ``properties_by_domain`` and
            ``class_embeddings_texts``.
        output_shacl: Destination of the generated Turtle shapes file.

    Missing parent directories of both output paths are created.
    """
    print(f"⏳ Inizializzazione Graph e caricamento da {ontology_dir}...")
    g = _load_ontologies(ontology_dir)
    print("✅ Ontologie caricate in memoria. Compilazione indici in corso...")

    classes_dict = _index_classes(g)
    properties_list = _index_properties(g)

    # Group properties under each class named in their domain.
    properties_by_domain = defaultdict(list)
    for prop in properties_list:
        for d in prop["domains"]:
            properties_by_domain[d].append({
                "id": prop["id"],
                "label": prop["label"],
                # NOTE(review): a multi-class union range is reported by its
                # first member, while "Mixed/Union" marks a property with no
                # resolvable range -- confirm this is the intended meaning.
                "range": prop["ranges"][0] if prop["ranges"] else "Mixed/Union",
                "inherited_from": d
            })

    # Only classes that end up with at least one property are listed.
    final_properties_by_domain = {}
    for cls in classes_dict:
        all_props = _collect_properties(cls, classes_dict, properties_by_domain)
        if all_props:
            final_properties_by_domain[cls] = all_props

    # Classes without a description are left out of the embedding texts.
    class_embeddings_texts = {
        k: f"{v['label']} - {v['description']}"
        for k, v in classes_dict.items()
        if v['description']
    }

    domain_index = {
        "classes": classes_dict,
        "properties_by_domain": final_properties_by_domain,
        "class_embeddings_texts": class_embeddings_texts
    }

    _ensure_parent_dir(output_json)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(domain_index, f, ensure_ascii=False, indent=2)
    print(f"💾 Salvato Indice di Dominio in: {output_json}")

    shape_count = _write_shacl(properties_list, output_shacl)
    print(f"🛡️ Generato SHACL auto_constraints.ttl con {shape_count} regole rigorose in: {output_shacl}")


if __name__ == "__main__":
    ONTOLOGY_FOLDER = "./ontology/"
    OUTPUT_JSON = "./ontology/domain_index.json"
    OUTPUT_SHACL = "./ontology/shapes/auto_constraints.ttl"
    build_domain_index_and_shacl(ONTOLOGY_FOLDER, OUTPUT_JSON, OUTPUT_SHACL)