# demo/orchestrator.py - FIXED VERSION
from __future__ import annotations
import logging
import asyncio
from typing import Any, Dict, Optional, List
import time
logger = logging.getLogger(__name__)
# Optional dependency: the mock ARF helpers. MOCK_ARF_AVAILABLE records
# whether the import succeeded so later code can check it before calling them.
try:
    from demo.mock_arf import (
        simulate_arf_analysis,
        run_rag_similarity_search,
        create_mock_healing_intent,
        calculate_pattern_confidence,
    )
except ImportError:
    # The helper names stay undefined on this path; only the flag is set.
    logger.warning("Mock ARF functions not available")
    MOCK_ARF_AVAILABLE = False
else:
    MOCK_ARF_AVAILABLE = True
class DemoOrchestrator:
    """Orchestrate demo scenarios through the mock ARF agent workflow.

    ``analyze_incident`` runs the mock helpers in a fixed order: detection
    (``simulate_arf_analysis``), recall (``run_rag_similarity_search``) and
    decision (``calculate_pattern_confidence`` followed by
    ``create_mock_healing_intent``). ``run_scenario`` is a legacy entry point
    that simply echoes the scenario back.
    """

    def __init__(self, enable_streamlit: bool = False):
        """Create an orchestrator.

        Args:
            enable_streamlit: Stored on the instance as ``enable_streamlit``;
                no method of this class reads it.
        """
        self.enable_streamlit = enable_streamlit

    async def analyze_incident(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze an incident using the ARF agent workflow.

        This is the method called by app.py.

        Args:
            scenario_name: Label for the scenario; logged and echoed back under
                the ``"scenario"`` key of the result.
            scenario_data: Scenario payload handed unchanged to each mock ARF
                helper.

        Returns:
            On success, a dict with ``"status": "success"`` plus the keys
            ``"scenario"``, ``"detection"``, ``"recall"``, ``"decision"``,
            ``"confidence"`` and ``"processing_time_ms"``. If the mock ARF
            helpers are unavailable or any workflow step raises, a dict with
            ``"status": "error"``, ``"message"`` and ``"scenario"``. Exceptions
            derived from ``Exception`` are reported this way rather than
            propagated.
        """
        logger.info("Analyzing incident: %s", scenario_name)

        # Guard clause: the helper names only exist if the import succeeded.
        if not MOCK_ARF_AVAILABLE:
            return {
                "status": "error",
                "message": "Mock ARF functions not available",
                "scenario": scenario_name,
            }

        try:
            # Step 1: Detection Agent
            detection_result = simulate_arf_analysis(scenario_data)

            # Step 2: Recall Agent
            similar_incidents = run_rag_similarity_search(scenario_data)

            # Step 3: Decision Agent
            confidence = calculate_pattern_confidence(scenario_data, similar_incidents)
            healing_intent = create_mock_healing_intent(
                scenario_data, similar_incidents, confidence
            )

            # Simulate processing time
            await asyncio.sleep(0.5)
        except Exception as e:
            # logger.exception records the traceback as well as the message, so
            # a failure inside a helper can be diagnosed from the logs even
            # though the caller only receives str(e).
            logger.exception("Error analyzing incident: %s", e)
            return {
                "status": "error",
                "message": str(e),
                "scenario": scenario_name,
            }

        return {
            "status": "success",
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": similar_incidents,
            "decision": healing_intent,
            "confidence": confidence,
            # Fixed figure reported to the caller; it is not measured (the
            # simulated delay above is 0.5 s).
            "processing_time_ms": 450,
        }

    def run_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Run a demo scenario (legacy method).

        Args:
            scenario: Scenario mapping; its ``"name"`` entry, if present, is
                used for logging and for the ``"scenario"`` key of the result.

        Returns:
            A dict with ``"scenario"`` (the name, or ``None`` when absent),
            ``"status": "completed"`` and ``"output"`` set to the same
            ``scenario`` object that was passed in.
        """
        logger.info("Running scenario: %s", scenario.get("name", "unknown"))
        return {
            "scenario": scenario.get("name"),
            "status": "completed",
            "output": scenario,
        }