"""WatchlistRunner — multi-ticker sequential execution with fault isolation.""" from __future__ import annotations from dataclasses import dataclass, field from typing import Optional from crew.config import OrchestratorConfig, TradePreferences from crew.crew import CrewResult, FinAgentCrew from crew.callbacks import ActivityFeedCallback @dataclass class WatchlistResult: """Aggregated result of running the analysis pipeline across multiple tickers.""" signals: list[CrewResult] = field(default_factory=list) total_tickers: int = 0 successful: int = 0 failed: int = 0 class WatchlistRunner: """Runs the FinAgentCrew pipeline for each ticker in a watchlist sequentially.""" def __init__( self, config: OrchestratorConfig, tools: dict[str, list], callback: Optional[ActivityFeedCallback] = None, preferences: Optional[TradePreferences] = None, ): self._config = config self._tools = tools self._callback = callback self._preferences = preferences or TradePreferences() def run(self, watchlist: str) -> WatchlistResult: """Parse the watchlist and run the analysis pipeline for each ticker. Args: watchlist: Comma-separated string of ticker symbols. Returns: WatchlistResult with aggregated signals and success/failure counts. """ tickers = self._parse_watchlist(watchlist) results: list[CrewResult] = [] successful = 0 failed = 0 for ticker in tickers: if self._callback: self._callback.on_ticker_start(ticker) result = self._run_single(ticker) results.append(result) if result.success: successful += 1 else: failed += 1 if self._callback: self._callback.on_ticker_complete(ticker, result.signal) return WatchlistResult( signals=results, total_tickers=len(tickers), successful=successful, failed=failed, ) def _parse_watchlist(self, watchlist: str) -> list[str]: """Split watchlist string on commas, strip whitespace, uppercase, remove empties. Args: watchlist: Raw comma-separated ticker string. Returns: List of cleaned, uppercased ticker symbols. """ parts = watchlist.split(",") tickers = [] for part in parts: stripped = part.strip().upper() if stripped: tickers.append(stripped) return tickers def _run_single(self, ticker: str) -> CrewResult: """Run the full analysis pipeline for a single ticker with error isolation. Args: ticker: Uppercased ticker symbol. Returns: CrewResult on success or a failure CrewResult if an exception occurs. """ try: crew = FinAgentCrew( config=self._config, tools=self._tools, callback=self._callback, preferences=self._preferences, ) return crew.run(ticker) except Exception as e: return CrewResult( ticker=ticker, signal=None, raw_output="", success=False, error=str(e), )