| """ |
| Causal reasoning module for ARF – OSS Edition. |
| Provides counterfactual explanations using deterministic heuristics. |
| No external causal libraries required. |
| """ |
| from typing import Dict, Any, Optional, List, Tuple |
| from dataclasses import dataclass, field |
| import pandas as pd |
|
|
|
|
@dataclass
class CausalExplanation:
    """Container for the result of one counterfactual query.

    The first four fields are required and their positional order is part
    of the constructor contract; the remaining fields are optional.
    """

    # --- required ---------------------------------------------------------
    # Observed value of the outcome metric.
    factual_outcome: float
    # Estimated value of the outcome metric under the alternative action.
    counterfactual_outcome: float
    # Difference between the counterfactual and the factual outcome.
    effect: float
    # Whether the estimate came from a fitted model rather than heuristics.
    is_model_based: bool

    # --- optional ---------------------------------------------------------
    # (lower, upper) bounds around the counterfactual outcome, if available.
    confidence_interval: Optional[Tuple[float, float]] = field(default=None)
    # Human-readable summary of the comparison.
    explanation_text: str = field(default="")
    # Caveats attached to the estimate; each instance gets its own list.
    warnings: List[str] = field(default_factory=list)
|
|
|
|
class CausalExplainer:
    """Heuristic causal explainer for healing actions.

    Counterfactuals are computed from a fixed table of per-action fractional
    effects on each outcome metric. No causal model is fitted and no external
    causal library is used; the graph-discovery / SCM / effect-estimation
    methods are placeholders that keep the interface available.
    """

    # Heuristic "strength" of each known action. Unknown actions and
    # "no_action" fall back to 0.0.
    _ACTION_INTENSITY = {
        "restart_container": 0.4,
        "scale_out": 0.6,
        "rollback": 0.8,
        "circuit_breaker": 0.7,
        "traffic_shift": 0.5,
        "alert_team": 0.1,
    }

    def __init__(self, memory_store=None):
        """Create an explainer.

        Args:
            memory_store: Optional object kept on ``self.memory``; it is only
                stored here and not otherwise used by this class.
        """
        self.memory = memory_store
        self.treatment = "healing_action"
        self.outcome = "latency"
        # Fractional change each action is assumed to cause, per metric
        # (e.g. -0.15 means the metric drops by 15 %).
        self._action_impact = {
            "restart_container": {
                "latency_effect": -0.15,
                "error_rate_effect": -0.10,
            },
            "scale_out": {
                "latency_effect": -0.20,
                "error_rate_effect": -0.05,
            },
            "rollback": {
                "latency_effect": -0.25,
                "error_rate_effect": -0.20,
            },
            "circuit_breaker": {
                "latency_effect": -0.05,
                "error_rate_effect": -0.30,
            },
            "traffic_shift": {
                "latency_effect": -0.10,
                "error_rate_effect": -0.10,
            },
            "alert_team": {
                "latency_effect": 0.0,
                "error_rate_effect": 0.0,
            },
            "no_action": {
                "latency_effect": 0.0,
                "error_rate_effect": 0.0,
            },
        }
        # Half-width of the reported interval, as a fraction of |effect|.
        self._uncertainty = 0.1

    def _extract_action_intensity(
            self, action_dict: Optional[Dict[str, Any]]) -> float:
        """Return the heuristic intensity of an action.

        ``None`` or an empty mapping is treated as "no_action". Unknown action
        types and "no_action" both yield 0.0.
        """
        action_type = (action_dict or {}).get("action_type", "no_action")
        return self._ACTION_INTENSITY.get(action_type, 0.0)

    def _get_effect_for_action(
            self, action_dict: Optional[Dict[str, Any]], metric: str) -> float:
        """Return the fractional effect of an action on ``metric``.

        The value is read from the ``"<metric>_effect"`` entry of the action's
        impact row. Unknown action types use the "no_action" row, and metrics
        without an entry yield 0.0. ``None`` is treated as "no_action".
        """
        action_type = (action_dict or {}).get("action_type", "no_action")
        impacts = self._action_impact.get(
            action_type, self._action_impact["no_action"])
        return impacts.get(f"{metric}_effect", 0.0)

    def counterfactual_explanation(
        self,
        observed_context: Dict[str, Any],
        alternative_action: Optional[Dict[str, Any]],
        outcome_name: str = "latency",
        confidence_level: float = 0.95
    ) -> "CausalExplanation":
        """Estimate the outcome had ``alternative_action`` been applied.

        Args:
            observed_context: Observed state. ``observed_context[outcome_name]``
                is the factual outcome (0.0 if missing or ``None``) and
                ``observed_context["action_taken"]`` is the factual action
                (treated as no action if missing or ``None``).
            alternative_action: Action dict with an ``"action_type"`` key;
                ``None`` is treated as an empty dict.
            outcome_name: Metric to explain.
            confidence_level: Accepted for interface compatibility; the
                heuristic interval below does not depend on it.

        Returns:
            A ``CausalExplanation`` with ``is_model_based=False``. The
            counterfactual is ``factual * (1 + effect_fraction)`` clamped at
            0.0, and the interval is the counterfactual plus/minus
            ``abs(effect) * self._uncertainty``.
        """
        warnings = ["Using heuristic causal model (no fitted SCM)."]

        factual_outcome = observed_context.get(outcome_name, 0.0)
        if factual_outcome is None:
            # An explicit None would otherwise break the arithmetic below;
            # handle it the same way as a missing key.
            factual_outcome = 0.0
            warnings.append(
                f"Observed {outcome_name} was None; treated as 0.0.")

        # A None action (e.g. "no previous action recorded") means no action.
        factual_action = observed_context.get("action_taken") or {}
        alternative_action = alternative_action or {}

        factual_intensity = self._extract_action_intensity(factual_action)
        alt_intensity = self._extract_action_intensity(alternative_action)

        effect_frac = self._get_effect_for_action(
            alternative_action, outcome_name)
        if alt_intensity == 0.0 and factual_intensity > 0.0:
            # The alternative has zero intensity while an action was actually
            # taken: model the counterfactual as undoing the factual effect.
            effect_frac = -self._get_effect_for_action(
                factual_action, outcome_name)

        counterfactual = max(0.0, factual_outcome * (1.0 + effect_frac))
        effect = counterfactual - factual_outcome
        ci_half = abs(effect) * self._uncertainty
        confidence_interval = (
            counterfactual - ci_half,
            counterfactual + ci_half,
        )

        # Names are resolved up front so every f-string stays on one line;
        # newlines inside replacement fields only parse on Python >= 3.12.
        alt_name = alternative_action.get("action_type", "unknown")
        factual_name = factual_action.get("action_type", "no action")
        explanation_text = (
            f"If we apply {alt_name} instead of {factual_name}, "
            f"{outcome_name} would change "
            f"from {factual_outcome:.2f} to {counterfactual:.2f} "
            f"(Δ = {effect:.2f}). "
            "Based on heuristic causal model."
        )

        return CausalExplanation(
            factual_outcome=factual_outcome,
            counterfactual_outcome=counterfactual,
            effect=effect,
            is_model_based=False,
            confidence_interval=confidence_interval,
            explanation_text=explanation_text,
            warnings=warnings,
        )

    def explain_healing_intent(
        self,
        proposed_action: Dict[str, Any],
        current_state: Dict[str, Any],
        outcome_metric: str = "latency"
    ) -> "CausalExplanation":
        """Explain the expected impact of ``proposed_action`` on a metric.

        Builds an observed context from ``current_state`` (using its
        ``"last_action"`` entry as the factual action, or "no_action" when it
        is missing or ``None``) and delegates to
        ``counterfactual_explanation``.
        """
        last_action = current_state.get("last_action")
        if last_action is None:
            last_action = {"action_type": "no_action"}

        observed = {
            outcome_metric: current_state.get(outcome_metric, 0.0),
            "action_taken": last_action,
            # Spread last so keys already present in current_state (including
            # an explicit "action_taken") take precedence over the defaults.
            **current_state,
        }
        return self.counterfactual_explanation(
            observed_context=observed,
            alternative_action=proposed_action,
            outcome_name=outcome_metric,
        )

    def discover_graph_from_memory(
            self, data: pd.DataFrame, method: str = "pc") -> Dict[str, Any]:
        """Placeholder graph discovery.

        Returns the columns of ``data`` as nodes and an empty edge list;
        ``method`` is accepted but not used.
        """
        return {"nodes": list(data.columns), "edges": []}

    def fit_scm(
        self,
        data: pd.DataFrame,
        treatment: str,
        outcome: str,
        graph: Optional[Dict] = None
    ) -> None:
        """Placeholder SCM fit.

        Only records ``treatment`` and ``outcome`` on the instance; ``data``
        and ``graph`` are accepted but not used.
        """
        self.treatment = treatment
        self.outcome = outcome

    def estimate_effect(
            self,
            method_name: str = "backdoor.linear_regression"
    ) -> Optional[float]:
        """Placeholder effect estimation; always returns ``None``."""
        return None
|
|