AutomatedSemanticDiscovery / src /utils /build_schema.py
GaetanoParente's picture
rimossi import inutili e blindato utilizzo utente
9cbbfac
import os
import json
from pathlib import Path
from collections import defaultdict
from rdflib import Graph, URIRef, BNode, RDF, RDFS, OWL
# --- NAMESPACE MAP ---
# Maps each known namespace IRI to the prefix used when building qnames.
# Insertion order matters: the SHACL writer emits its @prefix lines by
# iterating this dict.
ARCO_NAMESPACES = {
    # ArCo ontology modules
    "https://w3id.org/arco/ontology/arco/": "arco",
    "https://w3id.org/arco/ontology/core/": "core",
    "https://w3id.org/arco/ontology/location/": "a-loc",
    "https://w3id.org/arco/ontology/context-description/": "a-cd",
    "https://w3id.org/arco/ontology/denotative-description/": "a-dd",
    "https://w3id.org/arco/ontology/cultural-event/": "a-ce",
    "https://w3id.org/arco/ontology/catalogue/": "a-cat",
    "http://dati.beniculturali.it/cis/": "cis",
    # w3id.org/italia/onto vocabularies
    "https://w3id.org/italia/onto/l0/": "l0",
    "https://w3id.org/italia/onto/CLV/": "clv",
    "https://w3id.org/italia/onto/TI/": "ti",
    "https://w3id.org/italia/onto/RO/": "ro",
    "https://w3id.org/italia/onto/SM/": "sm",
    "https://w3id.org/italia/onto/MU/": "mu",
    # CIDOC-CRM and W3C vocabularies
    "http://www.cidoc-crm.org/cidoc-crm/": "crm",
    "http://www.w3.org/2002/07/owl#": "owl",
    "http://www.w3.org/2000/01/rdf-schema#": "rdfs",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#": "rdf",
    "http://www.w3.org/2001/XMLSchema#": "xsd",
    "http://www.w3.org/2004/02/skos/core#": "skos",
}
def uri_to_qname(uri: URIRef) -> str:
    """Shorten *uri* to a ``prefix:local`` name using ``ARCO_NAMESPACES``.

    The longest namespace IRI that prefixes *uri* wins. When no namespace
    matches, only the fragment after the last ``#`` (or, failing that, the
    segment after the last ``/``) is returned, without any prefix.

    Returns ``None`` for blank nodes and for empty/missing values, despite
    the ``str`` return annotation.
    """
    if isinstance(uri, BNode) or not uri:
        return None

    text = str(uri)
    candidates = [ns for ns in ARCO_NAMESPACES if ns and text.startswith(ns)]
    if candidates:
        namespace = max(candidates, key=len)
        # Drop any '#' left at the start of the local part.
        local_name = text[len(namespace):].lstrip('#')
        return f"{ARCO_NAMESPACES[namespace]}:{local_name}"

    # Unknown namespace: fall back to the bare trailing segment.
    delimiter = '#' if '#' in text else '/'
    return text.split(delimiter)[-1]
def get_union_classes(g: Graph, bnode: BNode) -> list:
    """Return the qnames of the named classes in an ``owl:unionOf`` list.

    Args:
        g: Graph holding the ontology triples.
        bnode: Node expected to carry an ``owl:unionOf`` pointing at an RDF
            list (``rdf:first`` / ``rdf:rest`` chain).

    Returns:
        The qnames of the list members that are IRIs, in list order.
        Members that are not IRIs, and members for which ``uri_to_qname``
        yields nothing, are skipped. Empty when *bnode* has no
        ``owl:unionOf``.
    """
    classes = []
    # Track visited list cells so a malformed, cyclic rdf:rest chain
    # terminates instead of looping forever.
    visited = set()
    current = g.value(bnode, OWL.unionOf)
    while current and current != RDF.nil and current not in visited:
        visited.add(current)
        item = g.value(current, RDF.first)
        if isinstance(item, URIRef):
            qname = uri_to_qname(item)
            if qname:
                classes.append(qname)
        current = g.value(current, RDF.rest)
    return classes
def _ensure_parent_dir(file_path):
    """Create the directory that will contain *file_path*, if it names one."""
    parent = os.path.dirname(file_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _localized_text(g, subject, predicate, fallback, lang="it"):
    """Return a *predicate* value of *subject* as text, preferring *lang*.

    Starts from whichever value ``g.value`` returns (or *fallback* when there
    is none or it is empty) and overrides it with a value tagged *lang*.
    """
    first = g.value(subject, predicate)
    text = str(first) if first else fallback
    for candidate in g.objects(subject, predicate):
        # Objects that are not literals (e.g. an IRI) carry no language tag.
        if getattr(candidate, "language", None) == lang:
            text = str(candidate)
    return text


def _resolve_class_refs(g, node):
    """Return the class qnames named by an ``rdfs:domain``/``rdfs:range`` node.

    An IRI yields a single qname; a blank node is expanded through
    ``get_union_classes``; anything else (including ``None``) yields ``[]``.
    """
    if isinstance(node, URIRef):
        qname = uri_to_qname(node)
        return [qname] if qname else []
    if isinstance(node, BNode):
        return get_union_classes(g, node)
    return []


def build_domain_index_and_shacl(ontology_dir: str, output_json: str, output_shacl: str):
    """Build a JSON domain index and a SHACL constraints file from OWL files.

    Every ``*.owl`` file found recursively under *ontology_dir* is parsed as
    RDF/XML into one graph. Files that fail to parse are reported and
    skipped. From that graph the function:

    1. indexes every named ``owl:Class`` (label, description, direct named
       parents, namespace prefix), preferring Italian labels/comments;
    2. indexes every named object/datatype property with its domains and
       ranges;
    3. computes, per class, its own properties plus those inherited from
       its ancestors;
    4. writes the index to *output_json*;
    5. writes one domain shape and/or one range shape to *output_shacl* for
       each property that has exactly one domain / exactly one range.

    Args:
        ontology_dir: Directory searched recursively for ``*.owl`` files.
        output_json: Path of the JSON index to write.
        output_shacl: Path of the Turtle SHACL file to write.

    Parent directories of both output files are created when missing.
    """
    print(f"⏳ Inizializzazione Graph e caricamento da {ontology_dir}...")
    g = Graph()
    owl_files = list(Path(ontology_dir).glob('**/*.owl'))
    for file_path in owl_files:
        # Best effort: one unreadable ontology must not abort the whole build.
        try:
            g.parse(file_path, format="xml")
            print(f" -> Caricato: {file_path.name}")
        except Exception as e:
            print(f" ⚠️ Errore parsing {file_path.name}: {e}")
    print("✅ Ontologie caricate in memoria. Compilazione indici in corso...")

    # --- 1. Classes -------------------------------------------------------
    classes_dict = {}
    for s in g.subjects(RDF.type, OWL.Class):
        if isinstance(s, BNode):
            continue
        qname = uri_to_qname(s)
        if not qname:
            # No usable identifier; nothing could be keyed on it.
            continue
        parents = [
            uri_to_qname(p)
            for p in g.objects(s, RDFS.subClassOf)
            if isinstance(p, URIRef)
        ]
        classes_dict[qname] = {
            "label": _localized_text(g, s, RDFS.label, qname),
            "description": _localized_text(g, s, RDFS.comment, ""),
            "parents": parents,
            "namespace": qname.split(":")[0] if ":" in qname else "unknown"
        }

    # --- 2. Properties ----------------------------------------------------
    properties_list = []
    for prop_type in [OWL.ObjectProperty, OWL.DatatypeProperty]:
        for s in g.subjects(RDF.type, prop_type):
            if isinstance(s, BNode):
                continue
            qname = uri_to_qname(s)
            if not qname:
                continue
            label = g.value(s, RDFS.label)
            properties_list.append({
                "id": qname,
                "label": str(label) if label else qname,
                "domains": _resolve_class_refs(g, g.value(s, RDFS.domain)),
                "ranges": _resolve_class_refs(g, g.value(s, RDFS.range))
            })

    # --- 3. Properties per class, with inheritance --------------------------
    properties_by_domain = defaultdict(list)
    for prop in properties_list:
        for d in prop["domains"]:
            properties_by_domain[d].append({
                "id": prop["id"],
                "label": prop["label"],
                # Only the first range is recorded; the placeholder is used
                # when the property declares no resolvable range at all.
                "range": prop["ranges"][0] if prop["ranges"] else "Mixed/Union",
                "inherited_from": d
            })

    def get_inherited_properties(class_qname, visited=None):
        """Collect the properties of *class_qname* and of all its ancestors.

        *visited* guards against cycles in the subclass hierarchy. A
        property id already present is not added again, so the declaration
        found first (own class before parents) wins.
        """
        if visited is None:
            visited = set()
        if class_qname in visited:
            return []
        visited.add(class_qname)
        props = list(properties_by_domain.get(class_qname, []))
        seen_ids = {existing["id"] for existing in props}
        for parent in classes_dict.get(class_qname, {}).get("parents", []):
            for p in get_inherited_properties(parent, visited):
                if p["id"] not in seen_ids:
                    seen_ids.add(p["id"])
                    props.append(p)
        return props

    final_properties_by_domain = {}
    for cls in classes_dict:
        all_props = get_inherited_properties(cls)
        if all_props:
            final_properties_by_domain[cls] = all_props

    # Only classes that have a description get an embedding text.
    class_embeddings_texts = {
        k: f"{v['label']} - {v['description']}"
        for k, v in classes_dict.items()
        if v['description']
    }

    # --- 4. JSON index ------------------------------------------------------
    domain_index = {
        "classes": classes_dict,
        "properties_by_domain": final_properties_by_domain,
        "class_embeddings_texts": class_embeddings_texts
    }
    _ensure_parent_dir(output_json)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(domain_index, f, ensure_ascii=False, indent=2)
    print(f"💾 Salvato Indice di Dominio in: {output_json}")

    # --- 5. SHACL shapes ----------------------------------------------------
    _ensure_parent_dir(output_shacl)
    shape_count = 0
    with open(output_shacl, 'w', encoding='utf-8') as f:
        f.write("@prefix sh: <http://www.w3.org/ns/shacl#> .\n")
        f.write("@prefix ex: <http://activadigital.it/ontology/> .\n")
        for ns_uri, prefix in ARCO_NAMESPACES.items():
            f.write(f"@prefix {prefix}: <{ns_uri}> .\n")
        f.write("\n")

        for prop in properties_list:
            prop_id = prop["id"]
            # Without a prefix the id cannot be written as a Turtle term.
            if ":" not in prop_id:
                continue
            safe_id = prop_id.replace(":", "_").replace("-", "_")

            if len(prop["domains"]) == 1:
                dom = prop["domains"][0]
                if ":" in dom:
                    f.write(f"ex:{safe_id}_DomainShape a sh:NodeShape ;\n")
                    f.write(f" sh:targetSubjectsOf {prop_id} ;\n")
                    f.write(f" sh:class {dom} .\n\n")
                    shape_count += 1

            if len(prop["ranges"]) == 1:
                rng = prop["ranges"][0]
                if ":" in rng:
                    f.write(f"ex:{safe_id}_RangeShape a sh:NodeShape ;\n")
                    f.write(f" sh:targetObjectsOf {prop_id} ;\n")
                    if rng.startswith("xsd:"):
                        f.write(f" sh:datatype {rng} .\n\n")
                    elif rng == "rdfs:Literal":
                        # rdfs:Literal is the class of all literals, not a
                        # datatype: "sh:datatype rdfs:Literal" would reject
                        # every value, so constrain the node kind instead.
                        f.write(" sh:nodeKind sh:Literal .\n\n")
                    else:
                        f.write(f" sh:class {rng} .\n\n")
                    shape_count += 1
    print(f"🛡️ Generato SHACL auto_constraints.ttl con {shape_count} regole rigorose in: {output_shacl}")
if __name__ == "__main__":
    # Default input/output locations used when the module is run as a
    # script; all three are relative paths.
    ONTOLOGY_FOLDER = "./ontology/"
    OUTPUT_JSON = "./ontology/domain_index.json"
    OUTPUT_SHACL = "./ontology/shapes/auto_constraints.ttl"

    build_domain_index_and_shacl(
        ontology_dir=ONTOLOGY_FOLDER,
        output_json=OUTPUT_JSON,
        output_shacl=OUTPUT_SHACL,
    )