Spaces:
Sleeping
Sleeping
| """ | |
| Workflow state management service. | |
| Handles in-memory workflow storage and background execution. | |
| """ | |
import json
import logging
import os
from datetime import datetime, timezone

from src.services.swot_parser import parse_swot_text
from src.utils.analysis_cache import get_cached_analysis, set_cached_analysis
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# In-memory workflow storage: workflow_id -> mutable state dict holding
# keys such as status, current_step, activity_log, metrics, mcp_status,
# llm_status, and result (see run_workflow_background below).
WORKFLOWS: dict = {}

# Configurable delay for granular progress events (ms).
# NOTE(review): not referenced anywhere in this chunk — presumably consumed
# by the workflow graph nodes; confirm before removing.
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))
def add_activity_log(workflow_id: str, step: str, message: str) -> None:
    """Append a timestamped entry to a workflow's activity log.

    Silently does nothing when *workflow_id* is not registered, so callers
    may log without checking registration first.

    Args:
        workflow_id: Key into the in-memory WORKFLOWS store.
        step: Pipeline step or data source emitting the message.
        message: Human-readable description of the event.
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    # datetime.utcnow() is deprecated (Python 3.12); build an aware UTC time
    # and strip the offset so the "...Z"-suffixed format stays identical.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    workflow.setdefault("activity_log", []).append({
        "timestamp": timestamp,
        "step": step,
        "message": message,
    })
def add_metric(workflow_id: str, source: str, metric: str, value) -> None:
    """Record a metric for a workflow and mirror it into the activity log.

    A delivered metric implies the originating MCP server responded, so the
    source's mcp_status entry (when present) is flipped to "completed".

    Args:
        workflow_id: Key into the in-memory WORKFLOWS store.
        source: MCP server name that produced the metric.
        metric: Metric name.
        value: Metric value; floats are displayed with two decimals in the
            activity log, everything else via str().
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    # datetime.utcnow() is deprecated (Python 3.12); build an aware UTC time
    # and strip the offset so the "...Z"-suffixed format stays identical.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    workflow.setdefault("metrics", []).append({
        "timestamp": timestamp,
        "source": source,
        "metric": metric,
        "value": value,
    })
    # Also add to activity log for visibility.
    display_value = f"{value:.2f}" if isinstance(value, float) else str(value)
    add_activity_log(workflow_id, source, f"Fetched {metric}: {display_value}")
    # Update MCP status to completed when we get a metric.
    if source in workflow.get("mcp_status", {}):
        workflow["mcp_status"][source] = "completed"
def update_mcp_status(workflow_id: str, source: str, status: str):
    """Set one MCP server's status (idle/executing/completed/failed).

    No-op when the workflow, its mcp_status map, or the source is unknown.
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    statuses = workflow.get("mcp_status")
    if statuses is not None and source in statuses:
        statuses[source] = status
def _finish_with_cached(workflow_id: str, company_name: str, ticker: str, cached: dict) -> None:
    """Mark the workflow completed from a cached analysis, skipping the graph."""
    add_activity_log(workflow_id, "cache", f"Cache HIT - {ticker} analysis found in history")
    add_activity_log(workflow_id, "cache", "Returning cached result (skipping agentic workflow)")
    WORKFLOWS[workflow_id].update({
        "status": "completed",
        "current_step": "completed",
        "revision_count": cached.get("revision_count", 0),
        "score": cached.get("score", 0),
        "data_source": "cache",
        "result": {
            "company_name": cached.get("company_name", company_name),
            "score": cached.get("score", 0),
            "revision_count": cached.get("revision_count", 0),
            "report_length": cached.get("report_length", 0),
            "critique": cached.get("critique", ""),
            "swot_data": cached.get("swot_data", {}),
            "raw_report": cached.get("raw_report", ""),
            "data_source": "cache",
            "provider_used": cached.get("provider_used", "cached"),
            "raw_data": cached.get("raw_data", {}),
            "_cache_info": cached.get("_cache_info", {}),
        },
    })


def _apply_source_statuses(workflow_id: str, result: dict) -> None:
    """Propagate MCP-server and LLM-provider outcomes into the status maps."""
    mcp_status = WORKFLOWS[workflow_id]["mcp_status"]
    for source in result.get("sources_available", []):
        if source in mcp_status:
            mcp_status[source] = "completed"
    for source in result.get("sources_failed", []):
        if source in mcp_status:
            mcp_status[source] = "failed"
            add_activity_log(workflow_id, source, "MCP server failed")
    # NOTE(review): llm_status is never initialized in this module; use .get()
    # so a missing map skips the updates instead of raising KeyError and
    # aborting the whole workflow via the outer exception handler.
    llm_status = WORKFLOWS[workflow_id].get("llm_status", {})
    for provider in result.get("llm_providers_failed", []):
        if provider in llm_status:
            llm_status[provider] = "failed"
    provider_used = result.get("provider_used", "")
    if provider_used:
        # provider_used may be "provider:model"; keep only the provider part.
        provider_name = provider_used.split(":")[0]
        if provider_name in llm_status:
            llm_status[provider_name] = "completed"


def _merge_mcp_swot(swot_data: dict, raw_data) -> None:
    """Supplement parsed SWOT with MCP-aggregated items, in place.

    Deduplicates by the first 50 lowercased characters of each item so
    near-identical entries from the report and the MCP data aren't doubled.
    Best-effort: any failure is logged and the parsed data is left as-is.
    """
    try:
        if isinstance(raw_data, str):
            raw_data = json.loads(raw_data)
        mcp_swot = raw_data.get("aggregated_swot", {})
        if mcp_swot:
            for category in ["strengths", "weaknesses", "opportunities", "threats"]:
                existing = set(item.lower()[:50] for item in swot_data.get(category, []))
                for item in mcp_swot.get(category, []):
                    if item.lower()[:50] not in existing:
                        # Bug fix: the original did swot_data[category].append(...),
                        # which raised KeyError (silently swallowed by this except)
                        # whenever the parser omitted a category entirely.
                        swot_data.setdefault(category, []).append(item)
                        existing.add(item.lower()[:50])
    except Exception as e:
        logger.warning(f"Could not merge MCP SWOT data: {e}")


def run_workflow_background(workflow_id: str, company_name: str, ticker: str, strategy_focus: str):
    """Execute the analysis workflow in a background thread.

    Flow: check the analysis cache; on a miss, run the LangGraph workflow,
    propagate MCP/LLM statuses, merge SWOT data, cache the result, and store
    it in WORKFLOWS[workflow_id] for polling clients. Any uncaught exception
    marks the workflow "aborted" (critical data failure) or "error".
    """
    try:
        # Check cache first
        add_activity_log(workflow_id, "cache", f"Checking cache for {ticker}")
        WORKFLOWS[workflow_id]["current_step"] = "cache"
        cached = get_cached_analysis(ticker)
        if cached:
            _finish_with_cached(workflow_id, company_name, ticker, cached)
            return
        add_activity_log(workflow_id, "cache", f"Cache MISS - {ticker} not in history")
        add_activity_log(workflow_id, "cache", "Proceeding with full agentic workflow...")

        # Import here to avoid circular imports and init issues
        from src.workflow.graph import app as graph_app

        WORKFLOWS[workflow_id]["status"] = "running"
        WORKFLOWS[workflow_id]["current_step"] = "researcher"
        add_activity_log(workflow_id, "input", f"Starting analysis for {company_name} ({ticker})")

        # All MCP servers start idle; statuses flip as results arrive.
        WORKFLOWS[workflow_id]["mcp_status"] = {
            "financials": "idle",
            "valuation": "idle",
            "volatility": "idle",
            "macro": "idle",
            "news": "idle",
            "sentiment": "idle",
        }

        # Initial graph state; progress_store lets graph nodes report progress
        # back into the shared WORKFLOWS map.
        state = {
            "company_name": company_name,
            "ticker": ticker,
            "strategy_focus": strategy_focus,
            "raw_data": None,
            "draft_report": None,
            "critique": None,
            "revision_count": 0,
            "messages": [],
            "score": 0,
            "data_source": "live",
            "provider_used": None,
            "workflow_id": workflow_id,
            "progress_store": WORKFLOWS,
        }

        # Execute workflow
        result = graph_app.invoke(state)

        _apply_source_statuses(workflow_id, result)

        # Parse SWOT from the draft report, then supplement with the
        # MCP-aggregated SWOT (ensures weaknesses/threats aren't lost).
        swot_data = parse_swot_text(result.get("draft_report", ""))
        _merge_mcp_swot(swot_data, result.get("raw_data", "{}"))

        # The graph can finish while still carrying an error (LLM failures etc).
        if result.get("error"):
            error_msg = result.get("error")
            add_activity_log(workflow_id, "workflow", f"Workflow failed: {error_msg}")
            WORKFLOWS[workflow_id].update({
                "status": "aborted",
                "error": error_msg,
                "current_step": "aborted",
            })
            return

        # Parse raw_data for MCP display; best-effort.
        raw_data_parsed = {}
        try:
            raw_data_str = result.get("raw_data", "{}")
            if isinstance(raw_data_str, str):
                raw_data_parsed = json.loads(raw_data_str)
            else:
                raw_data_parsed = raw_data_str or {}
        except Exception as e:
            logger.warning(f"Could not parse raw_data: {e}")

        # Build final result
        final_result = {
            "company_name": company_name,
            "score": result.get("score", 0),
            "revision_count": result.get("revision_count", 0),
            "report_length": len(result.get("draft_report", "")),
            "critique": result.get("critique", ""),
            "swot_data": swot_data,
            "raw_report": result.get("draft_report", ""),
            "data_source": result.get("data_source", "unknown"),
            "provider_used": result.get("provider_used", "unknown"),
            "raw_data": raw_data_parsed,
        }

        # Cache so subsequent requests for this ticker skip the workflow.
        set_cached_analysis(ticker, company_name, final_result)
        add_activity_log(workflow_id, "cache", f"Cached analysis for {ticker}")

        WORKFLOWS[workflow_id].update({
            "status": "completed",
            "current_step": "completed",
            "revision_count": result.get("revision_count", 0),
            "score": result.get("score", 0),
            "result": final_result,
        })
    except Exception as e:
        error_msg = str(e)
        # Distinguish an abort (critical: core MCP failures, insufficient
        # data — not retryable) from an ordinary, retryable error.
        is_abort = any(phrase in error_msg for phrase in [
            "Insufficient core data",
            "All MCP servers failed",
            "Need at least 2 of",
        ])
        WORKFLOWS[workflow_id].update({
            "status": "aborted" if is_abort else "error",
            "error": error_msg,
        })