# hf_demo.py – ARF v4 dashboard for Hugging Face Spaces
import logging
from datetime import datetime, timezone
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
# ARF v4 imports
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory
from agentic_reliability_framework.runtime.memory.constants import MemoryConstants
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastAPI application object; the route decorators in this file register on it.
app = FastAPI(title="ARF v4 API with Memory")
# Enable CORS for your frontend
# Only the single origin listed below is allowed; every HTTP method and
# request header is permitted for that origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://arf-frontend-sandy.vercel.app"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Initialize ARF components
# ---------------------------------------------------------------------------
# Module-level RiskEngine instance, created once at import time and used by
# the risk helper functions in this file.
risk_engine = RiskEngine()
# Create FAISS index and memory (using default dimension from constants)
# Both objects are also built at import time; `memory` is the single shared
# instance read and written by the API endpoints and dashboard callbacks.
faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM)
memory = RAGGraphMemory(faiss_index)
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@app.get("/")
async def root():
    """Describe the service and summarise the incident memory.

    Returns:
        A dict with the service name, version and status, plus
        ``memory_stats``: the graph statistics when the memory reports
        historical data, otherwise the literal string ``"empty"``.
    """
    if memory.has_historical_data():
        stats = memory.get_graph_stats()
    else:
        stats = "empty"

    return {
        "service": "ARF OSS API",
        "version": "4.0.0",
        "status": "operational",
        "memory_stats": stats,
    }
@app.get("/health")
async def health():
    """Return a constant payload indicating the API process is responding."""
    payload = dict(status="ok", version="4.0.0")
    return payload
@app.get("/api/v1/get_risk")
async def get_risk():
    """
    Compute a safe risk snapshot using the supported RiskEngine.calculate_risk()
    API. This avoids calling the removed get_current_risk() method.

    Returns:
        A dict with ``system_risk`` (the numeric risk), ``status``
        (``"critical"`` when the risk exceeds 0.8, otherwise ``"normal"``)
        and ``details`` (the full dict produced by ``_calculate_demo_risk``).

    Raises:
        HTTPException: status 500 with the error text as ``detail`` when the
            risk computation fails.
    """
    try:
        score = _calculate_demo_risk()
        return {
            "system_risk": score["risk"],
            "status": "critical" if score["risk"] > 0.8 else "normal",
            "details": score,
        }
    except Exception as e:
        # Record the traceback server-side (as get_risk_snapshot does) before
        # translating the failure into an HTTP error, and keep the original
        # exception attached as the cause.
        logger.exception("Failed to compute risk for /api/v1/get_risk")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/v1/incident")
async def store_incident(event_data: dict, analysis: dict):
    """Persist an incident in the shared memory and return its identifier.

    Args:
        event_data: Incident event payload, passed through unchanged to
            ``memory.store_incident``.
        analysis: Analysis payload for the incident, passed through unchanged.

    Returns:
        ``{"incident_id": ...}`` with whatever identifier
        ``memory.store_incident`` returned.

    Raises:
        HTTPException: status 500 with the error text as ``detail`` when the
            memory layer raises.
    """
    try:
        incident_id = memory.store_incident(event_data, analysis)
        return {"incident_id": incident_id}
    except Exception as e:
        # Log the traceback before converting to an HTTP error so failures are
        # diagnosable from server logs; chain the cause for debuggers.
        logger.exception("Failed to store incident")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/v1/memory/similar")
async def find_similar_incidents(action: str, k: int = 5):
    """Look up stored incidents similar to a free-form action string.

    A throwaway event object with neutral metric values is built around
    ``action`` and handed to ``memory.find_similar`` together with an
    ``{"action": action}`` analysis dict.

    Args:
        action: Action text to search for.
        k: Number of neighbours requested from the memory (default 5).

    Returns:
        ``{"similar": [...], "count": n}`` where each entry exposes the
        matched node's id, component, severity, timestamp, metrics, agent
        analysis and its ``similarity_score`` metadata (0.0 when absent).
    """

    class DummyEvent:
        # Stand-in event: fixed component/severity and zeroed metrics, with
        # only the action taken from the request.
        def __init__(self, action: str):
            self.component = "user_action"
            self.latency_p99 = 0.0
            self.error_rate = 0.0
            self.throughput = 0
            self.cpu_util = 0.0
            self.memory_util = 0.0
            self.timestamp = datetime.now()
            self.severity = "low"
            self.action = action

    def _as_payload(node):
        # Flatten one memory node into a plain dict for the response.
        return {
            "incident_id": node.incident_id,
            "component": node.component,
            "severity": node.severity,
            "timestamp": node.timestamp,
            "metrics": node.metrics,
            "agent_analysis": node.agent_analysis,
            "similarity_score": node.metadata.get("similarity_score", 0.0),
        }

    probe = DummyEvent(action)
    neighbours = memory.find_similar(probe, {"action": action}, k=k)
    matches = [_as_payload(node) for node in neighbours]
    return {"similar": matches, "count": len(matches)}
@app.get("/api/v1/memory/stats")
async def memory_stats():
    """Return the statistics reported by ``memory.get_graph_stats()`` as-is."""
    graph_stats = memory.get_graph_stats()
    return graph_stats
# ---------------------------------------------------------------------------
# Gradio dashboard
# ---------------------------------------------------------------------------
class _DemoIntent:
"""
Minimal intent object for demo-only risk snapshots.
RiskEngine.categorize_intent() will fall back to DEFAULT for this object.
"""
environment = "dev"
deployment_target = "dev"
service_name = "demo"
def _calculate_demo_risk():
    """
    Score a placeholder intent through RiskEngine.calculate_risk().

    This is the supported API; it replaces the removed get_current_risk()
    method.

    Returns:
        A dict with ``risk`` (the score as a float), ``status``
        (``"critical"`` when the score is above 0.8, else ``"normal"``),
        ``explanation`` and ``contributions`` exactly as the engine
        returned them.
    """
    outcome = risk_engine.calculate_risk(
        intent=_DemoIntent(),
        cost_estimate=None,
        policy_violations=[],
    )
    score, reasoning, breakdown = outcome

    risk = float(score)
    if score > 0.8:
        label = "critical"
    else:
        label = "normal"

    return {
        "risk": risk,
        "status": label,
        "explanation": reasoning,
        "contributions": breakdown,
    }
def get_risk_snapshot():
    """Dashboard callback: demo risk result stamped with the current UTC time.

    Never raises; on failure the exception is logged and an
    ``{"status": "error", ...}`` dict carrying the error text is returned
    instead.
    """
    try:
        result = _calculate_demo_risk()
        result["timestamp"] = datetime.now(timezone.utc).isoformat()
    except Exception as exc:
        logger.exception("Failed to compute risk snapshot")
        failure = {"status": "error", "error": str(exc)}
        failure["timestamp"] = datetime.now(timezone.utc).isoformat()
        return failure
    else:
        return result
def get_health_snapshot():
    """Dashboard callback: report static service health with a UTC timestamp.

    Returns:
        A dict with ``status`` ("ok"), ``version``, ``service`` and
        ``timestamp`` (timezone-aware ISO-8601 string in UTC).

    The former try/except wrapper was removed: the only operation that could
    fail was the timestamp call, and the handler performed that same call,
    so the error branch could never produce a result.
    """
    return {
        "status": "ok",
        "version": "4.0.0",
        "service": "ARF OSS API",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def get_memory_snapshot():
    """Dashboard callback: summarise the incident memory with a UTC timestamp.

    Returns a dict whose ``status`` is ``"ok"`` (with the graph statistics),
    ``"empty"`` (with a fixed placeholder message) or ``"error"`` (with the
    error text, after logging the exception). Never raises.
    """
    try:
        # Guard clause: nothing stored yet, so there are no stats to fetch.
        if not memory.has_historical_data():
            return {
                "status": "empty",
                "memory_stats": "No historical memory yet.",
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }

        graph_stats = memory.get_graph_stats()
        return {
            "status": "ok",
            "memory_stats": graph_stats,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as exc:
        logger.exception("Failed to compute memory snapshot")
        return {
            "status": "error",
            "error": str(exc),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
# Build the Gradio UI: three JSON panels (health, risk, memory) and one
# refresh button per panel. Components appear in the order they are created.
with gr.Blocks(title="ARF v4 Demo") as demo:
    gr.Markdown("# Agentic Reliability Framework v4")
    gr.Markdown("### Status dashboard")
    # First row: health and risk panels side by side.
    with gr.Row():
        health_output = gr.JSON(label="Health")
        risk_output = gr.JSON(label="Current Risk")
    # Second row: memory statistics panel.
    with gr.Row():
        memory_output = gr.JSON(label="Memory Stats")
    # Third row: one refresh button per panel.
    with gr.Row():
        refresh_btn = gr.Button("Refresh Risk")
        health_btn = gr.Button("Refresh Health")
        memory_btn = gr.Button("Refresh Memory")
    # Each button re-runs its snapshot function and writes the returned dict
    # into the matching JSON panel.
    refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output)
    health_btn.click(fn=get_health_snapshot, outputs=health_output)
    memory_btn.click(fn=get_memory_snapshot, outputs=memory_output)
    # Load initial state after startup, not during import.
    demo.load(fn=get_health_snapshot, outputs=health_output)
    demo.load(fn=get_risk_snapshot, outputs=risk_output)
    demo.load(fn=get_memory_snapshot, outputs=memory_output)
# ============== MAIN ENTRY POINT ==============
if __name__ == "__main__":
    # Launch Gradio directly to keep the Space alive and avoid the startup crash.
    # NOTE: only the Gradio UI is started here; the FastAPI `app` object is
    # not referenced by this entry point.
    # server_name="0.0.0.0" binds the server to all network interfaces.
    demo.launch(server_name="0.0.0.0")
|