"""Governance evaluation helpers: intent risk scoring and healing-action selection.

NOTE(review): this module was previously collapsed onto a few physical lines
(invalid Python); it has been reconstructed into conventional formatting with
the original logic preserved statement-for-statement.
"""

from typing import Any, Dict, List, Optional

from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine
from agentic_reliability_framework.core.governance.intents import InfrastructureIntent
from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.models.event import HealingAction, ReliabilityEvent

# NEW: Import eclipse probe
from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk
from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory


def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
) -> dict:
    """Evaluate an infrastructure intent using the Bayesian risk engine.

    Args:
        engine: Risk engine that performs the actual scoring.
        intent: The infrastructure intent under evaluation.
        cost_estimate: Optional monetary/cost estimate fed into the risk model.
        policy_violations: Names of policies the intent violates.

    Returns:
        Dictionary with keys ``risk_score``, ``explanation``, and
        ``contributions`` (per-factor breakdown from the engine).
    """
    score, explanation, contributions = engine.calculate_risk(
        intent=intent,
        cost_estimate=cost_estimate,
        policy_violations=policy_violations,
    )
    return {
        "risk_score": score,
        "explanation": explanation,
        "contributions": contributions,
    }


def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,      # NEW: optional HuggingFace model
    tokenizer=None,  # NEW: optional tokenizer
) -> Dict[str, Any]:
    """Evaluate healing actions for a reliability event via decision-theoretic selection.

    Now includes epistemic risk signals from the eclipse probe.

    Args:
        event: The reliability event (latency/error/cpu/memory metrics).
        policy_engine: Source of candidate healing actions.
        decision_engine: Optional engine for utility-based action selection;
            falls back to ``policy_engine.decision_engine`` or a fresh
            ``DecisionEngine`` built from ``rag_graph``.
        rag_graph: Optional memory graph used only when a fallback
            ``DecisionEngine`` must be constructed.
        model: Optional HuggingFace model for epistemic-risk computation.
        tokenizer: Optional tokenizer paired with ``model``.

    Returns:
        Dictionary with keys: risk_score, selected_action, expected_utility,
        alternatives, explanation, raw_decision, epistemic_signals (new).
    """
    # If decision_engine not provided, try to get from policy_engine.
    if decision_engine is None and hasattr(policy_engine, 'decision_engine'):
        decision_engine = policy_engine.decision_engine
    # If still None, create a minimal one (global stats only).
    if decision_engine is None:
        decision_engine = DecisionEngine(rag_graph=rag_graph)

    # Get raw candidate actions by temporarily disabling the decision engine;
    # the flag is restored in ``finally`` even if evaluation raises.
    orig_use = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        raw_actions = policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = orig_use

    # If no actions triggered, short-circuit with a NO_ACTION response.
    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            "epistemic_signals": None,
        }

    # === NEW: Compute epistemic signals from triggered policies ===
    # Build reasoning text from the policies that triggered the actions.
    reasoning_parts = []
    for policy in policy_engine.policies:
        # Check if any of the policy's actions are in raw_actions.
        if any(a in policy.actions for a in raw_actions):
            conditions_str = ", ".join(
                f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
            )
            reasoning_parts.append(
                f"Policy {policy.name} triggered by {conditions_str} → actions {[a.value for a in policy.actions]}"
            )
    reasoning_text = " ".join(reasoning_parts)

    # Build evidence text from the event's raw metrics.
    evidence_text = (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )

    # Compute epistemic signals (only if both model and tokenizer provided).
    if model is not None and tokenizer is not None:
        epistemic_signals = compute_epistemic_risk(
            reasoning_text, evidence_text, model, tokenizer
        )
    else:
        # In OSS, we may not have model; use zeros as fallback.
        epistemic_signals = {
            "entropy": 0.0,
            "contradiction": 0.0,
            "evidence_lift": 0.0,
            "hallucination_risk": 0.0,
        }

    # Run decision engine to get best action and alternatives,
    # passing epistemic signals through to the utility computation.
    decision = decision_engine.select_optimal_action(
        raw_actions,
        event,
        component=event.component,
        epistemic_signals=epistemic_signals,
    )

    # Risk of the selected action: read it off the matching alternative,
    # otherwise compute it directly from the engine.
    risk_score = None
    for alt in decision.alternatives:
        if alt.action == decision.best_action:
            risk_score = alt.risk
            break
    if risk_score is None:
        risk_score = decision_engine.compute_risk(
            decision.best_action, event, event.component
        )

    # Format alternatives (top 3 only).
    alt_list = [
        {
            "action": alt.action.value,
            "expected_utility": alt.utility,
            "risk": alt.risk,
        }
        for alt in decision.alternatives[:3]
    ]

    # Build final response.
    return {
        "risk_score": risk_score,
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        "alternatives": alt_list,
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": epistemic_signals,  # NEW
    }


def get_system_risk() -> float:
    """Return a random system-risk score in [0, 1], rounded to 2 decimals.

    Placeholder — this endpoint is being deprecated; kept only for backward
    compatibility. Callers must not treat the value as meaningful.
    """
    import random

    return round(random.uniform(0, 1), 2)