# Agentic-Reliability-Framework-API / app/api/routes_incidents.py
# Commit 2d521fd (verified) by petter2025: "Add FastAPI app"
from app.causal_explainer import CausalExplainer
from fastapi import APIRouter, Depends, Request, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
from enum import Enum
import time
import json
# ===== USAGE TRACKER IMPORTS =====
from app.core.usage_tracker import enforce_quota, UsageRecord, tracker
class HealingAction(str, Enum):
    """Closed set of healing actions that an evaluation can recommend.

    The ``str`` mixin makes every member a real string, so a member compares
    equal to its plain value (``HealingAction.ROLLBACK == "rollback"``) and
    can be looked up from that value (``HealingAction("rollback")``).
    """

    # Leave the component untouched.
    NO_ACTION = "no_action"

    # Remediation options, identified by their wire-format string values.
    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    ROLLBACK = "rollback"
    CIRCUIT_BREAKER = "circuit_breaker"
    TRAFFIC_SHIFT = "traffic_shift"
    ALERT_TEAM = "alert_team"
class ReliabilityEvent(BaseModel):
    """Request payload describing one reliability observation for a component.

    ``component``, ``latency_p99`` and ``error_rate`` are required; the
    remaining fields fall back to the defaults declared below.
    """

    # Name of the component the observation belongs to.
    component: str

    # 99th-percentile latency.
    # NOTE(review): presumably milliseconds (the evaluator divides by 1000) — confirm.
    latency_p99: float

    # Observed error rate.
    # NOTE(review): presumably a 0..1 fraction (compared against 0.15) — confirm.
    error_rate: float

    # Service mesh identifier; "default" when the caller omits it.
    service_mesh: str = "default"

    # Optional resource utilisation readings; None when not supplied.
    cpu_util: Optional[float] = None
    memory_util: Optional[float] = None
# Router on which the incident endpoints of this module are registered.
router = APIRouter()
# In-memory list of reported incidents (one dict per event), appended to by
# report_incident.
# NOTE(review): nothing visible in this module trims or persists this list, so
# it appears to grow for the lifetime of the process — confirm this is intended.
incident_history = []
@router.post("/report_incident")
async def report_incident(event: ReliabilityEvent):
    """Store the reported event in the in-memory history and acknowledge it.

    The event is converted to a plain dict before being appended to
    ``incident_history``; the response is always ``{"status": "recorded"}``.
    """
    snapshot = event.dict()
    incident_history.append(snapshot)

    acknowledgement = {"status": "recorded"}
    return acknowledgement
# Route path, shared by the decorator and the usage records so they cannot drift.
_EVALUATE_PATH = "/v1/incidents/evaluate"

# Tuning knobs of the heuristic evaluation (values unchanged from the original
# inline literals).
_LATENCY_SCALE = 1000.0
_LATENCY_WEIGHT = 0.7
_ERROR_RATE_WEIGHT = 0.3
_RESTART_LATENCY_THRESHOLD = 500
_RESTART_ERROR_RATE_THRESHOLD = 0.15


def _heuristic_risk_score(latency_p99: float, error_rate: float) -> float:
    """Blend latency and error rate into a risk score clamped to [0.0, 1.0]."""
    raw = (latency_p99 / _LATENCY_SCALE) * _LATENCY_WEIGHT + error_rate * _ERROR_RATE_WEIGHT
    # The lower clamp keeps negative (invalid) inputs from yielding a negative risk.
    return max(0.0, min(1.0, raw))


async def _record_usage(api_key, tier, event, background_tasks, started, **outcome):
    """Hand one usage record for this endpoint to the tracker, if one is configured.

    ``started`` is a ``time.perf_counter()`` reading taken when the request
    began. ``outcome`` carries either ``response=...`` or ``error=...`` and is
    forwarded to ``UsageRecord`` as-is, so only the field that applies is set.
    """
    if not tracker:
        return
    record = UsageRecord(
        api_key=api_key,
        tier=tier,
        timestamp=time.time(),  # wall-clock time of the record itself
        endpoint=_EVALUATE_PATH,
        request_body=event.dict(),
        processing_ms=(time.perf_counter() - started) * 1000,
        **outcome,
    )
    await tracker.increment_usage_async(record, background_tasks)


@router.post(_EVALUATE_PATH)
async def evaluate_incident(
    request: Request,
    event: ReliabilityEvent,
    background_tasks: BackgroundTasks,
    quota: dict = Depends(enforce_quota)
):
    """Evaluate a reliability event and return an advisory healing intent.

    A heuristic risk score and a threshold rule pick between restarting the
    container and taking no action. ``CausalExplainer`` then supplies the
    explanation embedded in the response. Usage is reported to the tracker on
    both the success and the failure path.

    Args:
        request: The incoming request (injected by FastAPI; not read here).
        event: The reliability observation to evaluate.
        background_tasks: FastAPI background task queue, passed to the tracker.
        quota: Result of ``enforce_quota``; must provide ``"api_key"`` and
            ``"tier"``.

    Returns:
        A dict with ``healing_intent``, ``causal_explanation`` and
        ``utility_decision`` sections.

    Raises:
        HTTPException: Re-raised unchanged if raised during evaluation;
            any other exception is converted to a 500 whose detail is the
            exception message.
    """
    # Monotonic clock: elapsed-time measurement must not be affected by
    # wall-clock adjustments.
    started = time.perf_counter()
    api_key = quota["api_key"]
    tier = quota["tier"]

    try:
        # Simple risk score (heuristic)
        risk_score = _heuristic_risk_score(event.latency_p99, event.error_rate)

        needs_restart = (
            event.latency_p99 > _RESTART_LATENCY_THRESHOLD
            or event.error_rate > _RESTART_ERROR_RATE_THRESHOLD
        )
        optimal_action = (
            HealingAction.RESTART_CONTAINER if needs_restart else HealingAction.NO_ACTION
        )

        current_state = {
            "latency": event.latency_p99,
            "error_rate": event.error_rate,
            "last_action": {"action_type": "no_action"},
        }
        proposed_action = {"action_type": optimal_action.value, "params": {}}

        ce = CausalExplainer()
        causal_exp = ce.explain_healing_intent(proposed_action, current_state, "latency")

        healing_intent = {
            "action": optimal_action.value,
            "component": event.component,
            "parameters": proposed_action["params"],
            "justification": f"Causal: {causal_exp.explanation_text}",
            "confidence": 0.85,
            "risk_score": risk_score,
            "status": "oss_advisory_only",
        }
        response_data = {
            "healing_intent": healing_intent,
            "causal_explanation": {
                "factual_outcome": causal_exp.factual_outcome,
                "counterfactual_outcome": causal_exp.counterfactual_outcome,
                "effect": causal_exp.effect,
                "explanation_text": causal_exp.explanation_text,
                "is_model_based": causal_exp.is_model_based,
                "warnings": causal_exp.warnings,
            },
            "utility_decision": {
                "best_action": optimal_action.value,
                "expected_utility": 0.5,
                "explanation": "Heuristic decision based on latency/error thresholds",
            },
        }

        # Asynchronous usage logging
        await _record_usage(
            api_key, tier, event, background_tasks, started, response=response_data
        )
        return response_data
    except HTTPException:
        raise
    except Exception as e:
        error_msg = str(e)
        # Log failure in background.
        # NOTE(review): if increment_usage_async only enqueues work on
        # background_tasks, that work may never run once we raise below,
        # because the error response is built by the exception handler —
        # confirm against the tracker implementation.
        await _record_usage(
            api_key, tier, event, background_tasks, started, error=error_msg
        )
        # NOTE(review): the raw exception text is returned to the client;
        # consider a generic detail if internal messages may be sensitive.
        raise HTTPException(status_code=500, detail=error_msg) from e