Spaces:
Sleeping
Sleeping
| """ | |
| MCP Client - TRUE MCP protocol via subprocess stdio. | |
| Implements proper MCP handshake: | |
| 1. Send 'initialize' request | |
| 2. Receive initialization response | |
| 3. Send 'initialized' notification | |
| 4. Send 'tools/call' request | |
| 5. Parse response | |
| Also supports HTTP-based load-balanced calls for fundamentals-basket. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, Callable, Any | |
| import httpx | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory holding one sub-directory per MCP server, next to this file.
MCP_SERVERS_PATH = Path(__file__).parent / "mcp-servers"

# Delay (in milliseconds) inserted after each granular progress event.
# 0 means completeness-first mode: no artificial UI delays.
METRIC_DELAY_MS = int(os.environ.get("METRIC_DELAY_MS", "0"))

# =============================================================================
# HTTP LOAD BALANCER CONFIGURATION
# =============================================================================
# Base URL of the financials HTTP load balancer (nginx on port 8080).
FINANCIALS_HTTP_URL = os.environ.get("FINANCIALS_HTTP_URL", "http://localhost:8080")

# HTTP mode toggle: only the (case-insensitive) string "true" enables it;
# any other value keeps the subprocess MCP path.
USE_HTTP_FINANCIALS = os.environ.get("USE_HTTP_FINANCIALS", "false").lower() == "true"

# HTTP client timeout in seconds (raised for completeness-first mode).
HTTP_TIMEOUT = float(os.environ.get("HTTP_TIMEOUT", "90.0"))
| # ============================================================================= | |
| # HTTP CLIENT FOR LOAD-BALANCED CALLS | |
| # ============================================================================= | |
async def call_fundamentals_http(tool_name: str, arguments: dict, timeout: float = None) -> dict:
    """
    POST a tool call to the fundamentals-basket HTTP load balancer.

    The request goes to ``{FINANCIALS_HTTP_URL}/tools/{tool_name}`` with
    ``arguments`` as the JSON body. If the connection cannot be established,
    the call is retried through the subprocess MCP server instead.

    Args:
        tool_name: Name of the tool (e.g., 'get_sec_fundamentals')
        arguments: Tool arguments dict, sent as the JSON request body
        timeout: Request timeout in seconds; a falsy value selects HTTP_TIMEOUT

    Returns:
        The decoded JSON response, or ``{"error": ..., "tool": tool_name}``
        on timeout, HTTP error status, or any other failure.
    """
    effective_timeout = timeout or HTTP_TIMEOUT
    endpoint = f"{FINANCIALS_HTTP_URL}/tools/{tool_name}"

    def failure(message: str) -> dict:
        # Uniform error payload shared by every failure branch below.
        return {"error": message, "tool": tool_name}

    try:
        async with httpx.AsyncClient(timeout=effective_timeout) as session:
            reply = await session.post(endpoint, json=arguments)
            reply.raise_for_status()
            return reply.json()
    except httpx.TimeoutException:
        logger.error(f"HTTP timeout calling {tool_name}: {effective_timeout}s")
        return failure(f"HTTP timeout after {effective_timeout}s")
    except httpx.HTTPStatusError as status_err:
        status_code = status_err.response.status_code
        logger.error(f"HTTP error calling {tool_name}: {status_code}")
        return failure(f"HTTP {status_code}")
    except httpx.ConnectError:
        # The HTTP cluster is not reachable: use the subprocess MCP server.
        logger.warning(f"HTTP connection failed for {tool_name}, falling back to subprocess")
        return await call_mcp_server("fundamentals-basket", tool_name, arguments, effective_timeout)
    except Exception as unexpected:
        logger.error(f"HTTP error calling {tool_name}: {unexpected}")
        return failure(str(unexpected))
async def check_fundamentals_http_health() -> bool:
    """
    Probe the ``/health`` endpoint of the fundamentals HTTP cluster.

    Returns:
        True when the endpoint answers with HTTP 200 within 5 seconds;
        False for any other status code or any exception.
    """
    health_url = f"{FINANCIALS_HTTP_URL}/health"
    try:
        async with httpx.AsyncClient(timeout=5.0) as session:
            reply = await session.get(health_url)
    except Exception:
        # Any failure (connect, timeout, protocol) counts as "unhealthy".
        return False
    return reply.status_code == 200
async def emit_metric(
    progress_callback: Optional[Callable],
    source: str,
    metric: str,
    value: Any,
    end_date: str = None,
    fiscal_year: int = None,
    form: str = None
):
    """Send one metric event to ``progress_callback`` as a structured dict.

    The payload always carries the keys ``source``, ``metric``, ``value``,
    ``end_date``, ``fiscal_year`` and ``form``; the temporal fields are None
    when not supplied. Nothing happens when no callback is given.
    """
    if not progress_callback:
        return

    event = dict(
        source=source,
        metric=metric,
        value=value,
        end_date=end_date,
        fiscal_year=fiscal_year,
        form=form,
    )
    logger.debug(f"emit_metric payload: {json.dumps(event, default=str)}")
    progress_callback(event)

    # NOTE(review): the scraped source lost its indentation; the pause is
    # assumed to belong to the "callback present" branch - confirm.
    pause_seconds = METRIC_DELAY_MS / 1000
    await asyncio.sleep(pause_seconds)
async def call_mcp_server(
    server_name: str,
    tool_name: str,
    arguments: dict,
    timeout: float = 90.0
) -> dict:
    """
    Call an MCP server tool via subprocess stdio using proper MCP protocol sequencing.

    Protocol sequence:
    1. Send initialize request -> wait for response (id=1)
    2. Send initialized notification
    3. Send tools/call request -> wait for response (id=2)
    4. Clean up

    Args:
        server_name: Name of the MCP server directory (e.g., 'fundamentals-basket')
        tool_name: Name of the tool to call (e.g., 'get_sec_fundamentals')
        arguments: Dict of arguments to pass to the tool
        timeout: Timeout in seconds for the tool-call phase (default 90s).
            The initialize phase has its own fixed 20s budget.

    Returns:
        Dict with the tool result, or ``{"error": ...}``. This function does
        not raise for protocol, timeout or process failures.
    """
    server_path = MCP_SERVERS_PATH / server_name / "server.py"
    if not server_path.exists():
        return {"error": f"MCP server not found: {server_name}"}

    process = None
    stderr_task = None
    stderr_chunks = []  # head of the server's stderr, kept for debug logging

    try:
        # Start the MCP server process. The default 64 KiB StreamReader limit
        # makes readline() fail on large single-line JSON responses, so raise it.
        process = await asyncio.create_subprocess_exec(
            "python3", str(server_path),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(server_path.parent),
            env={**os.environ},
            limit=16 * 1024 * 1024,
        )

        async def drain_stderr():
            """Continuously consume stderr so a chatty server cannot block on a full pipe."""
            kept = 0
            while True:
                chunk = await process.stderr.read(4096)
                if not chunk:
                    return
                if kept < 2048:  # only the beginning is ever logged
                    stderr_chunks.append(chunk)
                    kept += len(chunk)

        stderr_task = asyncio.create_task(drain_stderr())

        async def send_message(msg: dict):
            """Send a JSON-RPC message to the server (one JSON document per line)."""
            data = json.dumps(msg) + "\n"
            process.stdin.write(data.encode())
            await process.stdin.drain()

        async def read_response(expected_id: int, phase_timeout: float) -> dict:
            """Read stdout lines until a JSON object with ``id == expected_id`` arrives."""
            loop = asyncio.get_running_loop()
            deadline = loop.time() + phase_timeout
            buffer = ""
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError(f"Timeout waiting for response id={expected_id}")
                try:
                    line = await asyncio.wait_for(
                        process.stdout.readline(),
                        timeout=min(remaining, 5.0)  # Check every 5s
                    )
                except asyncio.TimeoutError:
                    continue  # Keep trying until phase_timeout
                if not line:
                    # EOF - server closed stdout
                    raise EOFError(f"Server closed stdout before sending response id={expected_id}")

                line_str = line.decode(errors="replace").strip()
                if not line_str:
                    continue

                # The line might carry a non-JSON prefix (logs); parse from the first brace.
                json_start = line_str.find('{')
                if json_start == -1:
                    continue

                try:
                    response = json.loads(line_str[json_start:])
                except json.JSONDecodeError:
                    # Might be partial JSON, accumulate in buffer
                    buffer += line_str
                    try:
                        response = json.loads(buffer)
                    except json.JSONDecodeError:
                        continue  # Keep accumulating
                    buffer = ""  # Buffer parsed; start fresh either way

                # Success and error responses both carry the request id.
                if isinstance(response, dict) and response.get("id") == expected_id:
                    return response

        # Phase 1: Initialize
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "research-service", "version": "1.0.0"}
            }
        }
        await send_message(init_request)
        init_response = await read_response(expected_id=1, phase_timeout=20.0)
        if "error" in init_response:
            return {"error": f"Initialize failed: {init_response['error']}"}

        # Phase 2: Send initialized notification (no response expected)
        initialized_notification = {
            "jsonrpc": "2.0",
            "method": "notifications/initialized"
        }
        await send_message(initialized_notification)
        await asyncio.sleep(0.05)  # Brief pause for server to process

        # Phase 3: Tool call
        tool_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments}
        }
        await send_message(tool_request)
        tool_response = await read_response(expected_id=2, phase_timeout=timeout)

        # Process tool response
        if "error" in tool_response:
            return {"error": f"Tool call failed: {tool_response['error']}"}
        if "result" in tool_response:
            result = tool_response["result"]
            # MCP SDK format: {"content": [{"type": "text", "text": "..."}]}
            if isinstance(result, dict) and "content" in result:
                content_list = result.get("content", [])
                if content_list and isinstance(content_list, list):
                    for content in content_list:
                        if isinstance(content, dict) and content.get("type") == "text":
                            # The first text item decides the outcome.
                            try:
                                return json.loads(content.get("text", "{}"))
                            except json.JSONDecodeError:
                                return {"raw_text": content.get("text", "")}
            return result
        return {"error": "No result in tool response"}

    except asyncio.TimeoutError as e:
        logger.warning(f"MCP {server_name} timeout: {e}")
        return {"error": f"Timeout: {e}"}
    except EOFError as e:
        logger.warning(f"MCP {server_name} EOF: {e}")
        return {"error": f"Server closed: {e}"}
    except Exception as e:
        logger.error(f"MCP {server_name} error: {e}")
        return {"error": str(e)}
    finally:
        # Clean up process
        if process is not None:
            try:
                process.stdin.close()
            except Exception:
                pass  # best effort: the pipe may already be closed
            try:
                # Give process 2s to exit gracefully
                await asyncio.wait_for(process.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # exited between the timeout and the kill
                await process.wait()

        # Log stderr if any
        if stderr_task is not None:
            try:
                await asyncio.wait_for(stderr_task, timeout=1.0)
            except Exception:
                stderr_task.cancel()
            stderr_text = b"".join(stderr_chunks).decode(errors="replace").strip()
            if stderr_text:
                logger.debug(f"MCP {server_name} stderr: {stderr_text[:500]}")
async def call_fundamentals_mcp(ticker: str) -> dict:
    """
    Retrieve SEC fundamentals for ``ticker``.

    Goes through the HTTP load balancer when USE_HTTP_FINANCIALS is enabled,
    and through the subprocess MCP server otherwise.
    """
    tool = "get_sec_fundamentals"
    payload = {"ticker": ticker}
    if not USE_HTTP_FINANCIALS:
        return await call_mcp_server("fundamentals-basket", tool, payload)
    return await call_fundamentals_http(tool, payload)
async def call_fundamentals_all_sources_mcp(ticker: str) -> dict:
    """
    Retrieve fundamentals for ``ticker`` from ALL sources (SEC EDGAR + Yahoo Finance).

    Goes through the HTTP load balancer when USE_HTTP_FINANCIALS is enabled,
    and through the subprocess MCP server otherwise.
    """
    tool = "get_all_sources_fundamentals"
    payload = {"ticker": ticker}
    if not USE_HTTP_FINANCIALS:
        return await call_mcp_server("fundamentals-basket", tool, payload)
    return await call_fundamentals_http(tool, payload)
async def call_volatility_mcp(ticker: str) -> dict:
    """Run the ``get_volatility_basket`` tool of volatility-basket for ``ticker``."""
    payload = {"ticker": ticker}
    response = await call_mcp_server("volatility-basket", "get_volatility_basket", payload)
    return response
async def call_volatility_all_sources_mcp(ticker: str) -> dict:
    """Run ``get_all_sources_volatility`` for ``ticker`` (Yahoo + Alpha Vantage + Tradier)."""
    payload = {"ticker": ticker}
    response = await call_mcp_server("volatility-basket", "get_all_sources_volatility", payload)
    return response
async def call_macro_mcp() -> dict:
    """Run the ``get_macro_basket`` tool of macro-basket (takes no arguments)."""
    no_arguments = {}
    response = await call_mcp_server("macro-basket", "get_macro_basket", no_arguments)
    return response
async def call_macro_all_sources_mcp() -> dict:
    """Run ``get_all_sources_macro`` of macro-basket (BEA/BLS primary + FRED fallback)."""
    no_arguments = {}
    response = await call_mcp_server("macro-basket", "get_all_sources_macro", no_arguments)
    return response
async def call_valuation_mcp(ticker: str) -> dict:
    """Run the ``get_valuation_basket`` tool of valuation-basket for ``ticker``."""
    payload = {"ticker": ticker}
    response = await call_mcp_server("valuation-basket", "get_valuation_basket", payload)
    return response
async def call_valuation_all_sources_mcp(ticker: str) -> dict:
    """Run ``get_all_sources_valuation`` for ``ticker`` (Yahoo Finance + Alpha Vantage)."""
    payload = {"ticker": ticker}
    response = await call_mcp_server("valuation-basket", "get_all_sources_valuation", payload)
    return response
async def call_news_mcp(ticker: str, company_name: str = "") -> dict:
    """Run ``get_all_sources_news`` of news-basket for a company.

    ``company_name`` is included in the tool arguments only when non-empty.
    """
    if company_name:
        payload = {"ticker": ticker, "company_name": company_name}
    else:
        payload = {"ticker": ticker}
    return await call_mcp_server("news-basket", "get_all_sources_news", payload)
async def call_sentiment_mcp(ticker: str, company_name: str = "") -> dict:
    """Run ``get_sentiment_basket`` of sentiment-basket for a ticker.

    ``company_name`` is always sent, even when it is the empty string.
    """
    payload = dict(ticker=ticker, company_name=company_name)
    response = await call_mcp_server("sentiment-basket", "get_sentiment_basket", payload)
    return response
| # ============================================================================= | |
| # SCHEMA NORMALIZERS | |
| # Convert MCP-emitted schemas to analyzer-expected format | |
| # ============================================================================= | |
| def _normalize_volatility(raw: dict) -> dict: | |
| """Pass-through: MCPs now emit {source: {data: ...}} directly.""" | |
| return raw | |
| def _normalize_macro(raw: dict) -> dict: | |
| """Pass-through: MCPs now emit {source: {data: ...}} directly.""" | |
| return raw | |
| def _normalize_valuation(raw: dict) -> dict: | |
| """Pass-through: MCPs now emit {source: {data: ...}} directly.""" | |
| return raw | |
| def _normalize_fundamentals(raw: dict) -> dict: | |
| """Pass-through: MCPs now emit {source: {data: ...}} directly.""" | |
| return raw | |
| def _get_nested_value(data: dict, *keys): | |
| """Safely get nested value from dict, returns None if not found.""" | |
| for key in keys: | |
| if not isinstance(data, dict): | |
| return None | |
| data = data.get(key) | |
| return data | |
def _source_section(result: dict, key: str) -> dict:
    """Return ``result[key]`` when it is a dict, otherwise an empty dict."""
    section = result.get(key)
    return section if isinstance(section, dict) else {}


async def _emit_candidate(
    progress_callback: Optional[Callable],
    source: str,
    label: str,
    candidate: Any,
    fallback_date: Any = None,
    temporal: bool = False
) -> None:
    """Emit one metric from a ``{"value": ...}`` wrapper or a bare number; ignore anything else."""
    if isinstance(candidate, dict):
        value = candidate.get("value")
        if value is None:
            return
        if temporal:
            # Filing-based metrics carry period/filing metadata.
            await emit_metric(
                progress_callback, source, label, value,
                end_date=candidate.get("end_date"),
                fiscal_year=candidate.get("fiscal_year"),
                form=candidate.get("form")
            )
        else:
            await emit_metric(
                progress_callback, source, label, value,
                end_date=candidate.get("as_of") or fallback_date
            )
    elif isinstance(candidate, (int, float)):
        await emit_metric(progress_callback, source, label, candidate, end_date=fallback_date)


async def _extract_and_emit_metrics(
    source: str,
    result: dict,
    progress_callback: Optional[Callable]
) -> None:
    """Extract headline metrics from an MCP result and emit them via callback.

    Nothing is emitted when there is no callback, when ``result`` is empty,
    not a dict, or carries an "error" key. The keys read per source are:
    - fundamentals: "sec_edgar" (preferred) and "yahoo_finance"
    - volatility: "fred" (VIX) and "yahoo_finance" (beta, historical volatility)
    - macro: "bea" (GDP growth), "fred" (interest rate), "bls" (CPI, unemployment)
    - valuation: "yahoo_finance" (preferred) and "alpha_vantage"
    - news: list lengths under "tavily", "nyt", "newsapi"
    - sentiment: list lengths under "finnhub", "reddit"

    Each metric may be a ``{"value": ...}`` wrapper or a bare number. A wrapped
    value of 0 is emitted for every metric; only None is treated as missing.
    """
    if not progress_callback or not result or not isinstance(result, dict) or "error" in result:
        return

    if source == "fundamentals":
        sec_data = _source_section(result, "sec_edgar")
        yf_data = _source_section(result, "yahoo_finance")
        # (payload key, emitted metric name); SEC EDGAR is the primary source.
        for key, label in (
            ("revenue", "revenue"),
            ("net_margin_pct", "net_margin"),
            ("eps", "EPS"),
            ("debt_to_equity", "debt_to_equity"),
        ):
            candidate = sec_data.get(key) or yf_data.get(key)
            await _emit_candidate(progress_callback, source, label, candidate, temporal=True)

    elif source == "volatility":
        fred_data = _source_section(result, "fred")
        yf_data = _source_section(result, "yahoo_finance")
        for section, key, label in (
            (fred_data, "vix", "VIX"),
            (yf_data, "beta", "beta"),
            (yf_data, "historical_volatility", "hist_vol"),
        ):
            await _emit_candidate(progress_callback, source, label, section.get(key))

    elif source == "macro":
        bea = _source_section(result, "bea")
        bls = _source_section(result, "bls")
        fred = _source_section(result, "fred")
        for section, key, label in (
            (bea, "gdp_growth", "GDP_growth"),
            (fred, "interest_rate", "interest_rate"),
            (bls, "cpi_inflation", "inflation"),
            (bls, "unemployment", "unemployment"),
        ):
            await _emit_candidate(progress_callback, source, label, section.get(key))

    elif source == "valuation":
        yf_data = _source_section(result, "yahoo_finance")
        av_data = _source_section(result, "alpha_vantage")
        # Yahoo's regular_market_time is the timestamp of last resort.
        market_time = yf_data.get("regular_market_time")
        for key, label in (
            ("trailing_pe", "P/E"),
            ("pb_ratio", "P/B"),
            ("ps_ratio", "P/S"),
            ("ev_ebitda", "EV/EBITDA"),
        ):
            candidate = yf_data.get(key) or av_data.get(key)
            await _emit_candidate(
                progress_callback, source, label, candidate, fallback_date=market_time
            )

    elif source == "news":
        # News-basket returns a source-keyed structure of item lists.
        total_items = sum(
            len(result[name])
            for name in ("tavily", "nyt", "newsapi")
            if isinstance(result.get(name), list)
        )
        if total_items > 0:
            await emit_metric(progress_callback, source, "items_found", total_items)
        else:
            await emit_metric(progress_callback, source, "status", "No recent news found")

    elif source == "sentiment":
        # Sentiment-basket returns a source-keyed structure of item lists.
        total_items = sum(
            len(result[name])
            for name in ("finnhub", "reddit")
            if isinstance(result.get(name), list)
        )
        if total_items > 0:
            await emit_metric(progress_callback, source, "items_found", total_items)
        else:
            await emit_metric(progress_callback, source, "status", "No sentiment content found")
| def _has_metric(data: dict, field: str) -> bool: | |
| """Check if metric exists in possibly nested structure.""" | |
| if not isinstance(data, dict): | |
| return False | |
| if field in data: | |
| val = data[field] | |
| if isinstance(val, dict): | |
| return val.get("value") is not None | |
| # Special case for items (list) - news and sentiment | |
| if isinstance(val, list): | |
| return len(val) > 0 | |
| return val is not None | |
| # Check common nested paths | |
| for key in ["data", "metrics", "sec_edgar", "yahoo_finance"]: | |
| if key in data and isinstance(data[key], dict): | |
| if field in data[key]: | |
| return True | |
| return False | |
def _calculate_completeness(metrics: dict, sources_available: list) -> dict:
    """Score how many of the expected metrics are present in ``metrics``.

    ``sources_available`` is accepted for interface compatibility and is not
    consulted. The result holds ``completeness_pct`` (rounded to one decimal),
    ``metrics_found``, ``metrics_total`` and ``missing`` (source -> missing
    field names, only for sources that miss at least one).
    """
    expected_fields = {
        "fundamentals": ["revenue", "net_income", "eps", "debt_to_equity"],
        "valuation": ["trailing_pe", "pb_ratio", "ps_ratio"],
        "volatility": ["beta", "vix"],
        "macro": ["gdp_growth", "interest_rate", "cpi_inflation"],
        "news": ["items"],
        "sentiment": ["items"]
    }

    expected_count = 0
    present_count = 0
    gaps_by_source = {}
    for source_name, field_names in expected_fields.items():
        payload = metrics.get(source_name, {})
        gaps = [name for name in field_names if not _has_metric(payload, name)]
        expected_count += len(field_names)
        present_count += len(field_names) - len(gaps)
        if gaps:
            gaps_by_source[source_name] = gaps

    if expected_count > 0:
        percentage = round(present_count / expected_count * 100, 1)
    else:
        percentage = 0

    return {
        "completeness_pct": percentage,
        "metrics_found": present_count,
        "metrics_total": expected_count,
        "missing": gaps_by_source
    }
| def _aggregate_swot(metrics: dict, sources_available: list) -> dict: | |
| """Aggregate SWOT summaries from all MCP sources.""" | |
| aggregated_swot = { | |
| "strengths": [], | |
| "weaknesses": [], | |
| "opportunities": [], | |
| "threats": [] | |
| } | |
| for source in sources_available: | |
| source_data = metrics.get(source, {}) | |
| swot = source_data.get("swot_summary", {}) | |
| for category in aggregated_swot: | |
| items = swot.get(category, []) | |
| if items: | |
| aggregated_swot[category].extend(items) | |
| return aggregated_swot | |
| def _sort_and_limit_news(news_data: dict, limit: int = 10) -> dict: | |
| """Sort news items by date (most recent first) and limit to top N.""" | |
| if not news_data or "items" not in news_data: | |
| return news_data | |
| items = news_data.get("items", []) | |
| # Sort by datetime descending (most recent first) | |
| def get_date(item): | |
| date_str = item.get("datetime") or "" | |
| return date_str if date_str else "1970-01-01" | |
| sorted_items = sorted(items, key=get_date, reverse=True) | |
| # Limit to top N | |
| news_data["items"] = sorted_items[:limit] | |
| news_data["total_items"] = len(items) | |
| news_data["showing"] = min(limit, len(items)) | |
| return news_data | |
| def _sort_and_limit_sentiment(sentiment_data: dict, limit: int = 10) -> dict: | |
| """Sort sentiment items by date (most recent first) and limit to top N.""" | |
| if not sentiment_data or "items" not in sentiment_data: | |
| return sentiment_data | |
| items = sentiment_data.get("items", []) | |
| # Sort by datetime descending (most recent first) | |
| def get_date(item): | |
| return item.get("datetime") or "1970-01-01" | |
| sorted_items = sorted(items, key=get_date, reverse=True) | |
| # Limit to top N | |
| sentiment_data["items"] = sorted_items[:limit] | |
| sentiment_data["total_items"] = len(items) | |
| sentiment_data["showing"] = min(limit, len(items)) | |
| return sentiment_data | |
| def _add_conflict_markers(fundamentals_all: dict, valuation_all: dict) -> dict: | |
| """ | |
| Add conflict resolution markers to multi-source data. | |
| Primary sources: SEC EDGAR (fundamentals), Yahoo Finance (valuation) | |
| """ | |
| conflict_resolution = { | |
| "fundamentals": { | |
| "primary_source": "SEC EDGAR XBRL", | |
| "secondary_source": "Yahoo Finance", | |
| "conflicts": [] | |
| }, | |
| "valuation": { | |
| "primary_source": "Yahoo Finance", | |
| "secondary_source": "Alpha Vantage", | |
| "conflicts": [] | |
| } | |
| } | |
| # Check fundamentals for conflicts | |
| if fundamentals_all and "sec_edgar" in fundamentals_all and "yahoo_finance" in fundamentals_all: | |
| sec_data = fundamentals_all.get("sec_edgar", {}).get("data", {}) | |
| yf_data = fundamentals_all.get("yahoo_finance", {}).get("data", {}) | |
| for metric in ["revenue", "net_income", "free_cash_flow"]: | |
| sec_val = sec_data.get(metric, {}) | |
| yf_val = yf_data.get(metric, {}) | |
| if isinstance(sec_val, dict): | |
| sec_val = sec_val.get("value") | |
| if isinstance(yf_val, dict): | |
| yf_val = yf_val.get("value") | |
| if isinstance(yf_val, dict): | |
| yf_val = yf_val.get("value") | |
| if sec_val and yf_val and sec_val != yf_val: | |
| conflict_resolution["fundamentals"]["conflicts"].append({ | |
| "metric": metric, | |
| "primary_value": sec_val, | |
| "secondary_value": yf_val, | |
| "used": "primary" | |
| }) | |
| # Check valuation for conflicts | |
| if valuation_all and "yahoo_finance" in valuation_all and "alpha_vantage" in valuation_all: | |
| yf_data = valuation_all.get("yahoo_finance", {}).get("data", {}) | |
| av_data = valuation_all.get("alpha_vantage", {}).get("data", {}) | |
| for metric in ["trailing_pe", "forward_pe", "pb_ratio", "ps_ratio"]: | |
| yf_val = yf_data.get(metric) | |
| av_val = av_data.get(metric) | |
| if yf_val and av_val and abs(yf_val - av_val) > 0.5: | |
| conflict_resolution["valuation"]["conflicts"].append({ | |
| "metric": metric, | |
| "primary_value": yf_val, | |
| "secondary_value": av_val, | |
| "used": "primary" | |
| }) | |
| return conflict_resolution | |
def _is_error_result(result: Any) -> bool:
    """Tell whether an MCP call returned the ``{"error": ...}`` failure shape."""
    return isinstance(result, dict) and "error" in result


async def _fetch_with_retry(name: str, mcp_func: Callable) -> tuple:
    """Call ``mcp_func`` and retry exactly once on an error result or exception.

    Returns ``(ok, payload)``: on success the raw result, on failure an error
    dict tagged with ``"retried": True``.
    """
    try:
        result = await mcp_func()
    except Exception as e:
        logger.warning(f"MCP {name} exception, retrying: {e}")
    else:
        if not _is_error_result(result):
            logger.info(f"MCP {name} fetched successfully")
            return True, result
        # str() first: the error value is not guaranteed to be a string.
        logger.warning(f"MCP {name} error, retrying: {str(result.get('error', 'Unknown'))[:50]}")

    try:
        result = await mcp_func()
    except Exception as e2:
        logger.warning(f"MCP {name} failed after retry: {e2}")
        return False, {"error": str(e2), "retried": True}

    if _is_error_result(result):
        logger.warning(f"MCP {name} failed after retry: {result.get('error')}")
        return False, {**result, "retried": True}

    logger.info(f"MCP {name} succeeded on retry")
    return True, result


async def fetch_all_research_data(
    ticker: str,
    company_name: str,
    progress_callback: Optional[Callable] = None
) -> dict:
    """
    Fetch data from 6 MCP servers SEQUENTIALLY using TRUE MCP protocol.

    Only calls multi-source (_all) versions to avoid duplicate API calls.
    Order: fundamentals -> valuation -> volatility -> macro -> news -> sentiment

    Each source is attempted at most twice. A failure while emitting progress
    metrics is logged and never counts as a fetch failure, so a source cannot
    end up both available and failed.

    Args:
        ticker: Stock ticker symbol
        company_name: Company name
        progress_callback: Optional callback for granular metric events

    Returns aggregated results with sources_available, sources_failed, and aggregated_swot.
    """
    logger.info(f"Fetching from MCP servers for {ticker} ({company_name})...")

    # Sequential order: critical data first
    mcp_sequence = [
        ("fundamentals", lambda: call_fundamentals_all_sources_mcp(ticker)),
        ("valuation", lambda: call_valuation_all_sources_mcp(ticker)),
        ("volatility", lambda: call_volatility_all_sources_mcp(ticker)),
        ("macro", lambda: call_macro_all_sources_mcp()),
        ("news", lambda: call_news_mcp(ticker, company_name)),
        ("sentiment", lambda: call_sentiment_mcp(ticker, company_name)),
    ]

    # Normalizers to convert MCP schemas to analyzer-expected format
    normalizers = {
        "fundamentals": _normalize_fundamentals,
        "valuation": _normalize_valuation,
        "volatility": _normalize_volatility,
        "macro": _normalize_macro,
    }

    metrics = {}
    sources_available = []
    sources_failed = []

    # Sequential execution - one at a time
    for name, mcp_func in mcp_sequence:
        logger.info(f"Fetching {name}...")
        ok, payload = await _fetch_with_retry(name, mcp_func)
        if not ok:
            sources_failed.append(name)
            metrics[name] = payload
            continue

        # Apply normalizer if available
        normalizer = normalizers.get(name)
        if normalizer is not None:
            payload = normalizer(payload)
        sources_available.append(name)
        metrics[name] = payload

        # Emit metrics for real-time streaming to frontend. Kept outside the
        # fetch/retry path: a broken callback must not trigger another MCP
        # call or flip an already fetched source to "failed".
        try:
            await _extract_and_emit_metrics(name, payload, progress_callback)
        except Exception as emit_error:
            logger.warning(f"MCP {name} metric emission failed: {emit_error}")

    # Apply sorting and limiting to news (top 10, most recent first)
    if "news" in metrics and "error" not in metrics.get("news", {}):
        metrics["news"] = _sort_and_limit_news(metrics["news"], limit=10)

    # Apply sorting and limiting to sentiment (top 10 articles/posts, most recent first)
    if "sentiment" in metrics and "error" not in metrics.get("sentiment", {}):
        metrics["sentiment"] = _sort_and_limit_sentiment(metrics["sentiment"], limit=10)

    # Get multi-source data (now stored directly under source name)
    fundamentals_data = metrics.get("fundamentals", {})
    valuation_data = metrics.get("valuation", {})
    macro_data = metrics.get("macro", {})
    volatility_data = metrics.get("volatility", {})

    # Add conflict resolution markers
    conflict_resolution = _add_conflict_markers(fundamentals_data, valuation_data)

    # Build aggregated SWOT from primary source data
    aggregated_swot = _aggregate_swot(metrics, sources_available)

    # Calculate completeness score
    completeness = _calculate_completeness(metrics, sources_available)

    # Final data package - shared with analyzer only after all collection complete
    data = {
        "ticker": ticker.upper(),
        "company_name": company_name,
        "sources_available": sources_available,
        "sources_failed": sources_failed,
        "metrics": metrics,
        "multi_source": {
            "fundamentals_all": fundamentals_data,
            "valuation_all": valuation_data,
            "macro_all": macro_data,
            "volatility_all": volatility_data,
        },
        "conflict_resolution": conflict_resolution,
        "aggregated_swot": aggregated_swot,
        "completeness": completeness,
        "generated_at": datetime.now().isoformat()
    }

    logger.info(f"Research complete: {len(sources_available)} sources, {len(sources_failed)} failed, {completeness['completeness_pct']}% complete")
    return data