| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine |
| from agentic_reliability_framework.core.governance.intents import InfrastructureIntent |
| from typing import Optional, List, Dict, Any |
|
|
| from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction |
| from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine |
| from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine |
| from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory |
|
|
| |
| from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk |
|
|
|
|
def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str]
) -> dict:
    """Run the Bayesian risk engine against an infrastructure intent.

    Returns a dict with the computed risk score, a human-readable
    explanation, and the per-factor contributions.
    """
    risk_score, reason, factor_contributions = engine.calculate_risk(
        intent=intent,
        cost_estimate=cost_estimate,
        policy_violations=policy_violations,
    )
    return {
        "risk_score": risk_score,
        "explanation": reason,
        "contributions": factor_contributions,
    }
|
|
|
|
def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,
    tokenizer=None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a given reliability event using decision-theoretic selection.
    Includes epistemic risk signals from the eclipse probe when a model/tokenizer
    pair is supplied; otherwise signals are zeroed so the payload shape is uniform.

    Args:
        event: The reliability event to evaluate.
        policy_engine: Engine whose policies produce candidate healing actions.
        decision_engine: Optional explicit engine. Falls back to the one attached
            to ``policy_engine`` (if any), then to a freshly built ``DecisionEngine``.
        rag_graph: Memory handed to a freshly constructed ``DecisionEngine`` only.
        model: Optional model for the eclipse probe.
        tokenizer: Tokenizer paired with ``model``.

    Returns:
        Dictionary with keys: risk_score, selected_action, expected_utility,
        alternatives, explanation, epistemic_signals, plus raw_decision when a
        non-trivial decision is made.
    """
    # Resolution order: explicit argument > engine attached to the policy
    # engine > a fresh instance built from the supplied RAG graph.
    if decision_engine is None and hasattr(policy_engine, 'decision_engine'):
        decision_engine = policy_engine.decision_engine
    if decision_engine is None:
        decision_engine = DecisionEngine(rag_graph=rag_graph)

    raw_actions = _collect_candidate_actions(policy_engine, event)

    # Nothing to decide: short-circuit with a neutral response.
    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            "epistemic_signals": None,
        }

    reasoning_text = _summarize_triggered_policies(policy_engine, raw_actions)
    evidence_text = _format_event_evidence(event)
    epistemic_signals = _compute_epistemic_signals(
        reasoning_text, evidence_text, model, tokenizer
    )

    decision = decision_engine.select_optimal_action(
        raw_actions, event, component=event.component,
        epistemic_signals=epistemic_signals
    )

    risk_score = _risk_of_selected_action(decision, decision_engine, event)

    # Surface at most the top three alternatives to keep the payload compact.
    alt_list = [
        {
            "action": alt.action.value,
            "expected_utility": alt.utility,
            "risk": alt.risk,
        }
        for alt in decision.alternatives[:3]
    ]

    return {
        "risk_score": risk_score,
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        "alternatives": alt_list,
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": epistemic_signals,
    }


def _collect_candidate_actions(
    policy_engine: PolicyEngine, event: ReliabilityEvent
) -> List[HealingAction]:
    """Run policy evaluation with the decision engine temporarily disabled so the
    raw (pre-selection) candidate actions are returned."""
    orig_use = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        return policy_engine.evaluate_policies(event)
    finally:
        # Always restore the caller-visible flag, even if evaluation raises.
        policy_engine.use_decision_engine = orig_use


def _summarize_triggered_policies(
    policy_engine: PolicyEngine, raw_actions: List[HealingAction]
) -> str:
    """Build a human-readable summary of each policy that contributed at least
    one of the candidate actions."""
    reasoning_parts = []
    for policy in policy_engine.policies:
        if any(a in policy.actions for a in raw_actions):
            conditions_str = ", ".join(
                f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
            )
            reasoning_parts.append(
                f"Policy {policy.name} triggered by {conditions_str} → actions {[a.value for a in policy.actions]}"
            )
    return " ".join(reasoning_parts)


def _format_event_evidence(event: ReliabilityEvent) -> str:
    """Flatten the event's key metrics into the evidence string consumed by the
    eclipse probe."""
    return (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )


def _compute_epistemic_signals(
    reasoning_text: str, evidence_text: str, model, tokenizer
) -> Dict[str, float]:
    """Run the eclipse probe when a model/tokenizer pair is available; otherwise
    return zeroed signals so downstream consumers see a consistent shape."""
    if model is not None and tokenizer is not None:
        return compute_epistemic_risk(reasoning_text, evidence_text, model, tokenizer)
    return {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }


def _risk_of_selected_action(
    decision, decision_engine: DecisionEngine, event: ReliabilityEvent
):
    """Return the risk attached to the chosen alternative, recomputing via the
    decision engine when no alternative matches (or its risk is None)."""
    risk_score = None
    for alt in decision.alternatives:
        if alt.action == decision.best_action:
            risk_score = alt.risk
            break
    if risk_score is None:
        # Fallback: the chosen action was absent from (or unscored in) the
        # alternatives list — ask the engine directly.
        risk_score = decision_engine.compute_risk(
            decision.best_action, event, event.component
        )
    return risk_score
|
|
|
|
def get_system_risk() -> float:
    """Return the current system-wide risk score in ``[0, 1]``, rounded to
    two decimal places.

    NOTE(review): placeholder implementation — this returns a uniformly
    random value, not a real aggregate. Replace with an actual system-risk
    computation once the risk pipeline exposes one.
    """
    # Local import keeps this stub from adding a module-level dependency.
    import random
    return round(random.uniform(0, 1), 2)
|
|