# Instant-SWOT-Agent — src/services/workflow_store.py
"""
Workflow state management service.
Handles in-memory workflow storage and background execution.
"""
import json
import logging
import os
from datetime import datetime, timezone

from src.services.swot_parser import parse_swot_text
from src.utils.analysis_cache import get_cached_analysis, set_cached_analysis
# Module-level logger; handlers/level are configured by the app entry point.
logger = logging.getLogger(__name__)
# In-memory workflow storage
# Maps workflow_id -> mutable workflow state dict (status, current_step,
# activity_log, metrics, mcp_status, llm_status, result, ...).
# Process-local only: state is lost on restart and not shared across workers.
WORKFLOWS: dict = {}
# Configurable delay for granular progress events (ms)
# NOTE(review): not referenced anywhere in this module — presumably consumed
# by the workflow graph; confirm before removing.
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))
def add_activity_log(workflow_id: str, step: str, message: str) -> None:
    """Append a timestamped entry to a workflow's activity log.

    Unknown workflow ids are ignored silently so background threads can log
    without racing workflow creation/cleanup.

    Args:
        workflow_id: Key into the in-memory WORKFLOWS store.
        step: Pipeline step or source name the message belongs to.
        message: Human-readable log line for the UI.
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    # datetime.utcnow() is deprecated (Python 3.12+); this timezone-aware
    # equivalent emits the exact same naive-ISO "...Z" string as before.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    workflow.setdefault("activity_log", []).append({
        "timestamp": timestamp,
        "step": step,
        "message": message
    })
def add_metric(workflow_id: str, source: str, metric: str, value) -> None:
    """Record a fetched metric on a workflow and mirror it to the activity log.

    Also flips the source's mcp_status to "completed", since receiving a
    metric implies that MCP server responded successfully. Unknown workflow
    ids are ignored silently.

    Args:
        workflow_id: Key into the in-memory WORKFLOWS store.
        source: MCP source name (e.g. "financials").
        metric: Human-readable metric name.
        value: Metric value; floats are rendered with two decimals.
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    # datetime.utcnow() is deprecated (Python 3.12+); this timezone-aware
    # equivalent emits the exact same naive-ISO "...Z" string as before.
    workflow.setdefault("metrics", []).append({
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "source": source,
        "metric": metric,
        "value": value
    })
    # Also add to activity log for visibility
    display_value = f"{value:.2f}" if isinstance(value, float) else str(value)
    add_activity_log(workflow_id, source, f"Fetched {metric}: {display_value}")
    # Update MCP status to completed when we get a metric
    if source in workflow.get("mcp_status", {}):
        workflow["mcp_status"][source] = "completed"
def update_mcp_status(workflow_id: str, source: str, status: str) -> None:
    """Set one MCP server's status (idle/executing/completed/failed).

    No-op when the workflow id, its mcp_status map, or the source key is
    missing — safe to call from any point in the pipeline.
    """
    if workflow_id not in WORKFLOWS:
        return
    workflow = WORKFLOWS[workflow_id]
    if "mcp_status" in workflow and source in workflow["mcp_status"]:
        workflow["mcp_status"][source] = status
def _parse_raw_data(raw) -> dict:
    """Best-effort conversion of the workflow's raw_data field into a dict.

    Accepts either a JSON string or an already-parsed mapping; returns {}
    when the input is falsy or cannot be parsed.
    """
    try:
        if isinstance(raw, str):
            return json.loads(raw)
        return raw or {}
    except Exception as e:
        logger.warning(f"Could not parse raw_data: {e}")
        return {}


def _merge_mcp_swot(swot_data: dict, raw_data: dict) -> None:
    """Merge MCP-aggregated SWOT items into parsed SWOT data, in place.

    Ensures weaknesses/threats surfaced by MCP servers aren't lost when the
    LLM report omits them. Near-duplicates are skipped by comparing the
    first 50 lowercased characters of each item.
    """
    mcp_swot = raw_data.get("aggregated_swot", {})
    if not mcp_swot:
        return
    for category in ["strengths", "weaknesses", "opportunities", "threats"]:
        existing = {item.lower()[:50] for item in swot_data.get(category, [])}
        for item in mcp_swot.get(category, []):
            key = item.lower()[:50]
            if key not in existing:
                # setdefault: the report parser may omit a category entirely;
                # plain indexing here would KeyError and abort the whole merge.
                swot_data.setdefault(category, []).append(item)
                existing.add(key)


def run_workflow_background(workflow_id: str, company_name: str, ticker: str, strategy_focus: str):
    """Execute workflow in background thread with progress tracking.

    Mutates WORKFLOWS[workflow_id] as it runs: status, current_step,
    activity_log, mcp_status/llm_status and, on success, the final result.
    Terminal statuses: "completed", "aborted" (unrecoverable — core data or
    LLM failure), or "error" (retryable).
    """
    try:
        # --- Cache lookup: a hit short-circuits the whole agentic workflow ---
        add_activity_log(workflow_id, "cache", f"Checking cache for {ticker}")
        WORKFLOWS[workflow_id]["current_step"] = "cache"
        cached = get_cached_analysis(ticker)
        if cached:
            add_activity_log(workflow_id, "cache", f"Cache HIT - {ticker} analysis found in history")
            add_activity_log(workflow_id, "cache", f"Returning cached result (skipping agentic workflow)")
            WORKFLOWS[workflow_id].update({
                "status": "completed",
                "current_step": "completed",
                "revision_count": cached.get("revision_count", 0),
                "score": cached.get("score", 0),
                "data_source": "cache",
                "result": {
                    "company_name": cached.get("company_name", company_name),
                    "score": cached.get("score", 0),
                    "revision_count": cached.get("revision_count", 0),
                    "report_length": cached.get("report_length", 0),
                    "critique": cached.get("critique", ""),
                    "swot_data": cached.get("swot_data", {}),
                    "raw_report": cached.get("raw_report", ""),
                    "data_source": "cache",
                    "provider_used": cached.get("provider_used", "cached"),
                    "raw_data": cached.get("raw_data", {}),
                    "_cache_info": cached.get("_cache_info", {})
                }
            })
            return
        add_activity_log(workflow_id, "cache", f"Cache MISS - {ticker} not in history")
        add_activity_log(workflow_id, "cache", f"Proceeding with full agentic workflow...")

        # Import here to avoid circular imports and init issues
        from src.workflow.graph import app as graph_app

        # --- Kick off the live agentic workflow ---
        WORKFLOWS[workflow_id]["status"] = "running"
        WORKFLOWS[workflow_id]["current_step"] = "researcher"
        add_activity_log(workflow_id, "input", f"Starting analysis for {company_name} ({ticker})")

        # All MCP sources start idle; they're flipped as results/failures arrive.
        WORKFLOWS[workflow_id]["mcp_status"] = {
            "financials": "idle",
            "valuation": "idle",
            "volatility": "idle",
            "macro": "idle",
            "news": "idle",
            "sentiment": "idle"
        }

        # Initial graph state; progress_store lets graph nodes push updates
        # into the shared WORKFLOWS dict by workflow_id.
        state = {
            "company_name": company_name,
            "ticker": ticker,
            "strategy_focus": strategy_focus,
            "raw_data": None,
            "draft_report": None,
            "critique": None,
            "revision_count": 0,
            "messages": [],
            "score": 0,
            "data_source": "live",
            "provider_used": None,
            "workflow_id": workflow_id,
            "progress_store": WORKFLOWS
        }

        # Execute workflow (blocking call; runs the full research/critique loop)
        result = graph_app.invoke(state)

        # --- Reconcile MCP source statuses with what the graph reported ---
        sources_available = result.get("sources_available", [])
        sources_failed = result.get("sources_failed", [])
        mcp_status = WORKFLOWS[workflow_id]["mcp_status"]
        for source in sources_available:
            if source in mcp_status:
                mcp_status[source] = "completed"
        for source in sources_failed:
            if source in mcp_status:
                mcp_status[source] = "failed"
                add_activity_log(workflow_id, source, f"MCP server failed")

        # --- Reconcile LLM provider statuses ---
        llm_providers_failed = result.get("llm_providers_failed", [])
        provider_used = result.get("provider_used", "")
        # Defensive .get: llm_status is initialized by the workflow creator,
        # not this function, so don't crash if it's absent.
        llm_status = WORKFLOWS[workflow_id].get("llm_status", {})
        for provider in llm_providers_failed:
            if provider in llm_status:
                llm_status[provider] = "failed"
        if provider_used:
            # provider_used may be "provider:model"; the status map keys on
            # the bare provider name.
            provider_name = provider_used.split(":")[0]
            if provider_name in llm_status:
                llm_status[provider_name] = "completed"

        # Parse SWOT from draft report, then supplement with MCP-aggregated
        # SWOT data (ensures weaknesses/threats aren't lost).
        swot_data = parse_swot_text(result.get("draft_report", ""))
        try:
            _merge_mcp_swot(swot_data, _parse_raw_data(result.get("raw_data", "{}")))
        except Exception as e:
            logger.warning(f"Could not merge MCP SWOT data: {e}")

        # Check if workflow ended with an error (LLM failures etc)
        if result.get("error"):
            error_msg = result.get("error")
            add_activity_log(workflow_id, "workflow", f"Workflow failed: {error_msg}")
            WORKFLOWS[workflow_id].update({
                "status": "aborted",
                "error": error_msg,
                "current_step": "aborted"
            })
            return

        # Parse raw_data for MCP display
        raw_data_parsed = _parse_raw_data(result.get("raw_data", "{}"))

        # Build final result payload consumed by the API/UI.
        final_result = {
            "company_name": company_name,
            "score": result.get("score", 0),
            "revision_count": result.get("revision_count", 0),
            "report_length": len(result.get("draft_report", "")),
            "critique": result.get("critique", ""),
            "swot_data": swot_data,
            "raw_report": result.get("draft_report", ""),
            "data_source": result.get("data_source", "unknown"),
            "provider_used": result.get("provider_used", "unknown"),
            "raw_data": raw_data_parsed
        }

        # Cache so repeat requests for this ticker skip the agentic workflow.
        set_cached_analysis(ticker, company_name, final_result)
        add_activity_log(workflow_id, "cache", f"Cached analysis for {ticker}")

        WORKFLOWS[workflow_id].update({
            "status": "completed",
            "current_step": "completed",
            "revision_count": result.get("revision_count", 0),
            "score": result.get("score", 0),
            "result": final_result
        })
    except Exception as e:
        error_msg = str(e)
        # Determine if this is an abort (critical) or error (retryable)
        # Aborts: Core MCP failures, insufficient data
        is_abort = any(phrase in error_msg for phrase in [
            "Insufficient core data",
            "All MCP servers failed",
            "Need at least 2 of"
        ])
        WORKFLOWS[workflow_id].update({
            "status": "aborted" if is_abort else "error",
            "error": error_msg
        })