finagent / crew /runner.py
emmanuelakbi's picture
feat(prefs): risk tolerance + trading style now shape the signal
1e840aa
"""WatchlistRunner — multi-ticker sequential execution with fault isolation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from crew.config import OrchestratorConfig, TradePreferences
from crew.crew import CrewResult, FinAgentCrew
from crew.callbacks import ActivityFeedCallback
@dataclass
class WatchlistResult:
    """Outcome summary for one pass over a watchlist.

    Every field has a default, so an instance built with no arguments
    represents an empty run (no signals, all counters at zero).

    Attributes:
        signals: The per-ticker CrewResult objects collected during the run.
        total_tickers: How many tickers the run covered.
        successful: Count of results that reported success.
        failed: Count of results that did not report success.
    """

    # default_factory gives each instance its own list rather than a shared one.
    signals: list[CrewResult] = field(default_factory=list)
    total_tickers: int = 0
    successful: int = 0
    failed: int = 0
class WatchlistRunner:
    """Runs the FinAgentCrew pipeline for each ticker in a watchlist sequentially.

    A new FinAgentCrew is built for every ticker. Any exception raised while
    building or running that crew is converted into a failed CrewResult, so
    one bad ticker does not stop the remaining ones from being processed.
    """

    def __init__(
        self,
        config: OrchestratorConfig,
        tools: dict[str, list],
        callback: Optional[ActivityFeedCallback] = None,
        preferences: Optional[TradePreferences] = None,
    ):
        """Store the settings that are handed to every per-ticker crew.

        Args:
            config: Orchestrator configuration forwarded to each FinAgentCrew.
            tools: Tool mapping forwarded unchanged to each FinAgentCrew.
            callback: Optional activity-feed callback. When set, it is
                notified before and after each ticker and is also forwarded
                to each FinAgentCrew.
            preferences: Trade preferences forwarded to each FinAgentCrew.
                A default ``TradePreferences()`` is used when this is
                omitted (or otherwise falsy).
        """
        self._config = config
        self._tools = tools
        self._callback = callback
        self._preferences = preferences or TradePreferences()

    def run(self, watchlist: str) -> WatchlistResult:
        """Parse the watchlist and run the analysis pipeline for each ticker.

        Tickers are processed one at a time, in the order they appear in
        the watchlist. If a callback was supplied, ``on_ticker_start`` is
        invoked before each ticker and ``on_ticker_complete`` afterwards
        with the result's signal.

        Args:
            watchlist: Comma-separated string of ticker symbols.

        Returns:
            WatchlistResult with the per-ticker results and the
            success/failure counts.
        """
        tickers = self._parse_watchlist(watchlist)
        results: list[CrewResult] = []

        for ticker in tickers:
            if self._callback:
                self._callback.on_ticker_start(ticker)

            result = self._run_single(ticker)
            results.append(result)

            if self._callback:
                self._callback.on_ticker_complete(ticker, result.signal)

        # Every ticker yields exactly one result, so failures are simply
        # the results that did not report success.
        successful = sum(1 for result in results if result.success)
        return WatchlistResult(
            signals=results,
            total_tickers=len(tickers),
            successful=successful,
            failed=len(results) - successful,
        )

    def _parse_watchlist(self, watchlist: str) -> list[str]:
        """Split a raw watchlist string into cleaned ticker symbols.

        The string is split on commas; each piece is stripped of
        surrounding whitespace and uppercased, and empty pieces (e.g. from
        ``"A,,B"`` or a trailing comma) are dropped. Order is preserved and
        duplicates are kept.

        Args:
            watchlist: Raw comma-separated ticker string.

        Returns:
            List of cleaned, uppercased ticker symbols.
        """
        cleaned = (part.strip().upper() for part in watchlist.split(","))
        return [symbol for symbol in cleaned if symbol]

    def _run_single(self, ticker: str) -> CrewResult:
        """Run the full analysis pipeline for a single ticker with error isolation.

        Args:
            ticker: Uppercased ticker symbol.

        Returns:
            The CrewResult returned by ``FinAgentCrew.run`` or, if building
            or running the crew raises, a CrewResult with ``success=False``,
            no signal, empty raw output and a non-empty error description.
        """
        # Deliberately broad: this is the isolation boundary that keeps one
        # failing ticker from aborting the rest of the watchlist.
        try:
            crew = FinAgentCrew(
                config=self._config,
                tools=self._tools,
                callback=self._callback,
                preferences=self._preferences,
            )
            return crew.run(ticker)
        except Exception as exc:
            return CrewResult(
                ticker=ticker,
                signal=None,
                raw_output="",
                success=False,
                error=self._format_error(exc),
            )

    @staticmethod
    def _format_error(exc: BaseException) -> str:
        """Return a non-empty, human-readable description of *exc*.

        ``str(exc)`` is empty for exceptions raised without a message
        (for example ``TimeoutError()`` or ``KeyError()``), which would
        leave a failed result with a blank error. In that case the
        exception's class name is used instead.
        """
        return str(exc) or type(exc).__name__