GaetanoParente's picture
rimossi import inutili e blindato utilizzo utente
9cbbfac
import os
import json
import time
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_groq import ChatGroq
from dotenv import load_dotenv
load_dotenv()
# --- MODELLI PYDANTIC (Contratti Formali) ---
# PASS 1 - Livello 1
class MacroCategoryCandidate(BaseModel):
category: str = Field(description="URI della macro-categoria (es. arco:CulturalProperty)")
reasoning: str = Field(description="Perché questa macro-categoria è appropriata per l'entità")
class EntityMacroClassification(BaseModel):
name: str = Field(description="Nome dell'entità come appare nel testo")
candidates: List[MacroCategoryCandidate] = Field(
description="1-2 macro-categorie candidate, ordinate per preferenza (la prima è la più probabile)",
min_length=1,
max_length=2
)
class MacroClassificationResult(BaseModel):
"""Output del Livello 1"""
entities: List[EntityMacroClassification]
# PASS 1 - Livello 2
class TypedEntity(BaseModel):
name: str = Field(description="Nome dell'entità come appare nel testo")
type: str = Field(description="URI del tipo ontologico finale (es. arco:ArchaeologicalProperty)")
class TypeInferenceResult(BaseModel):
"""Output del Livello 2"""
entities: List[TypedEntity]
# PASS 2 - Extraction
class GraphTriple(BaseModel):
subject: str
subject_type: str = Field(description="Tipo ontologico del soggetto (da Pass 1)")
predicate: str
object: str
object_type: str = Field(description="Tipo ontologico dell'oggetto (da Pass 1)")
evidence: str = Field(description="Span testuale esatto dal chunk da cui la relazione è estratta")
reasoning: str = Field(description="Perché questo predicato è stato scelto per questa coppia di entità")
source: Optional[str] = Field(None) # Mantenuto per compatibilità con il batching Neo4j a valle
class KnowledgeGraphExtraction(BaseModel):
triples: List[GraphTriple]
class NeuroSymbolicExtractor:
def __init__(self, index_path="./ontology/domain_index.json"):
print("🧠 Inizializzazione TDDT Extractor (Type-Driven Domain Traversal)...")
# google_api_key = os.getenv("GOOGLE_API_KEY")
# if not google_api_key:
# raise ValueError("❌ GOOGLE_API_KEY mancante. Richiesta per Gemini 2.0 Flash.")
# # Inizializzo l'LLM primario. Temperatura 0 per massimizzare il determinismo.
# self.llm = ChatGoogleGenerativeAI(
# model="gemini-2.0-flash",
# temperature=0,
# api_key=google_api_key
# )
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise ValueError("❌ GROQ_API_KEY mancante nel file .env.")
# Inizializzo l'LLM primario su Groq.
self.llm = ChatGroq(
model="meta-llama/llama-4-scout-17b-16e-instruct",
temperature=0,
api_key=groq_api_key,
max_retries=5 # Aumentiamo i retry interni di LangChain
)
# Inizializzo le chain con structured output
self.chain_pass1_l1 = self.llm.with_structured_output(MacroClassificationResult)
self.chain_pass1_l2 = self.llm.with_structured_output(TypeInferenceResult)
self.chain_pass2 = self.llm.with_structured_output(KnowledgeGraphExtraction)
# Caricamento del Domain Index in RAM
self.domain_index = {"classes": {}, "properties_by_domain": {}}
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
self.domain_index = json.load(f)
print(f"✅ Domain Index caricato: {len(self.domain_index['classes'])} classi disponibili.")
else:
print(f"⚠️ Attenzione: Domain Index non trovato al percorso {index_path}")
self.root_classes = self._extract_root_classes()
def _extract_root_classes(self) -> Dict[str, Any]:
"""Estrae il primo livello ontologico per la macro-categorizzazione."""
roots = {}
for uri, data in self.domain_index["classes"].items():
# Consideriamo root le classi senza padri o figlie dirette di owl:Thing / l0:Entity
if not data["parents"] or "owl:Thing" in data["parents"] or "l0:Entity" in data["parents"]:
roots[uri] = data
return roots
def _get_subclasses(self, parent_uris: List[str]) -> Dict[str, Any]:
"""Recupera tutte le sottoclassi dirette (e se stesse) dai rami indicati."""
subclasses = {}
for uri, data in self.domain_index["classes"].items():
if uri in parent_uris or any(p in parent_uris for p in data["parents"]):
subclasses[uri] = data
return subclasses
def _execute_with_retry(self, chain, prompt_messages, max_retries=4):
"""Self-correction loop con Exponential Backoff per Rate Limits."""
base_delay = 5
for attempt in range(max_retries):
try:
result = chain.invoke(prompt_messages)
return result
except Exception as e:
error_msg = str(e).upper()
print(error_msg)
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
wait_time = base_delay * (2 ** attempt)
print(f"⏳ [Rate Limit] Quota superata. Attendo {wait_time}s prima di riprovare (Tentativo {attempt+1}/{max_retries})...")
time.sleep(wait_time)
else:
print(f"⚠️ Errore (Tentativo {attempt+1}/{max_retries}): {e}")
if attempt == max_retries - 1:
print("❌ Fallimento critico del task LLM.")
return None
return None
def extract(self, text_chunk: str, source_id: str = "unknown") -> KnowledgeGraphExtraction:
print(f"\n🧩 Processing {source_id} (TDDT Mode)...")
# ==========================================
# PASS 1 - LIVELLO 1: Macro-Categorizzazione
# ==========================================
roots_text = "\n".join([f"- {uri} — \"{data['label']}: {data['description']}\"" for uri, data in self.root_classes.items()])
sys_l1 = f"""Sei un estrattore esperto di entità semantiche per il dominio dei Beni Culturali. Il tuo unico compito è individuare le entità rilevanti nel testo e classificarle.
MACRO-CATEGORIE DISPONIBILI:
{roots_text}
REGOLE DI ESTRAZIONE (TASSATIVE E OBBLIGATORIE):
1. DIVIETO DI ALLUCINAZIONE URI: Usa ESCLUSIVAMENTE gli URI esatti elencati sopra. È severamente vietato usare etichette inventate come "Person", "Location" o "Group". Se devi categorizzare una persona, usa l'URI corrispondente agli Agenti (es. core:Agent o l0:Agent).
2. RUMORE EDITORIALE: IGNORA e non estrarre MAI riferimenti alla struttura del libro o alle immagini. È vietato estrarre entità che contengono o sono composte da: "Capitolo", "Sezione", "Tavola", "Fig.", "Figura", "Pagina", "Pag.".
3. Estrai SOLO veri monumenti storici, luoghi geografici reali, personaggi storici, popoli e concetti architettonici.
4. Puoi assegnare fino a 2 candidati per entità, ordinandoli per confidenza logica.
"""
res_l1: MacroClassificationResult = self._execute_with_retry(
self.chain_pass1_l1,
[SystemMessage(content=sys_l1), HumanMessage(content=text_chunk)]
)
if not res_l1 or not res_l1.entities:
print(" -> Nessuna entità trovata al Livello 1.")
return KnowledgeGraphExtraction(triples=[])
# ==========================================
# PASS 1 - LIVELLO 2: Specializzazione
# ==========================================
# Raccogliamo tutti i rami candidati da esplorare
candidate_uris = set()
for ent in res_l1.entities:
for cand in ent.candidates:
candidate_uris.add(cand.category)
subclasses = self._get_subclasses(list(candidate_uris))
# Raggruppo le sottoclassi per visualizzarle ordinate nel prompt
subs_text_blocks = []
for parent in candidate_uris:
subs_text_blocks.append(f"\n[{parent} →]")
children = {k: v for k, v in subclasses.items() if parent in v["parents"] or k == parent}
for uri, data in children.items():
subs_text_blocks.append(f"- {uri} — \"{data['label']}: {data['description']}\"")
subs_text = "\n".join(subs_text_blocks)
ent_text = "\n".join([f"- '{e.name}': " + ", ".join([f"{c.category}" for c in e.candidates]) for e in res_l1.entities])
sys_l2 = f"""Per ciascuna entità identificata, scegli il sotto-tipo più specifico tra quelli elencati.
Se non c'è un sotto-tipo rilevante per un'entità, conferma la sua macro-categoria.
ENTITÀ IDENTIFICATE (con macro-categorie candidate):
{ent_text}
SOTTO-TIPI DISPONIBILI:
{subs_text}"""
res_l2: TypeInferenceResult = self._execute_with_retry(
self.chain_pass1_l2,
[SystemMessage(content=sys_l2), HumanMessage(content=text_chunk)]
)
if not res_l2 or not res_l2.entities:
return KnowledgeGraphExtraction(triples=[])
# ==========================================
# PASS 2: Estrazione Relazionale
# ==========================================
# Mappa dei tipi finali
typed_entities_map = {e.name: e.type.strip() for e in res_l2.entities}
# Recupero deterministico delle proprietà
valid_properties = []
seen_props = set()
for ent_type in typed_entities_map.values():
props = self.domain_index["properties_by_domain"].get(ent_type, [])
for p in props:
if p["id"] not in seen_props:
valid_properties.append(f"- {p['id']}: {p['inherited_from']}{p['range']} (Label: {p['label']})")
seen_props.add(p["id"])
props_text = "\n".join(valid_properties) if valid_properties else "- (Nessuna proprietà specifica trovata. Usa skos:related)"
ent_final_text = "\n".join([f"- {name} ({uri_type})" for name, uri_type in typed_entities_map.items()])
sys_ext = f"""Estrai le relazioni semantiche tra le entità presenti nel testo.
ENTITÀ IDENTIFICATE (con il loro tipo):
{ent_final_text}
PROPRIETÀ CONSENTITE (con vincoli domain → range):
{props_text}
- skos:related: Qualsiasi → Qualsiasi (Usa SOLO se nessuna proprietà sopra descrive accuratamente il legame)
REGOLE CRITICHE E OBBLIGATORIE:
1. Usa SOLO le proprietà elencate sopra.
2. Usa ESCLUSIVAMENTE le entità presenti nella lista "ENTITÀ IDENTIFICATE". È severamente vietato inventare o aggiungere entità non presenti in questo elenco.
3. I campi 'subject_type' e 'object_type' sono OBBLIGATORI. Devi sempre compilarli copiando esattamente il tipo indicato tra parentesi nella lista delle entità.
4. Rispetta rigorosamente i vincoli ontologici: il tipo del 'subject' DEVE essere compatibile con il domain, e il tipo dell''object' con il range.
5. Compila sempre i campi 'evidence' citando esattamente il testo, e 'reasoning' spiegando la scelta logica.
"""
final_res: KnowledgeGraphExtraction = self._execute_with_retry(
self.chain_pass2,
[SystemMessage(content=sys_ext), HumanMessage(content=text_chunk)]
)
if final_res and final_res.triples:
# Propago il source_id prima di inviare l'output
for t in final_res.triples:
t.source = source_id
return final_res
return KnowledgeGraphExtraction(triples=[])