File size: 3,454 Bytes
"""WatchlistRunner — multi-ticker sequential execution with fault isolation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from crew.config import OrchestratorConfig, TradePreferences
from crew.crew import CrewResult, FinAgentCrew
from crew.callbacks import ActivityFeedCallback
@dataclass
class WatchlistResult:
    """Outcome of one watchlist run: per-ticker results plus summary counts.

    Every field has a default, so ``WatchlistResult()`` describes an empty
    run (no results, all counters zero).
    """

    # One CrewResult per processed ticker, in processing order.
    signals: list[CrewResult] = field(default_factory=list)
    # Number of tickers obtained from the parsed watchlist.
    total_tickers: int = 0
    # Number of results whose ``success`` flag was truthy.
    successful: int = 0
    # Number of results whose ``success`` flag was falsy.
    failed: int = 0
class WatchlistRunner:
    """Runs the FinAgentCrew pipeline for each ticker in a watchlist sequentially.

    An exception raised while analysing one ticker is converted into a failed
    ``CrewResult`` so that the remaining tickers are still processed.
    """

    def __init__(
        self,
        config: OrchestratorConfig,
        tools: dict[str, list],
        callback: Optional[ActivityFeedCallback] = None,
        preferences: Optional[TradePreferences] = None,
    ):
        """Store the collaborators that are handed to every per-ticker crew.

        Args:
            config: Configuration forwarded to each ``FinAgentCrew``.
            tools: Mapping forwarded to each ``FinAgentCrew`` as ``tools``.
            callback: Optional callback; notified when a ticker starts and
                completes, and also forwarded to each ``FinAgentCrew``.
            preferences: Trade preferences forwarded to each ``FinAgentCrew``.
                A default ``TradePreferences()`` is created when omitted.
        """
        self._config = config
        self._tools = tools
        self._callback = callback
        # Substitute defaults only when nothing was supplied; an explicit
        # object is always honoured, even if it happens to evaluate falsy.
        self._preferences = (
            preferences if preferences is not None else TradePreferences()
        )

    def run(self, watchlist: str) -> WatchlistResult:
        """Parse the watchlist and run the analysis pipeline for each ticker.

        Args:
            watchlist: Comma-separated string of ticker symbols.

        Returns:
            WatchlistResult with aggregated signals and success/failure counts.
        """
        tickers = self._parse_watchlist(watchlist)
        results: list[CrewResult] = []
        successful = 0
        failed = 0

        for ticker in tickers:
            if self._callback:
                self._callback.on_ticker_start(ticker)

            result = self._run_single(ticker)
            results.append(result)

            if result.success:
                successful += 1
            else:
                failed += 1

            # The completion hook fires for failures too; in that case the
            # signal passed along is whatever the failed result carries.
            if self._callback:
                self._callback.on_ticker_complete(ticker, result.signal)

        return WatchlistResult(
            signals=results,
            total_tickers=len(tickers),
            successful=successful,
            failed=failed,
        )

    def _parse_watchlist(self, watchlist: str) -> list[str]:
        """Split watchlist string on commas, strip whitespace, uppercase, remove empties.

        Order and duplicates are preserved.

        Args:
            watchlist: Raw comma-separated ticker string.

        Returns:
            List of cleaned, uppercased ticker symbols.
        """
        cleaned = (part.strip().upper() for part in watchlist.split(","))
        return [symbol for symbol in cleaned if symbol]

    def _run_single(self, ticker: str) -> CrewResult:
        """Run the full analysis pipeline for a single ticker with error isolation.

        Args:
            ticker: Uppercased ticker symbol.

        Returns:
            The CrewResult produced by the crew, or a failure CrewResult
            (``success=False``, ``signal=None``, empty ``raw_output``) if
            constructing or running the crew raises an exception.
        """
        try:
            crew = FinAgentCrew(
                config=self._config,
                tools=self._tools,
                callback=self._callback,
                preferences=self._preferences,
            )
            return crew.run(ticker)
        except Exception as exc:
            # Deliberately broad: this is the per-ticker isolation boundary.
            # str(exc) is empty for exceptions raised without a message, so
            # fall back to the class name to keep the error field informative.
            return CrewResult(
                ticker=ticker,
                signal=None,
                raw_output="",
                success=False,
                error=str(exc) or type(exc).__name__,
            )
|