| import os |
| import json |
| from pathlib import Path |
| from collections import defaultdict |
| from rdflib import Graph, URIRef, BNode, RDF, RDFS, OWL |
|
|
| |
# Namespace IRI -> prefix. Used by uri_to_qname() to shorten IRIs and by the
# SHACL writer to emit the "@prefix" header. Insertion order is the order in
# which the prefixes are written out, so keep it stable.
ARCO_NAMESPACES = {
    # ArCo ontology modules
    "https://w3id.org/arco/ontology/arco/": "arco",
    "https://w3id.org/arco/ontology/core/": "core",
    "https://w3id.org/arco/ontology/location/": "a-loc",
    "https://w3id.org/arco/ontology/context-description/": "a-cd",
    "https://w3id.org/arco/ontology/denotative-description/": "a-dd",
    "https://w3id.org/arco/ontology/cultural-event/": "a-ce",
    "https://w3id.org/arco/ontology/catalogue/": "a-cat",
    "http://dati.beniculturali.it/cis/": "cis",
    # w3id.org/italia/onto vocabularies
    "https://w3id.org/italia/onto/l0/": "l0",
    "https://w3id.org/italia/onto/CLV/": "clv",
    "https://w3id.org/italia/onto/TI/": "ti",
    "https://w3id.org/italia/onto/RO/": "ro",
    "https://w3id.org/italia/onto/SM/": "sm",
    "https://w3id.org/italia/onto/MU/": "mu",
    # CIDOC-CRM and W3C vocabularies
    "http://www.cidoc-crm.org/cidoc-crm/": "crm",
    "http://www.w3.org/2002/07/owl#": "owl",
    "http://www.w3.org/2000/01/rdf-schema#": "rdfs",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#": "rdf",
    "http://www.w3.org/2001/XMLSchema#": "xsd",
    "http://www.w3.org/2004/02/skos/core#": "skos",
}
|
|
def uri_to_qname(uri: URIRef) -> str:
    """Shorten *uri* to a ``prefix:local`` name using ``ARCO_NAMESPACES``.

    Returns ``None`` (despite the ``str`` annotation) when *uri* is falsy or
    is a blank node.

    When several registered namespaces are a prefix of the IRI, the longest
    one is used, and any leading ``#`` characters are stripped from the local
    part. When no registered namespace matches, the bare fragment after the
    last ``#`` is returned, or, if there is no ``#``, the text after the
    last ``/``.
    """
    if isinstance(uri, BNode) or not uri:
        return None

    text = str(uri)

    # Longest registered namespace that prefixes the IRI ("" when none does).
    namespace = max(
        (candidate for candidate in ARCO_NAMESPACES if text.startswith(candidate)),
        key=len,
        default="",
    )
    if namespace:
        local_name = text[len(namespace):].lstrip('#')
        return f"{ARCO_NAMESPACES[namespace]}:{local_name}"

    # Unknown namespace: fall back to the un-prefixed local name.
    separator = '#' if '#' in text else '/'
    return text.split(separator)[-1]
|
|
def get_union_classes(g: Graph, bnode: BNode) -> list:
    """Return the qnames of the named classes listed in an ``owl:unionOf``.

    Args:
        g: Graph in which *bnode* is described.
        bnode: Node expected to carry an ``owl:unionOf`` pointing at an RDF
            list (``rdf:first`` / ``rdf:rest`` chain).

    Returns:
        The qnames (as produced by ``uri_to_qname``) of every list member
        that is a ``URIRef``, in list order. Members that are not ``URIRef``
        (e.g. nested anonymous class expressions) are skipped, as are members
        for which ``uri_to_qname`` yields nothing. An empty list is returned
        when *bnode* has no ``owl:unionOf``.
    """
    qnames = []
    # Remember visited list cells so that a malformed, circular rdf:rest
    # chain terminates instead of looping forever.
    seen_cells = set()

    cell = g.value(bnode, OWL.unionOf)
    while cell and cell != RDF.nil and cell not in seen_cells:
        seen_cells.add(cell)
        member = g.value(cell, RDF.first)
        if isinstance(member, URIRef):
            qname = uri_to_qname(member)
            if qname:
                qnames.append(qname)
        cell = g.value(cell, RDF.rest)

    return qnames
|
|
def _load_ontology_graph(ontology_dir):
    """Parse every ``*.owl`` file found (recursively) under *ontology_dir* into one graph."""
    g = Graph()
    # Sorted so files are loaded and reported in a stable order.
    for file_path in sorted(Path(ontology_dir).glob('**/*.owl')):
        try:
            g.parse(file_path, format="xml")
            print(f" -> Caricato: {file_path.name}")
        except Exception as e:
            # Deliberate best effort: one unreadable file is reported and
            # skipped so the remaining ontologies are still indexed.
            print(f" ⚠️ Errore parsing {file_path.name}: {e}")
    return g


def _italian_or_default(g, subject, predicate, fallback):
    """Return the Italian value of *predicate* if present, else any value, else *fallback*."""
    value = g.value(subject, predicate)
    text = str(value) if value else fallback
    for candidate in g.objects(subject, predicate):
        # getattr: an object that is not a literal has no ``language``.
        if getattr(candidate, "language", None) == 'it':
            text = str(candidate)
    return text


def _index_classes(g):
    """Build ``{qname: {label, description, parents, namespace}}`` for every named owl:Class."""
    classes = {}
    for s in g.subjects(RDF.type, OWL.Class):
        if isinstance(s, BNode):
            continue
        qname = uri_to_qname(s)
        if not qname:
            # No usable identifier; it could not serve as an index key.
            continue
        classes[qname] = {
            "label": _italian_or_default(g, s, RDFS.label, qname),
            "description": _italian_or_default(g, s, RDFS.comment, ""),
            # Only named superclasses; anonymous restrictions are ignored.
            "parents": [
                uri_to_qname(p)
                for p in g.objects(s, RDFS.subClassOf)
                if isinstance(p, URIRef)
            ],
            "namespace": qname.split(":")[0] if ":" in qname else "unknown",
        }
    return classes


def _class_refs(g, node):
    """Resolve a domain/range node to a list of qnames (named class or owl:unionOf members)."""
    if isinstance(node, URIRef):
        qname = uri_to_qname(node)
        return [qname] if qname else []
    if isinstance(node, BNode):
        return get_union_classes(g, node)
    return []


def _index_properties(g):
    """List every named object/datatype property with its label, domains and ranges."""
    properties = []
    for prop_type in (OWL.ObjectProperty, OWL.DatatypeProperty):
        for s in g.subjects(RDF.type, prop_type):
            if isinstance(s, BNode):
                continue
            qname = uri_to_qname(s)
            if not qname:
                continue
            label = g.value(s, RDFS.label)
            properties.append({
                "id": qname,
                "label": str(label) if label else qname,
                "domains": _class_refs(g, g.value(s, RDFS.domain)),
                "ranges": _class_refs(g, g.value(s, RDFS.range)),
            })
    return properties


def _group_properties_by_domain(properties):
    """Map each domain class qname to the summaries of the properties declared on it."""
    grouped = defaultdict(list)
    for prop in properties:
        for d in prop["domains"]:
            grouped[d].append({
                "id": prop["id"],
                "label": prop["label"],
                # NOTE(review): the first member is reported for a union
                # range, while "Mixed/Union" is used when no range was
                # resolved at all. Kept as-is for output compatibility.
                "range": prop["ranges"][0] if prop["ranges"] else "Mixed/Union",
                "inherited_from": d,
            })
    return grouped


def _collect_inherited_properties(class_qname, classes, by_domain, visited):
    """Return the properties of *class_qname* followed by those of its ancestors.

    *visited* is shared across the recursion so each class is expanded once,
    which also protects against cycles in the subclass hierarchy.
    """
    if class_qname in visited:
        return []
    visited.add(class_qname)

    props = list(by_domain.get(class_qname, []))
    seen_ids = {p["id"] for p in props}
    for parent in classes.get(class_qname, {}).get("parents", []):
        for p in _collect_inherited_properties(parent, classes, by_domain, visited):
            # A property already present (declared closer to the class) wins.
            if p["id"] not in seen_ids:
                seen_ids.add(p["id"])
                props.append(p)
    return props


def _ensure_parent_dir(path):
    """Create the directory that will contain *path*, if the path has one."""
    parent = os.path.dirname(path)
    # dirname is "" for a bare file name, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _write_shacl_constraints(properties, output_shacl):
    """Write domain/range node shapes to *output_shacl* and return how many were written."""
    shape_count = 0
    with open(output_shacl, 'w', encoding='utf-8') as f:
        f.write("@prefix sh: <http://www.w3.org/ns/shacl#> .\n")
        f.write("@prefix ex: <http://activadigital.it/ontology/> .\n")
        for ns_uri, prefix in ARCO_NAMESPACES.items():
            f.write(f"@prefix {prefix}: <{ns_uri}> .\n")
        f.write("\n")

        for prop in properties:
            prop_id = prop["id"]
            # Without a known prefix the property cannot be written as a
            # prefixed name, so no shape is generated for it.
            if ":" not in prop_id:
                continue
            safe_id = prop_id.replace(":", "_").replace("-", "_")

            # Shapes are only generated for unambiguous (single) domains/ranges.
            if len(prop["domains"]) == 1:
                dom = prop["domains"][0]
                if ":" in dom:
                    f.write(f"ex:{safe_id}_DomainShape a sh:NodeShape ;\n")
                    f.write(f" sh:targetSubjectsOf {prop_id} ;\n")
                    f.write(f" sh:class {dom} .\n\n")
                    shape_count += 1

            if len(prop["ranges"]) == 1:
                rng = prop["ranges"][0]
                if ":" in rng:
                    f.write(f"ex:{safe_id}_RangeShape a sh:NodeShape ;\n")
                    f.write(f" sh:targetObjectsOf {prop_id} ;\n")
                    if rng == "rdfs:Literal":
                        # rdfs:Literal is never the datatype of a literal, so
                        # "sh:datatype rdfs:Literal" would reject every value;
                        # "any literal" is expressed through sh:nodeKind.
                        f.write(" sh:nodeKind sh:Literal .\n\n")
                    elif rng.startswith("xsd:"):
                        f.write(f" sh:datatype {rng} .\n\n")
                    else:
                        f.write(f" sh:class {rng} .\n\n")
                    shape_count += 1
    return shape_count


def build_domain_index_and_shacl(ontology_dir: str, output_json: str, output_shacl: str):
    """Index the OWL ontologies under *ontology_dir* and derive SHACL constraints.

    Two files are produced:

    * *output_json*: a JSON document with three keys: ``classes`` (label,
      description, named parents and namespace prefix per class),
      ``properties_by_domain`` (per class, its own and inherited properties)
      and ``class_embeddings_texts`` (``"label - description"`` for classes
      that have a description).
    * *output_shacl*: a Turtle file with one node shape per property that has
      exactly one resolved domain and/or exactly one resolved range.

    Args:
        ontology_dir: Directory searched recursively for ``*.owl`` files,
            which are parsed as RDF/XML. Files that fail to parse are
            reported and skipped.
        output_json: Path of the JSON index to write.
        output_shacl: Path of the Turtle shapes file to write.

    Parent directories of both output paths are created when missing.
    Progress is reported on standard output. Returns ``None``.
    """
    print(f"⏳ Inizializzazione Graph e caricamento da {ontology_dir}...")
    g = _load_ontology_graph(ontology_dir)
    print("✅ Ontologie caricate in memoria. Compilazione indici in corso...")

    classes_dict = _index_classes(g)
    properties_list = _index_properties(g)
    properties_by_domain = _group_properties_by_domain(properties_list)

    # Only classes that end up with at least one property are listed.
    final_properties_by_domain = {}
    for cls in classes_dict:
        all_props = _collect_inherited_properties(
            cls, classes_dict, properties_by_domain, set()
        )
        if all_props:
            final_properties_by_domain[cls] = all_props

    class_embeddings_texts = {
        k: f"{v['label']} - {v['description']}"
        for k, v in classes_dict.items()
        if v['description']
    }

    domain_index = {
        "classes": classes_dict,
        "properties_by_domain": final_properties_by_domain,
        "class_embeddings_texts": class_embeddings_texts,
    }

    _ensure_parent_dir(output_json)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(domain_index, f, ensure_ascii=False, indent=2)
    print(f"💾 Salvato Indice di Dominio in: {output_json}")

    _ensure_parent_dir(output_shacl)
    shape_count = _write_shacl_constraints(properties_list, output_shacl)
    print(f"🛡️ Generato SHACL auto_constraints.ttl con {shape_count} regole rigorose in: {output_shacl}")
|
|
if __name__ == "__main__":
    # Script entry point: index the ontologies found under ./ontology/ and
    # write both generated artefacts inside that same folder. The paths are
    # relative, so they resolve against the current working directory.
    ONTOLOGY_FOLDER = "./ontology/"
    OUTPUT_JSON = "./ontology/domain_index.json"
    OUTPUT_SHACL = "./ontology/shapes/auto_constraints.ttl"

    build_domain_index_and_shacl(
        ontology_dir=ONTOLOGY_FOLDER,
        output_json=OUTPUT_JSON,
        output_shacl=OUTPUT_SHACL,
    )