| import os |
| import json |
| import time |
| from typing import List, Optional, Dict, Any |
| from pydantic import BaseModel, Field |
| from langchain_core.messages import SystemMessage, HumanMessage |
| from langchain_groq import ChatGroq |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| |
|
|
| |
| class MacroCategoryCandidate(BaseModel): |
| category: str = Field(description="URI della macro-categoria (es. arco:CulturalProperty)") |
| reasoning: str = Field(description="Perché questa macro-categoria è appropriata per l'entità") |
|
|
| class EntityMacroClassification(BaseModel): |
| name: str = Field(description="Nome dell'entità come appare nel testo") |
| candidates: List[MacroCategoryCandidate] = Field( |
| description="1-2 macro-categorie candidate, ordinate per preferenza (la prima è la più probabile)", |
| min_length=1, |
| max_length=2 |
| ) |
|
|
| class MacroClassificationResult(BaseModel): |
| """Output del Livello 1""" |
| entities: List[EntityMacroClassification] |
|
|
| |
| class TypedEntity(BaseModel): |
| name: str = Field(description="Nome dell'entità come appare nel testo") |
| type: str = Field(description="URI del tipo ontologico finale (es. arco:ArchaeologicalProperty)") |
|
|
| class TypeInferenceResult(BaseModel): |
| """Output del Livello 2""" |
| entities: List[TypedEntity] |
|
|
| |
| class GraphTriple(BaseModel): |
| subject: str |
| subject_type: str = Field(description="Tipo ontologico del soggetto (da Pass 1)") |
| predicate: str |
| object: str |
| object_type: str = Field(description="Tipo ontologico dell'oggetto (da Pass 1)") |
| evidence: str = Field(description="Span testuale esatto dal chunk da cui la relazione è estratta") |
| reasoning: str = Field(description="Perché questo predicato è stato scelto per questa coppia di entità") |
| source: Optional[str] = Field(None) |
|
|
| class KnowledgeGraphExtraction(BaseModel): |
| triples: List[GraphTriple] |
|
|
|
|
| class NeuroSymbolicExtractor: |
| def __init__(self, index_path="./ontology/domain_index.json"): |
| print("🧠 Inizializzazione TDDT Extractor (Type-Driven Domain Traversal)...") |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| groq_api_key = os.getenv("GROQ_API_KEY") |
| if not groq_api_key: |
| raise ValueError("❌ GROQ_API_KEY mancante nel file .env.") |
|
|
| |
| self.llm = ChatGroq( |
| model="meta-llama/llama-4-scout-17b-16e-instruct", |
| temperature=0, |
| api_key=groq_api_key, |
| max_retries=5 |
| ) |
| |
| |
| self.chain_pass1_l1 = self.llm.with_structured_output(MacroClassificationResult) |
| self.chain_pass1_l2 = self.llm.with_structured_output(TypeInferenceResult) |
| self.chain_pass2 = self.llm.with_structured_output(KnowledgeGraphExtraction) |
|
|
| |
| self.domain_index = {"classes": {}, "properties_by_domain": {}} |
| if os.path.exists(index_path): |
| with open(index_path, 'r', encoding='utf-8') as f: |
| self.domain_index = json.load(f) |
| print(f"✅ Domain Index caricato: {len(self.domain_index['classes'])} classi disponibili.") |
| else: |
| print(f"⚠️ Attenzione: Domain Index non trovato al percorso {index_path}") |
|
|
| self.root_classes = self._extract_root_classes() |
|
|
| def _extract_root_classes(self) -> Dict[str, Any]: |
| """Estrae il primo livello ontologico per la macro-categorizzazione.""" |
| roots = {} |
| for uri, data in self.domain_index["classes"].items(): |
| |
| if not data["parents"] or "owl:Thing" in data["parents"] or "l0:Entity" in data["parents"]: |
| roots[uri] = data |
| return roots |
|
|
| def _get_subclasses(self, parent_uris: List[str]) -> Dict[str, Any]: |
| """Recupera tutte le sottoclassi dirette (e se stesse) dai rami indicati.""" |
| subclasses = {} |
| for uri, data in self.domain_index["classes"].items(): |
| if uri in parent_uris or any(p in parent_uris for p in data["parents"]): |
| subclasses[uri] = data |
| return subclasses |
|
|
| def _execute_with_retry(self, chain, prompt_messages, max_retries=4): |
| """Self-correction loop con Exponential Backoff per Rate Limits.""" |
| base_delay = 5 |
| |
| for attempt in range(max_retries): |
| try: |
| result = chain.invoke(prompt_messages) |
| return result |
| except Exception as e: |
| error_msg = str(e).upper() |
| print(error_msg) |
| if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg: |
| wait_time = base_delay * (2 ** attempt) |
| print(f"⏳ [Rate Limit] Quota superata. Attendo {wait_time}s prima di riprovare (Tentativo {attempt+1}/{max_retries})...") |
| time.sleep(wait_time) |
| else: |
| print(f"⚠️ Errore (Tentativo {attempt+1}/{max_retries}): {e}") |
| |
| if attempt == max_retries - 1: |
| print("❌ Fallimento critico del task LLM.") |
| return None |
| return None |
|
|
| def extract(self, text_chunk: str, source_id: str = "unknown") -> KnowledgeGraphExtraction: |
| print(f"\n🧩 Processing {source_id} (TDDT Mode)...") |
| |
| |
| |
| |
| roots_text = "\n".join([f"- {uri} — \"{data['label']}: {data['description']}\"" for uri, data in self.root_classes.items()]) |
| |
| sys_l1 = f"""Sei un estrattore esperto di entità semantiche per il dominio dei Beni Culturali. Il tuo unico compito è individuare le entità rilevanti nel testo e classificarle. |
| |
| MACRO-CATEGORIE DISPONIBILI: |
| {roots_text} |
| |
| REGOLE DI ESTRAZIONE (TASSATIVE E OBBLIGATORIE): |
| 1. DIVIETO DI ALLUCINAZIONE URI: Usa ESCLUSIVAMENTE gli URI esatti elencati sopra. È severamente vietato usare etichette inventate come "Person", "Location" o "Group". Se devi categorizzare una persona, usa l'URI corrispondente agli Agenti (es. core:Agent o l0:Agent). |
| 2. RUMORE EDITORIALE: IGNORA e non estrarre MAI riferimenti alla struttura del libro o alle immagini. È vietato estrarre entità che contengono o sono composte da: "Capitolo", "Sezione", "Tavola", "Fig.", "Figura", "Pagina", "Pag.". |
| 3. Estrai SOLO veri monumenti storici, luoghi geografici reali, personaggi storici, popoli e concetti architettonici. |
| 4. Puoi assegnare fino a 2 candidati per entità, ordinandoli per confidenza logica. |
| """ |
| res_l1: MacroClassificationResult = self._execute_with_retry( |
| self.chain_pass1_l1, |
| [SystemMessage(content=sys_l1), HumanMessage(content=text_chunk)] |
| ) |
| |
| if not res_l1 or not res_l1.entities: |
| print(" -> Nessuna entità trovata al Livello 1.") |
| return KnowledgeGraphExtraction(triples=[]) |
|
|
| |
| |
| |
| |
| candidate_uris = set() |
| for ent in res_l1.entities: |
| for cand in ent.candidates: |
| candidate_uris.add(cand.category) |
| |
| subclasses = self._get_subclasses(list(candidate_uris)) |
| |
| |
| subs_text_blocks = [] |
| for parent in candidate_uris: |
| subs_text_blocks.append(f"\n[{parent} →]") |
| children = {k: v for k, v in subclasses.items() if parent in v["parents"] or k == parent} |
| for uri, data in children.items(): |
| subs_text_blocks.append(f"- {uri} — \"{data['label']}: {data['description']}\"") |
| |
| subs_text = "\n".join(subs_text_blocks) |
| |
| ent_text = "\n".join([f"- '{e.name}': " + ", ".join([f"{c.category}" for c in e.candidates]) for e in res_l1.entities]) |
| |
| sys_l2 = f"""Per ciascuna entità identificata, scegli il sotto-tipo più specifico tra quelli elencati. |
| Se non c'è un sotto-tipo rilevante per un'entità, conferma la sua macro-categoria. |
| |
| ENTITÀ IDENTIFICATE (con macro-categorie candidate): |
| {ent_text} |
| |
| SOTTO-TIPI DISPONIBILI: |
| {subs_text}""" |
| |
| res_l2: TypeInferenceResult = self._execute_with_retry( |
| self.chain_pass1_l2, |
| [SystemMessage(content=sys_l2), HumanMessage(content=text_chunk)] |
| ) |
|
|
| if not res_l2 or not res_l2.entities: |
| return KnowledgeGraphExtraction(triples=[]) |
|
|
| |
| |
| |
| |
| typed_entities_map = {e.name: e.type.strip() for e in res_l2.entities} |
| |
| |
| valid_properties = [] |
| seen_props = set() |
| for ent_type in typed_entities_map.values(): |
| props = self.domain_index["properties_by_domain"].get(ent_type, []) |
| for p in props: |
| if p["id"] not in seen_props: |
| valid_properties.append(f"- {p['id']}: {p['inherited_from']} → {p['range']} (Label: {p['label']})") |
| seen_props.add(p["id"]) |
| |
| props_text = "\n".join(valid_properties) if valid_properties else "- (Nessuna proprietà specifica trovata. Usa skos:related)" |
| ent_final_text = "\n".join([f"- {name} ({uri_type})" for name, uri_type in typed_entities_map.items()]) |
|
|
| sys_ext = f"""Estrai le relazioni semantiche tra le entità presenti nel testo. |
| |
| ENTITÀ IDENTIFICATE (con il loro tipo): |
| {ent_final_text} |
| |
| PROPRIETÀ CONSENTITE (con vincoli domain → range): |
| {props_text} |
| - skos:related: Qualsiasi → Qualsiasi (Usa SOLO se nessuna proprietà sopra descrive accuratamente il legame) |
| |
| REGOLE CRITICHE E OBBLIGATORIE: |
| 1. Usa SOLO le proprietà elencate sopra. |
| 2. Usa ESCLUSIVAMENTE le entità presenti nella lista "ENTITÀ IDENTIFICATE". È severamente vietato inventare o aggiungere entità non presenti in questo elenco. |
| 3. I campi 'subject_type' e 'object_type' sono OBBLIGATORI. Devi sempre compilarli copiando esattamente il tipo indicato tra parentesi nella lista delle entità. |
| 4. Rispetta rigorosamente i vincoli ontologici: il tipo del 'subject' DEVE essere compatibile con il domain, e il tipo dell''object' con il range. |
| 5. Compila sempre i campi 'evidence' citando esattamente il testo, e 'reasoning' spiegando la scelta logica. |
| """ |
| |
| final_res: KnowledgeGraphExtraction = self._execute_with_retry( |
| self.chain_pass2, |
| [SystemMessage(content=sys_ext), HumanMessage(content=text_chunk)] |
| ) |
| |
| if final_res and final_res.triples: |
| |
| for t in final_res.triples: |
| t.source = source_id |
| return final_res |
| |
| return KnowledgeGraphExtraction(triples=[]) |