# File size: 4,520 Bytes
import datetime
from models import HealingPolicy, HealingAction, EventSeverity
from typing import Dict, List
# Default healing policies
DEFAULT_HEALING_POLICIES = [
HealingPolicy(
name="high_latency_restart",
conditions={
"latency_p99": {"operator": ">", "value": 300},
"error_rate": {"operator": "<", "value": 0.1},
},
actions=[HealingAction.RESTART_CONTAINER],
priority=2
),
HealingPolicy(
name="cascading_failure",
conditions={
"error_rate": {"operator": ">", "value": 0.15},
},
actions=[HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
priority=1
),
HealingPolicy(
name="resource_exhaustion",
conditions={
"cpu_util": {"operator": ">", "value": 0.85},
"memory_util": {"operator": ">", "value": 0.85}
},
actions=[HealingAction.SCALE_OUT, HealingAction.ALERT_TEAM],
priority=1
),
HealingPolicy(
name="moderate_performance_issue",
conditions={
"latency_p99": {"operator": ">", "value": 200},
"error_rate": {"operator": ">", "value": 0.05}
},
actions=[HealingAction.TRAFFIC_SHIFT],
priority=3
),
HealingPolicy(
name="critical_failure",
conditions={
"latency_p99": {"operator": ">", "value": 500},
"error_rate": {"operator": ">", "value": 0.1}
},
actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM, HealingAction.TRAFFIC_SHIFT],
priority=1
)
]
class PolicyEngine:
    """Match events against healing policies and collect the resulting actions.

    Policies are evaluated in the order they appear in ``self.policies``.
    After a policy fires for a component it is skipped for that component
    until the policy's ``cool_down_seconds`` have elapsed.
    """

    def __init__(self, policies: List["HealingPolicy"] = None):
        """Create an engine.

        Args:
            policies: Policies to evaluate. ``None`` or an empty list falls
                back to ``DEFAULT_HEALING_POLICIES``.
        """
        self.policies = policies or DEFAULT_HEALING_POLICIES
        # "<policy name>_<component>" -> epoch seconds of the last firing.
        self.last_execution: Dict[str, float] = {}

    def evaluate_policies(self, event) -> List["HealingAction"]:
        """Evaluate all policies against the event and return matching actions.

        Disabled policies and policies still cooling down for
        ``event.component`` are skipped. Every policy that matches has its
        firing time recorded in ``self.last_execution``.

        Args:
            event: Object exposing ``component`` plus whatever attributes the
                policy conditions refer to.

        Returns:
            The actions of all matching policies, de-duplicated in first-seen
            order, or ``[HealingAction.NO_ACTION]`` when nothing matched.
        """
        applicable_actions = []
        # One aware-UTC reading per evaluation: a naive local now() is
        # ambiguous during the DST fall-back hour, which can make the clock
        # appear to jump backwards and stretch a cooldown by an hour.
        current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()

        for policy in self.policies:
            if not policy.enabled:
                continue

            # Check cooldown (tracked per policy *and* component).
            policy_key = f"{policy.name}_{event.component}"
            last_exec = self.last_execution.get(policy_key, 0)
            if current_time - last_exec < policy.cool_down_seconds:
                continue

            if self._evaluate_conditions(policy.conditions, event):
                applicable_actions.extend(policy.actions)
                self.last_execution[policy_key] = current_time

        # dict.fromkeys removes duplicates while preserving insertion order.
        unique_actions = list(dict.fromkeys(applicable_actions))
        return unique_actions or [HealingAction.NO_ACTION]

    def _evaluate_conditions(self, conditions: Dict, event) -> bool:
        """Return True when every condition holds for the event.

        Args:
            conditions: Mapping of event attribute name to a dict with the
                keys ``"operator"`` and ``"value"``.
            event: Object whose attributes are compared; a missing attribute
                is treated as ``None``.

        Returns:
            True if all conditions hold (vacuously True for an empty mapping).

        Raises:
            KeyError: If a condition lacks ``"operator"`` or ``"value"``.
        """
        return all(
            self._compare_values(
                getattr(event, field, None),
                condition["operator"],
                condition["value"],
            )
            for field, condition in conditions.items()
        )

    def _compare_values(self, event_value, operator: str, condition_value) -> bool:
        """Compare values based on operator.

        Supported operators are ``>``, ``<``, ``>=``, ``<=``, ``==``, ``in``
        and ``not_empty``. For ``not_empty`` the truthiness of ``event_value``
        is compared with ``condition_value`` (the expected boolean).

        Returns:
            The comparison result; False for an unknown operator or when the
            operands cannot be compared (for example a missing event value).
        """
        try:
            if operator == ">":
                return event_value > condition_value
            if operator == "<":
                return event_value < condition_value
            if operator == ">=":
                return event_value >= condition_value
            if operator == "<=":
                return event_value <= condition_value
            if operator == "==":
                return event_value == condition_value
            if operator == "in":
                return event_value in condition_value
            if operator == "not_empty":
                # Must not be written as ``len(x) > 0 == expected``: Python
                # chains that into ``len(x) > 0 and 0 == expected``.
                return bool(event_value) == condition_value
            return False
        except (TypeError, ValueError):
            # Incomparable operands (e.g. None > 300) count as "no match".
            return False