"""
MCP Client - TRUE MCP protocol via subprocess stdio.

Implements proper MCP handshake:
1. Send 'initialize' request
2. Receive initialization response
3. Send 'initialized' notification
4. Send 'tools/call' request
5. Parse response

Also supports HTTP-based load-balanced calls for fundamentals-basket.
"""

import asyncio
import inspect
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Optional

try:
    import httpx
except ImportError:  # HTTP mode is opt-in; subprocess MCP works without httpx
    httpx = None

logger = logging.getLogger(__name__)

# Base path for MCP servers
MCP_SERVERS_PATH = Path(__file__).parent / "mcp-servers"

# Configurable delay for granular progress events (ms)
# Set to 0 for completeness-first mode (no artificial UI delays)
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "0"))

# Maximum size of one line read from an MCP server's stdout. asyncio's default
# (64 KiB) is too small for a large single-line JSON-RPC tool result.
_MCP_STREAM_LIMIT = 16 * 1024 * 1024

# =============================================================================
# HTTP LOAD BALANCER CONFIGURATION
# =============================================================================

# Financials HTTP load balancer URL (nginx on port 8080)
FINANCIALS_HTTP_URL = os.getenv("FINANCIALS_HTTP_URL", "http://localhost:8080")

# Toggle HTTP mode (set to "false" to use subprocess MCP)
USE_HTTP_FINANCIALS = os.getenv("USE_HTTP_FINANCIALS", "false").lower() == "true"

# HTTP client timeout (increased for completeness-first mode)
HTTP_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "90.0"))


# =============================================================================
# HTTP CLIENT FOR LOAD-BALANCED CALLS
# =============================================================================

async def call_fundamentals_http(
    tool_name: str,
    arguments: dict,
    timeout: Optional[float] = None
) -> dict:
    """
    Call fundamentals-basket via HTTP load balancer (nginx).

    This bypasses MCP subprocess spawning for better performance.
    Requires the HTTP cluster to be running (./start_cluster.sh).
    If the cluster cannot be reached (or httpx is not installed) the call
    falls back to the subprocess MCP server.

    Args:
        tool_name: Name of the tool (e.g., 'get_sec_fundamentals')
        arguments: Tool arguments dict
        timeout: Request timeout in seconds (defaults to HTTP_TIMEOUT)

    Returns:
        Tool result dict or error dict
    """
    timeout = timeout or HTTP_TIMEOUT

    if httpx is None:
        logger.warning(
            "httpx not installed, using subprocess MCP for %s", tool_name
        )
        return await call_mcp_server(
            "fundamentals-basket", tool_name, arguments, timeout
        )

    url = f"{FINANCIALS_HTTP_URL}/tools/{tool_name}"
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=arguments)
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        logger.error("HTTP timeout calling %s: %ss", tool_name, timeout)
        return {"error": f"HTTP timeout after {timeout}s", "tool": tool_name}
    except httpx.HTTPStatusError as e:
        logger.error(
            "HTTP error calling %s: %s", tool_name, e.response.status_code
        )
        return {"error": f"HTTP {e.response.status_code}", "tool": tool_name}
    except httpx.ConnectError:
        logger.warning(
            "HTTP connection failed for %s, falling back to subprocess",
            tool_name,
        )
        # Fall back to subprocess MCP if HTTP cluster is not running
        return await call_mcp_server(
            "fundamentals-basket", tool_name, arguments, timeout
        )
    except Exception as e:
        logger.error("HTTP error calling %s: %s", tool_name, e)
        return {"error": str(e), "tool": tool_name}


async def check_fundamentals_http_health() -> bool:
    """
    Check if the fundamentals HTTP cluster is healthy.

    Returns:
        True if cluster is responding, False otherwise
    """
    if httpx is None:
        return False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{FINANCIALS_HTTP_URL}/health")
            return response.status_code == 200
    except Exception:
        return False


async def emit_metric(
    progress_callback: Optional[Callable],
    source: str,
    metric: str,
    value: Any,
    end_date: Optional[str] = None,
    fiscal_year: Optional[int] = None,
    form: Optional[str] = None
):
    """
    Emit a metric event as a structured payload with optional temporal data.

    The callback may be a plain function or a coroutine function; an
    awaitable return value is awaited so async callbacks are not dropped.
    Nothing happens when progress_callback is falsy.
    """
    if not progress_callback:
        return

    payload = {
        "source": source,
        "metric": metric,
        "value": value,
        "end_date": end_date,
        "fiscal_year": fiscal_year,
        "form": form,
    }
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "emit_metric payload: %s", json.dumps(payload, default=str)
        )

    outcome = progress_callback(payload)
    if inspect.isawaitable(outcome):
        await outcome
    await asyncio.sleep(METRIC_DELAY_MS / 1000)


async def call_mcp_server(
    server_name: str,
    tool_name: str,
    arguments: dict,
    timeout: float = 90.0
) -> dict:
    """
    Call an MCP server tool via subprocess stdio using proper MCP protocol sequencing.

    Protocol sequence:
    1. Send initialize request -> wait for response (id=1)
    2. Send initialized notification
    3. Send tools/call request -> wait for response (id=2)
    4. Clean up

    Args:
        server_name: Name of the MCP server directory (e.g., 'fundamentals-basket')
        tool_name: Name of the tool to call (e.g., 'get_sec_fundamentals')
        arguments: Dict of arguments to pass to the tool
        timeout: Timeout in seconds for the tool-call phase (default 90s);
            the initialize phase has its own fixed 20s budget

    Returns:
        Dict with tool result or error. Failures are never raised; they are
        reported as {"error": "..."}.
    """
    server_path = MCP_SERVERS_PATH / server_name / "server.py"
    if not server_path.exists():
        return {"error": f"MCP server not found: {server_name}"}

    process = None
    stderr_task = None
    try:
        # Start the MCP server process
        process = await asyncio.create_subprocess_exec(
            "python3", str(server_path),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(server_path.parent),
            env={**os.environ},
            limit=_MCP_STREAM_LIMIT,
        )
        # Drain stderr concurrently: a server that logs heavily would
        # otherwise fill the pipe and block before answering on stdout.
        stderr_task = asyncio.create_task(process.stderr.read())

        async def send_message(msg: dict):
            """Send a JSON-RPC message to the server."""
            data = json.dumps(msg) + "\n"
            process.stdin.write(data.encode())
            await process.stdin.drain()

        async def read_response(expected_id: int, phase_timeout: float) -> dict:
            """Read stdout lines until a JSON-RPC message with expected_id arrives."""
            loop = asyncio.get_running_loop()
            deadline = loop.time() + phase_timeout
            buffer = ""
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError(
                        f"Timeout waiting for response id={expected_id}"
                    )
                try:
                    line = await asyncio.wait_for(
                        process.stdout.readline(), timeout=remaining
                    )
                except asyncio.TimeoutError:
                    raise asyncio.TimeoutError(
                        f"Timeout waiting for response id={expected_id}"
                    ) from None

                if not line:
                    # EOF - server closed stdout
                    raise EOFError(
                        f"Server closed stdout before sending response id={expected_id}"
                    )

                line_str = line.decode(errors="replace").strip()
                if not line_str:
                    continue

                # Handle case where line might contain non-JSON prefix (logs)
                json_start = line_str.find('{')
                if json_start == -1:
                    continue

                try:
                    response = json.loads(line_str[json_start:])
                except json.JSONDecodeError:
                    # Might be partial JSON, accumulate in buffer
                    buffer += line_str
                    try:
                        response = json.loads(buffer)
                    except json.JSONDecodeError:
                        continue  # Keep accumulating
                    buffer = ""  # Buffer held a complete message

                # Error responses carry the same id, so one check covers both
                if isinstance(response, dict) and response.get("id") == expected_id:
                    return response

        # Phase 1: Initialize
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "research-service", "version": "1.0.0"}
            }
        }
        await send_message(init_request)
        init_response = await read_response(expected_id=1, phase_timeout=20.0)
        if "error" in init_response:
            return {"error": f"Initialize failed: {init_response['error']}"}

        # Phase 2: Send initialized notification (no response expected)
        initialized_notification = {
            "jsonrpc": "2.0",
            "method": "notifications/initialized"
        }
        await send_message(initialized_notification)
        await asyncio.sleep(0.05)  # Brief pause for server to process

        # Phase 3: Tool call
        tool_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments}
        }
        await send_message(tool_request)
        tool_response = await read_response(expected_id=2, phase_timeout=timeout)

        # Process tool response
        if "error" in tool_response:
            return {"error": f"Tool call failed: {tool_response['error']}"}
        if "result" not in tool_response:
            return {"error": "No result in tool response"}

        result = tool_response["result"]
        if not isinstance(result, dict):
            return result

        # MCP SDK format: {"content": [{"type": "text", "text": "..."}]}
        text = None
        content_list = result.get("content")
        if isinstance(content_list, list):
            for content in content_list:
                if isinstance(content, dict) and content.get("type") == "text":
                    text = content.get("text", "{}")
                    break

        # Tool-level failures arrive inside a normal result flagged isError;
        # surface them as errors instead of returning them as data.
        if result.get("isError"):
            return {"error": f"Tool call failed: {text or 'tool reported an error'}"}

        if text is None:
            return result
        try:
            return json.loads(text)
        except (json.JSONDecodeError, TypeError):
            return {"raw_text": text}

    except asyncio.TimeoutError as e:
        logger.warning("MCP %s timeout: %s", server_name, e)
        return {"error": f"Timeout: {e}"}
    except EOFError as e:
        logger.warning("MCP %s EOF: %s", server_name, e)
        return {"error": f"Server closed: {e}"}
    except Exception as e:
        logger.error("MCP %s error: %s", server_name, e)
        return {"error": str(e)}
    finally:
        # Clean up process
        if process is not None:
            try:
                try:
                    process.stdin.close()
                except Exception:
                    pass  # Best effort: the pipe may already be closed
                try:
                    # Give process 2s to exit gracefully
                    await asyncio.wait_for(process.wait(), timeout=2.0)
                except asyncio.TimeoutError:
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass  # Exited between the timeout and the kill
                    await process.wait()

                # Log stderr if any
                if stderr_task is not None:
                    try:
                        stderr_data = await asyncio.wait_for(stderr_task, timeout=1.0)
                    except Exception:
                        # `except Exception` lets CancelledError propagate
                        stderr_data = b""
                    stderr_text = stderr_data.decode(errors="replace").strip()
                    if stderr_text:
                        logger.debug(
                            "MCP %s stderr: %s", server_name, stderr_text[:500]
                        )
            finally:
                if stderr_task is not None and not stderr_task.done():
                    stderr_task.cancel()


async def call_fundamentals_mcp(ticker: str) -> dict:
    """
    Fetch SEC fundamentals for a ticker.

    Uses HTTP load balancer if USE_HTTP_FINANCIALS=true, otherwise subprocess MCP.
    """
    if USE_HTTP_FINANCIALS:
        return await call_fundamentals_http("get_sec_fundamentals", {"ticker": ticker})
    return await call_mcp_server(
        "fundamentals-basket",
        "get_sec_fundamentals",
        {"ticker": ticker}
    )


async def call_fundamentals_all_sources_mcp(ticker: str) -> dict:
    """
    Fetch fundamentals from ALL sources (SEC EDGAR + Yahoo Finance).

    Uses HTTP load balancer if USE_HTTP_FINANCIALS=true, otherwise subprocess MCP.
    """
    if USE_HTTP_FINANCIALS:
        return await call_fundamentals_http(
            "get_all_sources_fundamentals", {"ticker": ticker}
        )
    return await call_mcp_server(
        "fundamentals-basket",
        "get_all_sources_fundamentals",
        {"ticker": ticker}
    )


async def call_volatility_mcp(ticker: str) -> dict:
    """Fetch volatility metrics for a ticker."""
    return await call_mcp_server(
        "volatility-basket",
        "get_volatility_basket",
        {"ticker": ticker}
    )


async def call_volatility_all_sources_mcp(ticker: str) -> dict:
    """Fetch volatility from ALL sources (Yahoo + Alpha Vantage + Tradier)."""
    return await call_mcp_server(
        "volatility-basket",
        "get_all_sources_volatility",
        {"ticker": ticker}
    )


async def call_macro_mcp() -> dict:
    """Fetch macroeconomic indicators."""
    return await call_mcp_server(
        "macro-basket",
        "get_macro_basket",
        {}
    )


async def call_macro_all_sources_mcp() -> dict:
    """Fetch macro from ALL sources (BEA/BLS primary + FRED fallback)."""
    return await call_mcp_server(
        "macro-basket",
        "get_all_sources_macro",
        {}
    )


async def call_valuation_mcp(ticker: str) -> dict:
    """Fetch valuation ratios for a ticker."""
    return await call_mcp_server(
        "valuation-basket",
        "get_valuation_basket",
        {"ticker": ticker}
    )


async def call_valuation_all_sources_mcp(ticker: str) -> dict:
    """Fetch valuation from ALL sources (Yahoo Finance + Alpha Vantage)."""
    return await call_mcp_server(
        "valuation-basket",
        "get_all_sources_valuation",
        {"ticker": ticker}
    )


async def call_news_mcp(ticker: str, company_name: str = "") -> dict:
    """Fetch news for a company; company_name is sent only when non-empty."""
    args = {"ticker": ticker}
    if company_name:
        args["company_name"] = company_name
    return await call_mcp_server(
        "news-basket",
        "get_all_sources_news",
        args
    )


async def call_sentiment_mcp(ticker: str, company_name: str = "") -> dict:
    """Fetch sentiment metrics for a ticker."""
    return await call_mcp_server(
        "sentiment-basket",
        "get_sentiment_basket",
        {"ticker": ticker, "company_name": company_name}
    )


# =============================================================================
# SCHEMA NORMALIZERS
# Convert MCP-emitted schemas to analyzer-expected format
# =============================================================================

def _normalize_volatility(raw: dict) -> dict:
    """Pass-through: MCPs now emit {source: {data: ...}} directly."""
    return raw


def _normalize_macro(raw: dict) -> dict:
    """Pass-through: MCPs now emit {source: {data: ...}} directly."""
    return raw


def _normalize_valuation(raw: dict) -> dict:
    """Pass-through: MCPs now emit {source: {data: ...}} directly."""
    return raw


def _normalize_fundamentals(raw: dict) -> dict:
    """Pass-through: MCPs now emit {source: {data: ...}} directly."""
    return raw


def _get_nested_value(data: dict, *keys):
    """Safely get nested value from dict, returns None if not found."""
    for key in keys:
        if not isinstance(data, dict):
            return None
        data = data.get(key)
    return data


def _as_dict(value: Any) -> dict:
    """Return value if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


# Per basket: (emitted metric name, field name, source keys in priority order).
_EMIT_SPECS = {
    "fundamentals": (
        ("revenue", "revenue", ("sec_edgar", "yahoo_finance")),
        ("net_margin", "net_margin_pct", ("sec_edgar", "yahoo_finance")),
        ("EPS", "eps", ("sec_edgar", "yahoo_finance")),
        ("debt_to_equity", "debt_to_equity", ("sec_edgar", "yahoo_finance")),
    ),
    "volatility": (
        ("VIX", "vix", ("fred",)),
        ("beta", "beta", ("yahoo_finance",)),
        ("hist_vol", "historical_volatility", ("yahoo_finance",)),
    ),
    "macro": (
        ("GDP_growth", "gdp_growth", ("bea",)),
        ("interest_rate", "interest_rate", ("fred",)),
        ("inflation", "cpi_inflation", ("bls",)),
        ("unemployment", "unemployment", ("bls",)),
    ),
    "valuation": (
        ("P/E", "trailing_pe", ("yahoo_finance", "alpha_vantage")),
        ("P/B", "pb_ratio", ("yahoo_finance", "alpha_vantage")),
        ("P/S", "ps_ratio", ("yahoo_finance", "alpha_vantage")),
        ("EV/EBITDA", "ev_ebitda", ("yahoo_finance", "alpha_vantage")),
    ),
}

# Per item-list basket: (source keys holding lists, status text when empty).
_ITEM_COUNT_SPECS = {
    "news": (("tavily", "nyt", "newsapi"), "No recent news found"),
    "sentiment": (("finnhub", "reddit"), "No sentiment content found"),
}


async def _emit_field(
    progress_callback: Callable,
    source: str,
    label: str,
    candidate: Any,
    date_key: str,
    fallback_date: Any = None,
    with_filing: bool = False
) -> None:
    """Emit one metric given either a {"value": ...} wrapper or a bare number."""
    if isinstance(candidate, dict):
        value = candidate.get("value")
        if value is None:
            return
        extras = {"end_date": candidate.get(date_key) or fallback_date}
        if with_filing:
            extras["fiscal_year"] = candidate.get("fiscal_year")
            extras["form"] = candidate.get("form")
        await emit_metric(progress_callback, source, label, value, **extras)
    elif isinstance(candidate, (int, float)):
        await emit_metric(
            progress_callback, source, label, candidate, end_date=fallback_date
        )


async def _extract_and_emit_metrics(
    source: str,
    result: dict,
    progress_callback: Optional[Callable]
) -> None:
    """Extract metrics from MCP result and emit via callback.

    Reads the flattened multi-source structures from _all_sources endpoints:
    - fundamentals: {"sec_edgar": {...}, "yahoo_finance": {...}}
    - valuation: {"yahoo_finance": {...}, "alpha_vantage": {...}}
    - volatility: {"fred": {...}, "yahoo_finance": {...}}
    - macro: {"bea": {...}, "bls": {...}, "fred": {...}}
    - news: {"tavily": [...], "nyt": [...], "newsapi": [...]}
    - sentiment: {"finnhub": [...], "reddit": [...]}

    For metrics listed with several sources, the first source with a truthy
    entry wins. A wrapped metric is emitted whenever its "value" is not None,
    so a legitimate 0 is emitted. Does nothing without a callback, for
    empty/non-dict results, or for results containing "error".
    """
    if not progress_callback or not isinstance(result, dict):
        return
    if not result or "error" in result:
        return

    if source in _ITEM_COUNT_SPECS:
        source_keys, empty_status = _ITEM_COUNT_SPECS[source]
        total_items = 0
        for key in source_keys:
            items = result.get(key) or []
            if isinstance(items, list):
                total_items += len(items)
        if total_items > 0:
            await emit_metric(progress_callback, source, "items_found", total_items)
        else:
            await emit_metric(progress_callback, source, "status", empty_status)
        return

    specs = _EMIT_SPECS.get(source)
    if not specs:
        return

    # Fundamentals carry filing metadata and date by "end_date"; other
    # baskets use "as_of".
    with_filing = source == "fundamentals"
    date_key = "end_date" if with_filing else "as_of"

    # Valuation falls back to Yahoo's regular_market_time as the timestamp.
    fallback_date = None
    if source == "valuation":
        fallback_date = _as_dict(result.get("yahoo_finance")).get("regular_market_time")

    for label, field, source_keys in specs:
        candidate = None
        for key in source_keys:
            candidate = _as_dict(result.get(key)).get(field)
            if candidate:
                break
        await _emit_field(
            progress_callback, source, label, candidate,
            date_key, fallback_date, with_filing
        )


# Keys under which a basket may nest its metrics (wrappers and source names).
_CONTAINER_KEYS = (
    "data", "metrics", "sec_edgar", "yahoo_finance",
    "alpha_vantage", "fred", "bea", "bls",
)


def _value_present(val: Any) -> bool:
    """Return True when a metric entry actually carries a value."""
    if isinstance(val, dict):
        return val.get("value") is not None
    # Special case for items (list) - news and sentiment
    if isinstance(val, list):
        return len(val) > 0
    return val is not None


def _find_metric(data: Any, field: str, depth: int) -> bool:
    """Search data and up to `depth` levels of known containers for field."""
    if not isinstance(data, dict):
        return False
    if field in data and _value_present(data[field]):
        return True
    if depth <= 0:
        return False
    return any(
        _find_metric(data[key], field, depth - 1)
        for key in _CONTAINER_KEYS
        if key in data
    )


def _has_metric(data: dict, field: str) -> bool:
    """Check if metric exists (with a value) in possibly nested structure.

    Looks at the top level and inside the known container keys, up to two
    levels deep (e.g. {"sec_edgar": {"data": {field: ...}}}).
    """
    return _find_metric(data, field, 2)


def _calculate_completeness(metrics: dict, sources_available: list) -> dict:
    """Calculate completeness score and identify missing data.

    sources_available is accepted for interface symmetry and is not read.
    """
    required = {
        "fundamentals": ["revenue", "net_income", "eps", "debt_to_equity"],
        "valuation": ["trailing_pe", "pb_ratio", "ps_ratio"],
        "volatility": ["beta", "vix"],
        "macro": ["gdp_growth", "interest_rate", "cpi_inflation"],
        "news": ["items"],
        "sentiment": ["items"]
    }

    total = 0
    found = 0
    missing = {}

    for source, fields in required.items():
        source_data = metrics.get(source, {})
        missing[source] = []
        for field in fields:
            total += 1
            if _has_metric(source_data, field):
                found += 1
            else:
                missing[source].append(field)

    return {
        "completeness_pct": round(found / total * 100, 1) if total > 0 else 0,
        "metrics_found": found,
        "metrics_total": total,
        "missing": {k: v for k, v in missing.items() if v}
    }


def _aggregate_swot(metrics: dict, sources_available: list) -> dict:
    """Aggregate SWOT summaries from all MCP sources.

    Sources whose payload or "swot_summary" is not a dict are skipped, as
    are categories that are not lists.
    """
    aggregated_swot = {
        "strengths": [],
        "weaknesses": [],
        "opportunities": [],
        "threats": []
    }

    for source in sources_available:
        swot = _as_dict(_as_dict(metrics.get(source)).get("swot_summary"))
        for category, bucket in aggregated_swot.items():
            items = swot.get(category)
            if isinstance(items, list):
                bucket.extend(items)

    return aggregated_swot


def _sort_and_limit_items(payload: Any, limit: int) -> Any:
    """Sort payload["items"] newest-first by "datetime" and keep the top N.

    Mutates and returns payload. Payloads without a list under "items" are
    returned unchanged. Sort keys are compared as strings so mixed
    str/int timestamps cannot raise.
    """
    if not isinstance(payload, dict) or "items" not in payload:
        return payload
    items = payload["items"]
    if not isinstance(items, list):
        return payload

    def sort_key(item):
        stamp = item.get("datetime") if isinstance(item, dict) else None
        return str(stamp) if stamp else "1970-01-01"

    ordered = sorted(items, key=sort_key, reverse=True)
    payload["items"] = ordered[:limit]
    payload["total_items"] = len(items)
    payload["showing"] = min(limit, len(items))
    return payload


def _sort_and_limit_news(news_data: dict, limit: int = 10) -> dict:
    """Sort news items by date (most recent first) and limit to top N."""
    return _sort_and_limit_items(news_data, limit)


def _sort_and_limit_sentiment(sentiment_data: dict, limit: int = 10) -> dict:
    """Sort sentiment items by date (most recent first) and limit to top N."""
    return _sort_and_limit_items(sentiment_data, limit)


def _source_payload(container: dict, key: str) -> dict:
    """Return one source's metrics, accepting both observed layouts.

    Handles the wrapped {source: {"data": {...}}} layout and the flattened
    {source: {...}} layout.
    """
    source = container.get(key)
    if not isinstance(source, dict):
        return {}
    inner = source.get("data")
    return inner if isinstance(inner, dict) else source


def _unwrap_value(entry: Any) -> Any:
    """Strip up to two {"value": ...} wrappers from a metric entry."""
    for _ in range(2):
        if isinstance(entry, dict):
            entry = entry.get("value")
    return entry


def _to_number(value: Any) -> Optional[float]:
    """Convert a number or numeric string to float; None if not numeric."""
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            return None
    return None


def _add_conflict_markers(fundamentals_all: dict, valuation_all: dict) -> dict:
    """
    Add conflict resolution markers to multi-source data.

    Primary sources: SEC EDGAR (fundamentals), Yahoo Finance (valuation)

    A fundamentals conflict is recorded when both sources report a truthy
    value and the values differ. A valuation conflict is recorded when both
    values are numeric, non-zero and differ by more than 0.5. Non-numeric
    valuation entries are skipped instead of raising.
    """
    conflict_resolution = {
        "fundamentals": {
            "primary_source": "SEC EDGAR XBRL",
            "secondary_source": "Yahoo Finance",
            "conflicts": []
        },
        "valuation": {
            "primary_source": "Yahoo Finance",
            "secondary_source": "Alpha Vantage",
            "conflicts": []
        }
    }

    # Check fundamentals for conflicts
    if (
        isinstance(fundamentals_all, dict)
        and "sec_edgar" in fundamentals_all
        and "yahoo_finance" in fundamentals_all
    ):
        sec_data = _source_payload(fundamentals_all, "sec_edgar")
        yf_data = _source_payload(fundamentals_all, "yahoo_finance")

        for metric in ["revenue", "net_income", "free_cash_flow"]:
            sec_val = _unwrap_value(sec_data.get(metric))
            yf_val = _unwrap_value(yf_data.get(metric))
            if sec_val and yf_val and sec_val != yf_val:
                conflict_resolution["fundamentals"]["conflicts"].append({
                    "metric": metric,
                    "primary_value": sec_val,
                    "secondary_value": yf_val,
                    "used": "primary"
                })

    # Check valuation for conflicts
    if (
        isinstance(valuation_all, dict)
        and "yahoo_finance" in valuation_all
        and "alpha_vantage" in valuation_all
    ):
        yf_data = _source_payload(valuation_all, "yahoo_finance")
        av_data = _source_payload(valuation_all, "alpha_vantage")

        for metric in ["trailing_pe", "forward_pe", "pb_ratio", "ps_ratio"]:
            yf_val = _unwrap_value(yf_data.get(metric))
            av_val = _unwrap_value(av_data.get(metric))
            yf_num = _to_number(yf_val)
            av_num = _to_number(av_val)
            if yf_num and av_num and abs(yf_num - av_num) > 0.5:
                conflict_resolution["valuation"]["conflicts"].append({
                    "metric": metric,
                    "primary_value": yf_val,
                    "secondary_value": av_val,
                    "used": "primary"
                })

    return conflict_resolution


def _is_error_result(result: Any) -> bool:
    """Return True when an MCP result is a dict carrying an "error" key."""
    return isinstance(result, dict) and "error" in result


async def _call_with_retry(name: str, mcp_func: Callable) -> tuple:
    """Call mcp_func, retrying exactly once on an error result or exception.

    Returns (result, retried). An exception on the last attempt is converted
    to {"error": str(exc)}.
    """
    result = None
    for attempt in (1, 2):
        try:
            result = await mcp_func()
        except Exception as exc:
            if attempt == 1:
                logger.warning("MCP %s exception, retrying: %s", name, exc)
            result = {"error": str(exc)}
            continue
        if not _is_error_result(result):
            return result, attempt == 2
        if attempt == 1:
            logger.warning(
                "MCP %s error, retrying: %s",
                name, str(result.get("error", "Unknown"))[:50]
            )
    return result, True


async def fetch_all_research_data(
    ticker: str,
    company_name: str,
    progress_callback: Optional[Callable] = None
) -> dict:
    """
    Fetch data from 6 MCP servers SEQUENTIALLY using TRUE MCP protocol.

    Only calls multi-source (_all) versions to avoid duplicate API calls.
    Order: fundamentals -> valuation -> volatility -> macro -> news -> sentiment

    Each source is attempted at most twice. A failure while emitting progress
    metrics is logged and does not affect the fetch outcome.

    Args:
        ticker: Stock ticker symbol
        company_name: Company name
        progress_callback: Optional callback for granular metric events

    Returns aggregated results with sources_available, sources_failed,
    and aggregated_swot.
    """
    logger.info("Fetching from MCP servers for %s (%s)...", ticker, company_name)

    # Sequential order: critical data first
    mcp_sequence = [
        ("fundamentals", lambda: call_fundamentals_all_sources_mcp(ticker)),
        ("valuation", lambda: call_valuation_all_sources_mcp(ticker)),
        ("volatility", lambda: call_volatility_all_sources_mcp(ticker)),
        ("macro", lambda: call_macro_all_sources_mcp()),
        ("news", lambda: call_news_mcp(ticker, company_name)),
        ("sentiment", lambda: call_sentiment_mcp(ticker, company_name)),
    ]

    # Normalizers to convert MCP schemas to analyzer-expected format
    normalizers = {
        "fundamentals": _normalize_fundamentals,
        "valuation": _normalize_valuation,
        "volatility": _normalize_volatility,
        "macro": _normalize_macro,
    }

    metrics = {}
    sources_available = []
    sources_failed = []

    # Sequential execution - one at a time
    for name, mcp_func in mcp_sequence:
        logger.info("Fetching %s...", name)
        result, retried = await _call_with_retry(name, mcp_func)

        if _is_error_result(result):
            sources_failed.append(name)
            metrics[name] = {**result, "retried": True}
            logger.warning(
                "MCP %s failed after retry: %s", name, result.get("error")
            )
            continue

        # Apply normalizer if available
        normalizer = normalizers.get(name)
        if normalizer is not None:
            result = normalizer(result)
        sources_available.append(name)
        metrics[name] = result
        if retried:
            logger.info("MCP %s succeeded on retry", name)
        else:
            logger.info("MCP %s fetched successfully", name)

        # Emit metrics for real-time streaming to frontend. Kept outside the
        # fetch/retry path so a failing callback cannot re-run the MCP call
        # or register the source twice.
        try:
            await _extract_and_emit_metrics(name, result, progress_callback)
        except Exception as exc:
            logger.warning("MCP %s metric emission failed: %s", name, exc)

    # Apply sorting and limiting to news (top 10, most recent first)
    if "news" in metrics and not _is_error_result(metrics["news"]):
        metrics["news"] = _sort_and_limit_news(metrics["news"], limit=10)

    # Apply sorting and limiting to sentiment (top 10 articles/posts, most recent first)
    if "sentiment" in metrics and not _is_error_result(metrics["sentiment"]):
        metrics["sentiment"] = _sort_and_limit_sentiment(metrics["sentiment"], limit=10)

    # Get multi-source data (now stored directly under source name)
    fundamentals_data = metrics.get("fundamentals", {})
    valuation_data = metrics.get("valuation", {})
    macro_data = metrics.get("macro", {})
    volatility_data = metrics.get("volatility", {})

    # Add conflict resolution markers
    conflict_resolution = _add_conflict_markers(fundamentals_data, valuation_data)

    # Build aggregated SWOT from primary source data
    aggregated_swot = _aggregate_swot(metrics, sources_available)

    # Calculate completeness score
    completeness = _calculate_completeness(metrics, sources_available)

    # Final data package - shared with analyzer only after all collection complete
    data = {
        "ticker": ticker.upper(),
        "company_name": company_name,
        "sources_available": sources_available,
        "sources_failed": sources_failed,
        "metrics": metrics,
        "multi_source": {
            "fundamentals_all": fundamentals_data,
            "valuation_all": valuation_data,
            "macro_all": macro_data,
            "volatility_all": volatility_data,
        },
        "conflict_resolution": conflict_resolution,
        "aggregated_swot": aggregated_swot,
        "completeness": completeness,
        "generated_at": datetime.now().isoformat()
    }

    logger.info(
        "Research complete: %s sources, %s failed, %s%% complete",
        len(sources_available), len(sources_failed),
        completeness["completeness_pct"]
    )

    return data