Spaces:
Sleeping
Sleeping
File size: 37,786 Bytes
d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 f286eb6 e7b767d d66c6c9 e7b767d d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f 50c0d03 26c5c2f 50c0d03 26c5c2f 50c0d03 26c5c2f 50c0d03 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f 287f0b8 7de6ec4 26c5c2f 2901968 f286eb6 26c5c2f fbe5ac1 26c5c2f fbe5ac1 2901968 fbe5ac1 26c5c2f 2901968 f286eb6 26c5c2f fbe5ac1 26c5c2f fbe5ac1 2901968 d66c6c9 287f0b8 2901968 287f0b8 26c5c2f 287f0b8 26c5c2f 8b9a7b7 26c5c2f 287f0b8 2901968 8b9a7b7 26c5c2f d66c6c9 287f0b8 26c5c2f 287f0b8 2901968 287f0b8 26c5c2f 287f0b8 26c5c2f 2901968 287f0b8 26c5c2f 287f0b8 2901968 287f0b8 26c5c2f 287f0b8 2901968 287f0b8 26c5c2f d66c6c9 287f0b8 26c5c2f 287f0b8 26c5c2f 287f0b8 26c5c2f 287f0b8 d66c6c9 287f0b8 584203d d66c6c9 287f0b8 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 26c5c2f d66c6c9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 | """
MCP Client - TRUE MCP protocol via subprocess stdio.
Implements proper MCP handshake:
1. Send 'initialize' request
2. Receive initialization response
3. Send 'initialized' notification
4. Send 'tools/call' request
5. Parse response
Also supports HTTP-based load-balanced calls for fundamentals-basket.
"""
import asyncio
import json
import os
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional, Callable, Any
import httpx
logger = logging.getLogger(__name__)
# Base path for MCP servers
MCP_SERVERS_PATH = Path(__file__).parent / "mcp-servers"
# Configurable delay for granular progress events (ms)
# Set to 0 for completeness-first mode (no artificial UI delays)
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "0"))
# =============================================================================
# HTTP LOAD BALANCER CONFIGURATION
# =============================================================================
# Financials HTTP load balancer URL (nginx on port 8080)
FINANCIALS_HTTP_URL = os.getenv("FINANCIALS_HTTP_URL", "http://localhost:8080")
# Toggle HTTP mode (set to "false" to use subprocess MCP)
USE_HTTP_FINANCIALS = os.getenv("USE_HTTP_FINANCIALS", "false").lower() == "true"
# HTTP client timeout (increased for completeness-first mode)
HTTP_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "90.0"))
# =============================================================================
# HTTP CLIENT FOR LOAD-BALANCED CALLS
# =============================================================================
async def call_fundamentals_http(tool_name: str, arguments: dict, timeout: float = None) -> dict:
"""
Call fundamentals-basket via HTTP load balancer (nginx).
This bypasses MCP subprocess spawning for better performance.
Requires the HTTP cluster to be running (./start_cluster.sh).
Args:
tool_name: Name of the tool (e.g., 'get_sec_fundamentals')
arguments: Tool arguments dict
timeout: Request timeout in seconds
Returns:
Tool result dict or error dict
"""
timeout = timeout or HTTP_TIMEOUT
url = f"{FINANCIALS_HTTP_URL}/tools/{tool_name}"
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, json=arguments)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
logger.error(f"HTTP timeout calling {tool_name}: {timeout}s")
return {"error": f"HTTP timeout after {timeout}s", "tool": tool_name}
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling {tool_name}: {e.response.status_code}")
return {"error": f"HTTP {e.response.status_code}", "tool": tool_name}
except httpx.ConnectError:
logger.warning(f"HTTP connection failed for {tool_name}, falling back to subprocess")
# Fall back to subprocess MCP if HTTP cluster is not running
return await call_mcp_server("fundamentals-basket", tool_name, arguments, timeout)
except Exception as e:
logger.error(f"HTTP error calling {tool_name}: {e}")
return {"error": str(e), "tool": tool_name}
async def check_fundamentals_http_health() -> bool:
"""
Check if the fundamentals HTTP cluster is healthy.
Returns:
True if cluster is responding, False otherwise
"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{FINANCIALS_HTTP_URL}/health")
return response.status_code == 200
except Exception:
return False
async def emit_metric(
progress_callback: Optional[Callable],
source: str,
metric: str,
value: Any,
end_date: str = None,
fiscal_year: int = None,
form: str = None
):
"""Emit a metric event as a structured payload with optional temporal data."""
if progress_callback:
payload = {
"source": source,
"metric": metric,
"value": value,
"end_date": end_date,
"fiscal_year": fiscal_year,
"form": form,
}
logger.debug(f"emit_metric payload: {json.dumps(payload, default=str)}")
progress_callback(payload)
await asyncio.sleep(METRIC_DELAY_MS / 1000)
async def call_mcp_server(
server_name: str,
tool_name: str,
arguments: dict,
timeout: float = 90.0
) -> dict:
"""
Call an MCP server tool via subprocess stdio using proper MCP protocol sequencing.
Protocol sequence:
1. Send initialize request -> wait for response (id=1)
2. Send initialized notification
3. Send tools/call request -> wait for response (id=2)
4. Clean up
Args:
server_name: Name of the MCP server directory (e.g., 'fundamentals-basket')
tool_name: Name of the tool to call (e.g., 'get_sec_fundamentals')
arguments: Dict of arguments to pass to the tool
timeout: Total timeout in seconds (default 60s for external API calls)
Returns:
Dict with tool result or error
"""
server_path = MCP_SERVERS_PATH / server_name / "server.py"
if not server_path.exists():
return {"error": f"MCP server not found: {server_name}"}
process = None
try:
# Start the MCP server process
process = await asyncio.create_subprocess_exec(
"python3", str(server_path),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(server_path.parent),
env={**os.environ}
)
async def send_message(msg: dict):
"""Send a JSON-RPC message to the server."""
data = json.dumps(msg) + "\n"
process.stdin.write(data.encode())
await process.stdin.drain()
async def read_response(expected_id: int, phase_timeout: float) -> dict:
"""Read and parse JSON-RPC response with expected id."""
buffer = ""
start_time = asyncio.get_event_loop().time()
while True:
remaining = phase_timeout - (asyncio.get_event_loop().time() - start_time)
if remaining <= 0:
raise asyncio.TimeoutError(f"Timeout waiting for response id={expected_id}")
try:
line = await asyncio.wait_for(
process.stdout.readline(),
timeout=min(remaining, 5.0) # Check every 5s
)
except asyncio.TimeoutError:
continue # Keep trying until phase_timeout
if not line:
# EOF - server closed stdout
raise EOFError(f"Server closed stdout before sending response id={expected_id}")
line_str = line.decode().strip()
if not line_str:
continue
# Try to parse as JSON
# Handle case where line might contain non-JSON prefix (logs)
json_start = line_str.find('{')
if json_start == -1:
continue
try:
response = json.loads(line_str[json_start:])
if isinstance(response, dict):
# Check if this is the response we're waiting for
if response.get("id") == expected_id:
return response
# Also check for error responses
if "error" in response and response.get("id") == expected_id:
return response
except json.JSONDecodeError:
# Might be partial JSON, accumulate in buffer
buffer += line_str
try:
response = json.loads(buffer)
if response.get("id") == expected_id:
return response
buffer = "" # Reset if we got valid JSON but wrong id
except json.JSONDecodeError:
pass # Keep accumulating
# Phase 1: Initialize
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "research-service", "version": "1.0.0"}
}
}
await send_message(init_request)
init_response = await read_response(expected_id=1, phase_timeout=20.0)
if "error" in init_response:
return {"error": f"Initialize failed: {init_response['error']}"}
# Phase 2: Send initialized notification (no response expected)
initialized_notification = {
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
await send_message(initialized_notification)
await asyncio.sleep(0.05) # Brief pause for server to process
# Phase 3: Tool call
tool_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments}
}
await send_message(tool_request)
tool_response = await read_response(expected_id=2, phase_timeout=timeout)
# Process tool response
if "error" in tool_response:
return {"error": f"Tool call failed: {tool_response['error']}"}
if "result" in tool_response:
result = tool_response["result"]
# MCP SDK format: {"content": [{"type": "text", "text": "..."}]}
if isinstance(result, dict) and "content" in result:
content_list = result.get("content", [])
if content_list and isinstance(content_list, list):
for content in content_list:
if isinstance(content, dict) and content.get("type") == "text":
try:
return json.loads(content.get("text", "{}"))
except json.JSONDecodeError:
return {"raw_text": content.get("text", "")}
return result
return {"error": "No result in tool response"}
except asyncio.TimeoutError as e:
logger.warning(f"MCP {server_name} timeout: {e}")
return {"error": f"Timeout: {e}"}
except EOFError as e:
logger.warning(f"MCP {server_name} EOF: {e}")
return {"error": f"Server closed: {e}"}
except Exception as e:
logger.error(f"MCP {server_name} error: {e}")
return {"error": str(e)}
finally:
# Clean up process
if process:
try:
process.stdin.close()
except:
pass
try:
# Give process 2s to exit gracefully
await asyncio.wait_for(process.wait(), timeout=2.0)
except asyncio.TimeoutError:
process.kill()
await process.wait()
# Log stderr if any
try:
stderr_data = await asyncio.wait_for(process.stderr.read(), timeout=1.0)
if stderr_data:
stderr_text = stderr_data.decode().strip()
if stderr_text:
logger.debug(f"MCP {server_name} stderr: {stderr_text[:500]}")
except:
pass
async def call_fundamentals_mcp(ticker: str) -> dict:
"""
Fetch SEC fundamentals for a ticker.
Uses HTTP load balancer if USE_HTTP_FINANCIALS=true, otherwise subprocess MCP.
"""
if USE_HTTP_FINANCIALS:
return await call_fundamentals_http("get_sec_fundamentals", {"ticker": ticker})
return await call_mcp_server(
"fundamentals-basket",
"get_sec_fundamentals",
{"ticker": ticker}
)
async def call_fundamentals_all_sources_mcp(ticker: str) -> dict:
"""
Fetch fundamentals from ALL sources (SEC EDGAR + Yahoo Finance).
Uses HTTP load balancer if USE_HTTP_FINANCIALS=true, otherwise subprocess MCP.
"""
if USE_HTTP_FINANCIALS:
return await call_fundamentals_http("get_all_sources_fundamentals", {"ticker": ticker})
return await call_mcp_server(
"fundamentals-basket",
"get_all_sources_fundamentals",
{"ticker": ticker}
)
async def call_volatility_mcp(ticker: str) -> dict:
"""Fetch volatility metrics for a ticker."""
return await call_mcp_server(
"volatility-basket",
"get_volatility_basket",
{"ticker": ticker}
)
async def call_volatility_all_sources_mcp(ticker: str) -> dict:
"""Fetch volatility from ALL sources (Yahoo + Alpha Vantage + Tradier)."""
return await call_mcp_server(
"volatility-basket",
"get_all_sources_volatility",
{"ticker": ticker}
)
async def call_macro_mcp() -> dict:
"""Fetch macroeconomic indicators."""
return await call_mcp_server(
"macro-basket",
"get_macro_basket",
{}
)
async def call_macro_all_sources_mcp() -> dict:
"""Fetch macro from ALL sources (BEA/BLS primary + FRED fallback)."""
return await call_mcp_server(
"macro-basket",
"get_all_sources_macro",
{}
)
async def call_valuation_mcp(ticker: str) -> dict:
"""Fetch valuation ratios for a ticker."""
return await call_mcp_server(
"valuation-basket",
"get_valuation_basket",
{"ticker": ticker}
)
async def call_valuation_all_sources_mcp(ticker: str) -> dict:
"""Fetch valuation from ALL sources (Yahoo Finance + Alpha Vantage)."""
return await call_mcp_server(
"valuation-basket",
"get_all_sources_valuation",
{"ticker": ticker}
)
async def call_news_mcp(ticker: str, company_name: str = "") -> dict:
"""Fetch news for a company."""
args = {"ticker": ticker}
if company_name:
args["company_name"] = company_name
return await call_mcp_server(
"news-basket",
"get_all_sources_news",
args
)
async def call_sentiment_mcp(ticker: str, company_name: str = "") -> dict:
"""Fetch sentiment metrics for a ticker."""
return await call_mcp_server(
"sentiment-basket",
"get_sentiment_basket",
{"ticker": ticker, "company_name": company_name}
)
# =============================================================================
# SCHEMA NORMALIZERS
# Convert MCP-emitted schemas to analyzer-expected format
# =============================================================================
def _normalize_volatility(raw: dict) -> dict:
"""Pass-through: MCPs now emit {source: {data: ...}} directly."""
return raw
def _normalize_macro(raw: dict) -> dict:
"""Pass-through: MCPs now emit {source: {data: ...}} directly."""
return raw
def _normalize_valuation(raw: dict) -> dict:
"""Pass-through: MCPs now emit {source: {data: ...}} directly."""
return raw
def _normalize_fundamentals(raw: dict) -> dict:
"""Pass-through: MCPs now emit {source: {data: ...}} directly."""
return raw
def _get_nested_value(data: dict, *keys):
"""Safely get nested value from dict, returns None if not found."""
for key in keys:
if not isinstance(data, dict):
return None
data = data.get(key)
return data
async def _extract_and_emit_metrics(
source: str,
result: dict,
progress_callback: Optional[Callable]
) -> None:
"""Extract metrics from MCP result and emit via callback.
Handles multi-source structures from _all_sources endpoints:
- fundamentals: {"sec_edgar": {...}, "yahoo_finance": {...}}
- valuation: {"yahoo_finance": {...}, "alpha_vantage": {...}}
- volatility: {"yahoo_finance": {...}, "alpha_vantage": {...}, "market_volatility_context": {...}}
- macro: {"bea_bls": {...}, "fred": {...}}
"""
if not progress_callback or not result or "error" in result:
return
if source == "fundamentals":
# Multi-source structure (flattened): {"sec_edgar": {...}, "yahoo_finance": {...}}
sec_data = result.get("sec_edgar") or {}
yf_data = result.get("yahoo_finance") or {}
# Revenue - prefer SEC EDGAR (primary source)
revenue = sec_data.get("revenue") or yf_data.get("revenue") or {}
if isinstance(revenue, dict) and revenue.get("value"):
await emit_metric(
progress_callback, source, "revenue", revenue["value"],
end_date=revenue.get("end_date"),
fiscal_year=revenue.get("fiscal_year"),
form=revenue.get("form")
)
elif isinstance(revenue, (int, float)):
await emit_metric(progress_callback, source, "revenue", revenue)
# Net margin
net_margin = sec_data.get("net_margin_pct") or yf_data.get("net_margin_pct") or {}
if isinstance(net_margin, dict) and net_margin.get("value") is not None:
await emit_metric(
progress_callback, source, "net_margin", net_margin["value"],
end_date=net_margin.get("end_date"),
fiscal_year=net_margin.get("fiscal_year"),
form=net_margin.get("form")
)
elif isinstance(net_margin, (int, float)):
await emit_metric(progress_callback, source, "net_margin", net_margin)
# EPS
eps = sec_data.get("eps") or yf_data.get("eps") or {}
if isinstance(eps, dict) and eps.get("value"):
await emit_metric(
progress_callback, source, "EPS", eps["value"],
end_date=eps.get("end_date"),
fiscal_year=eps.get("fiscal_year"),
form=eps.get("form")
)
elif isinstance(eps, (int, float)):
await emit_metric(progress_callback, source, "EPS", eps)
# Debt to Equity
debt_to_equity = sec_data.get("debt_to_equity") or yf_data.get("debt_to_equity")
if isinstance(debt_to_equity, dict) and debt_to_equity.get("value") is not None:
await emit_metric(
progress_callback, source, "debt_to_equity", debt_to_equity["value"],
end_date=debt_to_equity.get("end_date"),
fiscal_year=debt_to_equity.get("fiscal_year"),
form=debt_to_equity.get("form")
)
elif isinstance(debt_to_equity, (int, float)):
await emit_metric(progress_callback, source, "debt_to_equity", debt_to_equity)
elif source == "volatility":
# Multi-source (flattened): {"fred": {...}, "yahoo_finance": {...}}
fred_data = result.get("fred") or {}
yf_data = result.get("yahoo_finance") or {}
# VIX from FRED
vix = fred_data.get("vix") or {}
if isinstance(vix, dict) and vix.get("value") is not None:
await emit_metric(progress_callback, source, "VIX", vix["value"], end_date=vix.get("as_of"))
elif isinstance(vix, (int, float)):
await emit_metric(progress_callback, source, "VIX", vix)
# Beta from Yahoo Finance
beta = yf_data.get("beta") or {}
if isinstance(beta, dict) and beta.get("value") is not None:
await emit_metric(progress_callback, source, "beta", beta["value"], end_date=beta.get("as_of"))
elif isinstance(beta, (int, float)):
await emit_metric(progress_callback, source, "beta", beta)
# Historical Volatility from Yahoo Finance
hist_vol = yf_data.get("historical_volatility") or {}
if isinstance(hist_vol, dict) and hist_vol.get("value") is not None:
await emit_metric(progress_callback, source, "hist_vol", hist_vol["value"], end_date=hist_vol.get("as_of"))
elif isinstance(hist_vol, (int, float)):
await emit_metric(progress_callback, source, "hist_vol", hist_vol)
elif source == "macro":
# Multi-source (flattened): {"bea": {...}, "bls": {...}, "fred": {...}}
bea = result.get("bea") or {}
bls = result.get("bls") or {}
fred = result.get("fred") or {}
# GDP Growth from BEA
gdp = bea.get("gdp_growth") or {}
if isinstance(gdp, dict) and gdp.get("value") is not None:
await emit_metric(progress_callback, source, "GDP_growth", gdp["value"], end_date=gdp.get("as_of"))
elif isinstance(gdp, (int, float)):
await emit_metric(progress_callback, source, "GDP_growth", gdp)
# Interest Rate from FRED
interest = fred.get("interest_rate") or {}
if isinstance(interest, dict) and interest.get("value") is not None:
await emit_metric(progress_callback, source, "interest_rate", interest["value"], end_date=interest.get("as_of"))
elif isinstance(interest, (int, float)):
await emit_metric(progress_callback, source, "interest_rate", interest)
# Inflation (CPI) from BLS
inflation = bls.get("cpi_inflation") or {}
if isinstance(inflation, dict) and inflation.get("value") is not None:
await emit_metric(progress_callback, source, "inflation", inflation["value"], end_date=inflation.get("as_of"))
elif isinstance(inflation, (int, float)):
await emit_metric(progress_callback, source, "inflation", inflation)
# Unemployment from BLS
unemployment = bls.get("unemployment") or {}
if isinstance(unemployment, dict) and unemployment.get("value") is not None:
await emit_metric(progress_callback, source, "unemployment", unemployment["value"], end_date=unemployment.get("as_of"))
elif isinstance(unemployment, (int, float)):
await emit_metric(progress_callback, source, "unemployment", unemployment)
elif source == "valuation":
# Multi-source (flattened): {"yahoo_finance": {...}, "alpha_vantage": {...}}
yf_data = result.get("yahoo_finance") or {}
av_data = result.get("alpha_vantage") or {}
# Use regular_market_time from yahoo_finance for timestamp
market_time = yf_data.get("regular_market_time")
# P/E Ratio - prefer Yahoo Finance (wrapped in {value, as_of})
pe_data = yf_data.get("trailing_pe") or av_data.get("trailing_pe") or {}
if isinstance(pe_data, dict) and pe_data.get("value") is not None:
await emit_metric(progress_callback, source, "P/E", pe_data["value"], end_date=pe_data.get("as_of") or market_time)
elif isinstance(pe_data, (int, float)):
await emit_metric(progress_callback, source, "P/E", pe_data, end_date=market_time)
# P/B Ratio
pb_data = yf_data.get("pb_ratio") or av_data.get("pb_ratio") or {}
if isinstance(pb_data, dict) and pb_data.get("value") is not None:
await emit_metric(progress_callback, source, "P/B", pb_data["value"], end_date=pb_data.get("as_of") or market_time)
elif isinstance(pb_data, (int, float)):
await emit_metric(progress_callback, source, "P/B", pb_data, end_date=market_time)
# P/S Ratio
ps_data = yf_data.get("ps_ratio") or av_data.get("ps_ratio") or {}
if isinstance(ps_data, dict) and ps_data.get("value") is not None:
await emit_metric(progress_callback, source, "P/S", ps_data["value"], end_date=ps_data.get("as_of") or market_time)
elif isinstance(ps_data, (int, float)):
await emit_metric(progress_callback, source, "P/S", ps_data, end_date=market_time)
# EV/EBITDA
ev_data = yf_data.get("ev_ebitda") or av_data.get("ev_ebitda") or {}
if isinstance(ev_data, dict) and ev_data.get("value") is not None:
await emit_metric(progress_callback, source, "EV/EBITDA", ev_data["value"], end_date=ev_data.get("as_of") or market_time)
elif isinstance(ev_data, (int, float)):
await emit_metric(progress_callback, source, "EV/EBITDA", ev_data, end_date=market_time)
elif source == "news":
# News-basket returns source-keyed structure: {"tavily": [...], "nyt": [...], "newsapi": [...]}
total_items = 0
for news_source in ["tavily", "nyt", "newsapi"]:
items = result.get(news_source) or []
if isinstance(items, list):
total_items += len(items)
if total_items > 0:
await emit_metric(progress_callback, source, "items_found", total_items)
else:
await emit_metric(progress_callback, source, "status", "No recent news found")
elif source == "sentiment":
# Sentiment-basket returns source-keyed structure: {"finnhub": [...], "reddit": [...]}
total_items = 0
for sent_source in ["finnhub", "reddit"]:
items = result.get(sent_source) or []
if isinstance(items, list):
total_items += len(items)
if total_items > 0:
await emit_metric(progress_callback, source, "items_found", total_items)
else:
await emit_metric(progress_callback, source, "status", "No sentiment content found")
def _has_metric(data: dict, field: str) -> bool:
"""Check if metric exists in possibly nested structure."""
if not isinstance(data, dict):
return False
if field in data:
val = data[field]
if isinstance(val, dict):
return val.get("value") is not None
# Special case for items (list) - news and sentiment
if isinstance(val, list):
return len(val) > 0
return val is not None
# Check common nested paths
for key in ["data", "metrics", "sec_edgar", "yahoo_finance"]:
if key in data and isinstance(data[key], dict):
if field in data[key]:
return True
return False
def _calculate_completeness(metrics: dict, sources_available: list) -> dict:
"""Calculate completeness score and identify missing data."""
required = {
"fundamentals": ["revenue", "net_income", "eps", "debt_to_equity"],
"valuation": ["trailing_pe", "pb_ratio", "ps_ratio"],
"volatility": ["beta", "vix"],
"macro": ["gdp_growth", "interest_rate", "cpi_inflation"],
"news": ["items"],
"sentiment": ["items"]
}
total = 0
found = 0
missing = {}
for source, fields in required.items():
source_data = metrics.get(source, {})
missing[source] = []
for field in fields:
total += 1
if _has_metric(source_data, field):
found += 1
else:
missing[source].append(field)
return {
"completeness_pct": round(found / total * 100, 1) if total > 0 else 0,
"metrics_found": found,
"metrics_total": total,
"missing": {k: v for k, v in missing.items() if v}
}
def _aggregate_swot(metrics: dict, sources_available: list) -> dict:
"""Aggregate SWOT summaries from all MCP sources."""
aggregated_swot = {
"strengths": [],
"weaknesses": [],
"opportunities": [],
"threats": []
}
for source in sources_available:
source_data = metrics.get(source, {})
swot = source_data.get("swot_summary", {})
for category in aggregated_swot:
items = swot.get(category, [])
if items:
aggregated_swot[category].extend(items)
return aggregated_swot
def _sort_and_limit_news(news_data: dict, limit: int = 10) -> dict:
"""Sort news items by date (most recent first) and limit to top N."""
if not news_data or "items" not in news_data:
return news_data
items = news_data.get("items", [])
# Sort by datetime descending (most recent first)
def get_date(item):
date_str = item.get("datetime") or ""
return date_str if date_str else "1970-01-01"
sorted_items = sorted(items, key=get_date, reverse=True)
# Limit to top N
news_data["items"] = sorted_items[:limit]
news_data["total_items"] = len(items)
news_data["showing"] = min(limit, len(items))
return news_data
def _sort_and_limit_sentiment(sentiment_data: dict, limit: int = 10) -> dict:
"""Sort sentiment items by date (most recent first) and limit to top N."""
if not sentiment_data or "items" not in sentiment_data:
return sentiment_data
items = sentiment_data.get("items", [])
# Sort by datetime descending (most recent first)
def get_date(item):
return item.get("datetime") or "1970-01-01"
sorted_items = sorted(items, key=get_date, reverse=True)
# Limit to top N
sentiment_data["items"] = sorted_items[:limit]
sentiment_data["total_items"] = len(items)
sentiment_data["showing"] = min(limit, len(items))
return sentiment_data
def _add_conflict_markers(fundamentals_all: dict, valuation_all: dict) -> dict:
"""
Add conflict resolution markers to multi-source data.
Primary sources: SEC EDGAR (fundamentals), Yahoo Finance (valuation)
"""
conflict_resolution = {
"fundamentals": {
"primary_source": "SEC EDGAR XBRL",
"secondary_source": "Yahoo Finance",
"conflicts": []
},
"valuation": {
"primary_source": "Yahoo Finance",
"secondary_source": "Alpha Vantage",
"conflicts": []
}
}
# Check fundamentals for conflicts
if fundamentals_all and "sec_edgar" in fundamentals_all and "yahoo_finance" in fundamentals_all:
sec_data = fundamentals_all.get("sec_edgar", {}).get("data", {})
yf_data = fundamentals_all.get("yahoo_finance", {}).get("data", {})
for metric in ["revenue", "net_income", "free_cash_flow"]:
sec_val = sec_data.get(metric, {})
yf_val = yf_data.get(metric, {})
if isinstance(sec_val, dict):
sec_val = sec_val.get("value")
if isinstance(yf_val, dict):
yf_val = yf_val.get("value")
if isinstance(yf_val, dict):
yf_val = yf_val.get("value")
if sec_val and yf_val and sec_val != yf_val:
conflict_resolution["fundamentals"]["conflicts"].append({
"metric": metric,
"primary_value": sec_val,
"secondary_value": yf_val,
"used": "primary"
})
# Check valuation for conflicts
if valuation_all and "yahoo_finance" in valuation_all and "alpha_vantage" in valuation_all:
yf_data = valuation_all.get("yahoo_finance", {}).get("data", {})
av_data = valuation_all.get("alpha_vantage", {}).get("data", {})
for metric in ["trailing_pe", "forward_pe", "pb_ratio", "ps_ratio"]:
yf_val = yf_data.get(metric)
av_val = av_data.get(metric)
if yf_val and av_val and abs(yf_val - av_val) > 0.5:
conflict_resolution["valuation"]["conflicts"].append({
"metric": metric,
"primary_value": yf_val,
"secondary_value": av_val,
"used": "primary"
})
return conflict_resolution
async def fetch_all_research_data(
ticker: str,
company_name: str,
progress_callback: Optional[Callable] = None
) -> dict:
"""
Fetch data from 6 MCP servers SEQUENTIALLY using TRUE MCP protocol.
Only calls multi-source (_all) versions to avoid duplicate API calls.
Order: fundamentals -> valuation -> volatility -> macro -> news -> sentiment
Args:
ticker: Stock ticker symbol
company_name: Company name
progress_callback: Optional callback for granular metric events
Returns aggregated results with sources_available, sources_failed, and aggregated_swot.
"""
logger.info(f"Fetching from MCP servers for {ticker} ({company_name})...")
# Sequential order: critical data first
mcp_sequence = [
("fundamentals", lambda: call_fundamentals_all_sources_mcp(ticker)),
("valuation", lambda: call_valuation_all_sources_mcp(ticker)),
("volatility", lambda: call_volatility_all_sources_mcp(ticker)),
("macro", lambda: call_macro_all_sources_mcp()),
("news", lambda: call_news_mcp(ticker, company_name)),
("sentiment", lambda: call_sentiment_mcp(ticker, company_name)),
]
# Normalizers to convert MCP schemas to analyzer-expected format
normalizers = {
"fundamentals": _normalize_fundamentals,
"valuation": _normalize_valuation,
"volatility": _normalize_volatility,
"macro": _normalize_macro,
}
metrics = {}
sources_available = []
sources_failed = []
# Sequential execution - one at a time
for name, mcp_func in mcp_sequence:
logger.info(f"Fetching {name}...")
try:
result = await mcp_func()
if isinstance(result, dict) and "error" in result:
# First attempt failed, retry once
logger.warning(f"MCP {name} error, retrying: {result.get('error', 'Unknown')[:50]}")
result = await mcp_func()
if isinstance(result, dict) and "error" in result:
sources_failed.append(name)
metrics[name] = {**result, "retried": True}
logger.warning(f"MCP {name} failed after retry: {result.get('error')}")
else:
# Apply normalizer if available
if name in normalizers:
result = normalizers[name](result)
sources_available.append(name)
metrics[name] = result
logger.info(f"MCP {name} succeeded on retry")
# Emit metrics for real-time streaming to frontend
await _extract_and_emit_metrics(name, result, progress_callback)
else:
# Apply normalizer if available
if name in normalizers:
result = normalizers[name](result)
sources_available.append(name)
metrics[name] = result
logger.info(f"MCP {name} fetched successfully")
# Emit metrics for real-time streaming to frontend
await _extract_and_emit_metrics(name, result, progress_callback)
except Exception as e:
# First attempt exception, retry once
logger.warning(f"MCP {name} exception, retrying: {e}")
try:
result = await mcp_func()
if isinstance(result, dict) and "error" not in result:
# Apply normalizer if available
if name in normalizers:
result = normalizers[name](result)
sources_available.append(name)
metrics[name] = result
logger.info(f"MCP {name} succeeded on retry")
# Emit metrics for real-time streaming to frontend
await _extract_and_emit_metrics(name, result, progress_callback)
else:
sources_failed.append(name)
metrics[name] = {"error": str(result.get("error", e)), "retried": True}
logger.warning(f"MCP {name} failed after retry")
except Exception as e2:
sources_failed.append(name)
metrics[name] = {"error": str(e2), "retried": True}
logger.warning(f"MCP {name} failed after retry: {e2}")
# Apply sorting and limiting to news (top 10, most recent first)
if "news" in metrics and "error" not in metrics.get("news", {}):
metrics["news"] = _sort_and_limit_news(metrics["news"], limit=10)
# Apply sorting and limiting to sentiment (top 10 articles/posts, most recent first)
if "sentiment" in metrics and "error" not in metrics.get("sentiment", {}):
metrics["sentiment"] = _sort_and_limit_sentiment(metrics["sentiment"], limit=10)
# Get multi-source data (now stored directly under source name)
fundamentals_data = metrics.get("fundamentals", {})
valuation_data = metrics.get("valuation", {})
macro_data = metrics.get("macro", {})
volatility_data = metrics.get("volatility", {})
# Add conflict resolution markers
conflict_resolution = _add_conflict_markers(fundamentals_data, valuation_data)
# Build aggregated SWOT from primary source data
aggregated_swot = _aggregate_swot(metrics, sources_available)
# Calculate completeness score
completeness = _calculate_completeness(metrics, sources_available)
# Final data package - shared with analyzer only after all collection complete
data = {
"ticker": ticker.upper(),
"company_name": company_name,
"sources_available": sources_available,
"sources_failed": sources_failed,
"metrics": metrics,
"multi_source": {
"fundamentals_all": fundamentals_data,
"valuation_all": valuation_data,
"macro_all": macro_data,
"volatility_all": volatility_data,
},
"conflict_resolution": conflict_resolution,
"aggregated_swot": aggregated_swot,
"completeness": completeness,
"generated_at": datetime.now().isoformat()
}
logger.info(f"Research complete: {len(sources_available)} sources, {len(sources_failed)} failed, {completeness['completeness_pct']}% complete")
return data
|