Spaces:
Sleeping
Sleeping
"""
Research Gateway Node

Fetches data from the Research Service via the A2A protocol.
The Research Service internally calls all 6 MCP servers using TRUE MCP protocol.
This node acts as the gateway between the main SWOT Agent and the external Research Service.
"""
import asyncio
import json
from typing import Optional

from langsmith import traceable

from src.utils.ticker_lookup import get_ticker, normalize_company_name
async def _fetch_via_research_gateway(
    company: str,
    ticker: Optional[str] = None,
    progress_callback=None,
    add_log=None,
) -> dict:
    """Fetch research data for *company* via the Research Gateway (A2A protocol).

    Args:
        company: Company name as entered by the user.
        ticker: Optional stock ticker. When omitted, it is looked up from the
            company name; if the lookup also fails, a pseudo-ticker is derived
            from the company name itself.
        progress_callback: Optional callable forwarded to the Research Service
            for real-time metric streaming.
        add_log: Optional callable forwarded for activity-log entries.

    Returns:
        The result dict produced by ``call_research_service``.
    """
    # Local import keeps the A2A client out of module import time.
    from src.nodes.research_gateway import call_research_service

    # Use the provided ticker or look one up from the company name.
    if not ticker:
        ticker = get_ticker(company)
        if not ticker:
            # Fallback: derive a pseudo-ticker from the name (max 5 chars).
            print(f"Could not determine ticker for '{company}', using company name as ticker")
            ticker = company.upper().replace(" ", "")[:5]

    # Normalize the company name for display purposes.
    company_name = normalize_company_name(company)
    print(f"Calling Research Service for {company_name} ({ticker})...")

    # Call the Research Service with callbacks for real-time streaming.
    result = await call_research_service(
        company_name,
        ticker,
        progress_callback=progress_callback,
        add_log=add_log,
    )
    return result
def researcher_node(state, workflow_id=None, progress_store=None):
    """
    Research Gateway node that fetches company data via the A2A protocol.

    Calls the external Research Service, which internally fetches from six
    MCP servers: Fundamentals, Volatility, Macro, Valuation, News, Sentiment.

    Args:
        state: Workflow state dict. Must contain "company_name"; may contain
            "ticker", "workflow_id", "progress_store", "revision_count",
            and "score".
        workflow_id: Optional workflow identifier for progress tracking;
            falls back to state["workflow_id"] when None.
        progress_store: Optional mutable mapping for progress updates;
            falls back to state["progress_store"] when None.

    Returns:
        The (mutated) state dict with "data_source", "raw_data" and
        "sources_failed" populated on success.

    Raises:
        RuntimeError: If the Research Service returns invalid data, fewer
            than 2 of the 3 core sources are available, or any other error
            occurs during research (original exception attached as __cause__).
    """
    company = state["company_name"]
    ticker = state.get("ticker")  # Use ticker from stock search if available.

    # The graph invokes this node with state only, so fall back to values
    # carried inside the state when the explicit arguments are absent.
    if workflow_id is None:
        workflow_id = state.get("workflow_id")
    if progress_store is None:
        progress_store = state.get("progress_store")

    print(f"[DEBUG] researcher_node: workflow_id={workflow_id}, progress_store={'yes' if progress_store else 'no'}")

    # Update progress if tracking is enabled.
    # NOTE(review): assumes progress_store[workflow_id] was initialized by the
    # caller — a missing entry would raise KeyError; confirm upstream setup.
    if workflow_id and progress_store:
        progress_store[workflow_id].update({
            "current_step": "researcher",
            "revision_count": state.get("revision_count", 0),
            "score": state.get("score", 0)
        })

    def add_log(step: str, message: str):
        """Append an activity-log entry when progress tracking is enabled."""
        if workflow_id and progress_store:
            from src.services.workflow_store import add_activity_log
            add_activity_log(workflow_id, step, message)

    def progress_callback(*args, **kwargs):
        """Record a granular metric event.

        Supports both a single dict payload (new structured format) and
        positional arguments (legacy format).
        """
        if args and isinstance(args[0], dict):
            # New structured payload format.
            payload = args[0]
            src = payload.get("source")
            metric = payload.get("metric")
            value = payload.get("value")
            end_date = payload.get("end_date")
            fiscal_year = payload.get("fiscal_year")
            form = payload.get("form")
        else:
            # Legacy positional-args format.
            src = args[0] if len(args) > 0 else kwargs.get("source")
            metric = args[1] if len(args) > 1 else kwargs.get("metric")
            value = args[2] if len(args) > 2 else kwargs.get("value")
            end_date = args[3] if len(args) > 3 else kwargs.get("end_date")
            fiscal_year = args[4] if len(args) > 4 else kwargs.get("fiscal_year")
            form = args[5] if len(args) > 5 else kwargs.get("form")
        if workflow_id and progress_store and src and metric:
            from src.services.workflow_store import add_metric
            add_metric(workflow_id, src, metric, value,
                       end_date=end_date, fiscal_year=fiscal_year, form=form)

    try:
        # Set all MCP servers to "executing" state before research starts.
        if workflow_id and progress_store:
            from src.services.workflow_store import set_mcp_executing
            set_mcp_executing(workflow_id)

        # Fetch via Research Gateway (A2A protocol).
        print("[Research Gateway] Calling Research Service via A2A...")
        result = asyncio.run(_fetch_via_research_gateway(
            company,
            ticker,
            progress_callback=progress_callback,
            add_log=add_log
        ))

        # Validate result.
        if not result or not isinstance(result, dict):
            raise RuntimeError(f"Research Service returned invalid data for {company}")

        state["data_source"] = "a2a"
        # Note: Metrics are streamed via partial_metrics during A2A polling.

        # Tiered MCP source availability check:
        #   core sources (need at least 2 of 3): fundamentals, valuation, volatility
        #   supplementary sources (non-blocking): macro, news, sentiment
        CORE_SOURCES = {"fundamentals", "valuation", "volatility"}
        SUPPLEMENTARY_SOURCES = {"macro", "news", "sentiment"}
        sources_available = set(result.get("sources_available", []))
        sources_failed = result.get("sources_failed", [])
        core_available = sources_available & CORE_SOURCES
        core_failed = CORE_SOURCES - core_available
        supplementary_failed = set(sources_failed) & SUPPLEMENTARY_SOURCES

        # Log supplementary failures as non-critical.
        for source in supplementary_failed:
            add_log("researcher", f"{source.capitalize()} unavailable (non-critical)")
        # Log core failures as critical.
        for source in core_failed:
            add_log("researcher", f"{source.capitalize()} unavailable (critical)")

        # Abort when fewer than 2 of the 3 core sources succeeded.
        if len(core_available) < 2:
            failed_list = ", ".join(sorted(core_failed))
            raise RuntimeError(f"Insufficient core data: {failed_list} unavailable. Need at least 2 of: Fundamentals, Valuation, Volatility.")

        if sources_available:
            state["raw_data"] = json.dumps(result, indent=2, default=str)
            state["sources_failed"] = sources_failed
            print(f" - Sources available: {result['sources_available']}")
            if sources_failed:
                print(f" - Sources failed: {sources_failed}")
        else:
            # Unreachable in practice: an empty sources_available implies zero
            # core sources, which already raised above. Kept as a safeguard.
            raise RuntimeError(f"All MCP servers failed for {company}. Check API configurations.")
    except Exception as e:
        error_msg = str(e)
        print(f"Research failed: {error_msg}")
        add_log("researcher", f"ERROR: {error_msg}")
        # Chain the original exception so the root cause survives the re-wrap.
        raise RuntimeError(f"Research failed for {company}: {error_msg}") from e

    return state