| """FinAgentCrew class — main orchestrator for the multi-agent analysis pipeline.""" |
|
|
| from __future__ import annotations |
|
|
| import re |
| from dataclasses import dataclass |
| from typing import Optional |
|
|
| from crewai import Crew, Process |
|
|
| from crew.config import OrchestratorConfig, TradePreferences |
| from crew.agents import ( |
| create_llm, |
| create_market_scanner, |
| create_fundamental_analyst, |
| create_technical_analyst, |
| create_risk_manager, |
| create_chief_strategist, |
| ) |
| from crew.tasks import ( |
| create_market_scan_task, |
| create_fundamental_task, |
| create_technical_task, |
| create_risk_task, |
| create_strategy_task, |
| ) |
| from crew.signals import Action, TradingSignal, TradingSignalParser |
| from crew.callbacks import ActivityFeedCallback |
|
|
|
|
| @dataclass |
| class CrewResult: |
| """Result of a single ticker crew execution.""" |
|
|
| ticker: str |
| signal: Optional[TradingSignal] |
| raw_output: str |
| success: bool |
| error: Optional[str] = None |
|
|
|
|
| class FinAgentCrew: |
| """Orchestrates the multi-agent analysis pipeline for a single ticker.""" |
|
|
| def __init__( |
| self, |
| config: OrchestratorConfig, |
| tools: dict[str, list], |
| callback: Optional[ActivityFeedCallback] = None, |
| preferences: Optional[TradePreferences] = None, |
| ): |
| """Initialize the crew orchestrator. |
| |
| Args: |
| config: Full orchestrator configuration |
| tools: Dict mapping agent names to their tool lists: |
| { |
| "market_scanner": [search_news, get_price_change, get_volume], |
| "fundamental_analyst": [get_financials, get_earnings, get_peers], |
| "technical_analyst": [get_price_history, calculate_indicators], |
| "risk_manager": [calculate_position_size, set_stop_loss], |
| } |
| callback: Optional activity feed callback for real-time UI updates |
| preferences: User trading preferences that shape the final |
| signal's stop / target bands. Defaults to Moderate / |
| Swing Trading / $10k when not supplied. |
| """ |
| self._config = config |
| self._tools = tools |
| self._callback = callback |
| self._preferences = preferences or TradePreferences() |
| self._parser = TradingSignalParser() |
|
|
| def run(self, ticker: str) -> CrewResult: |
| """Execute the full analysis pipeline for a single ticker. |
| |
| Builds the crew, kicks off execution, parses the output into a |
| TradingSignal, and returns a CrewResult. Emits callback events |
| at task start, completion, and failure points. |
| |
| Args: |
| ticker: Stock ticker symbol to analyze (e.g., "AAPL") |
| |
| Returns: |
| CrewResult with parsed TradingSignal on success, |
| or error details on failure. |
| """ |
| try: |
| if self._callback: |
| self._callback.on_task_start("Crew", ticker) |
|
|
| crew = self._build_crew(ticker) |
| result = crew.kickoff() |
| raw_output = str(result) |
|
|
| signal = self._parse_output(raw_output, ticker) |
|
|
| |
| |
| |
| |
| |
| |
| if signal is None: |
| signal = self._synthesize_from_tools(ticker, raw_output) |
| else: |
| |
| |
| |
| |
| |
| |
| |
| signal = self._reground_if_drifted(ticker, signal, raw_output) |
|
|
| if signal is not None: |
| if self._callback: |
| self._callback.on_task_complete( |
| "Chief Strategist", |
| ticker, |
| f"{signal.action.value} ({signal.confidence}%)", |
| ) |
| return CrewResult( |
| ticker=ticker, |
| signal=signal, |
| raw_output=raw_output, |
| success=True, |
| ) |
| else: |
| if self._callback: |
| self._callback.on_task_failed( |
| "Chief Strategist", |
| ticker, |
| "Failed to parse trading signal", |
| ) |
| return CrewResult( |
| ticker=ticker, |
| signal=None, |
| raw_output=raw_output, |
| success=False, |
| error="Failed to parse trading signal from crew output", |
| ) |
| except Exception as e: |
| error_msg = str(e) |
| if self._callback: |
| self._callback.on_task_failed("Crew", ticker, error_msg) |
| return CrewResult( |
| ticker=ticker, |
| signal=None, |
| raw_output="", |
| success=False, |
| error=error_msg, |
| ) |
|
|
| def _build_crew(self, ticker: str) -> Crew: |
| """Assemble the Crew with agents, tasks, and process configuration. |
| |
| Creates all five agents with their respective tools, builds tasks |
| with proper dependency chains, and returns a configured Crew instance. |
| |
| Args: |
| ticker: Stock ticker symbol for task descriptions |
| |
| Returns: |
| Configured Crew instance ready for kickoff |
| """ |
| llm = create_llm(self._config.llm) |
|
|
| |
| market_scanner = create_market_scanner( |
| llm, self._tools.get("market_scanner", []) |
| ) |
| fundamental_analyst = create_fundamental_analyst( |
| llm, self._tools.get("fundamental_analyst", []) |
| ) |
| technical_analyst = create_technical_analyst( |
| llm, self._tools.get("technical_analyst", []) |
| ) |
| risk_manager = create_risk_manager( |
| llm, self._tools.get("risk_manager", []) |
| ) |
| chief_strategist = create_chief_strategist(llm) |
|
|
| |
| market_task = create_market_scan_task(market_scanner, ticker) |
| fundamental_task = create_fundamental_task(fundamental_analyst, ticker) |
| technical_task = create_technical_task(technical_analyst, ticker) |
| risk_task = create_risk_task( |
| risk_manager, ticker, [technical_task], self._preferences |
| ) |
| strategy_task = create_strategy_task( |
| chief_strategist, |
| ticker, |
| [market_task, fundamental_task, technical_task, risk_task], |
| self._preferences, |
| ) |
|
|
| return Crew( |
| agents=[ |
| market_scanner, |
| fundamental_analyst, |
| technical_analyst, |
| risk_manager, |
| chief_strategist, |
| ], |
| tasks=[ |
| market_task, |
| fundamental_task, |
| technical_task, |
| risk_task, |
| strategy_task, |
| ], |
| process=Process.sequential, |
| verbose=self._config.crew.verbose, |
| ) |
|
|
| def _parse_output( |
| self, raw_output: str, ticker: str |
| ) -> Optional[TradingSignal]: |
| """Parse crew output into a TradingSignal. |
| |
| Delegates to TradingSignalParser which attempts primary structured |
| format first, then falls back to heuristic extraction. |
| |
| Args: |
| raw_output: Raw text output from the crew execution |
| ticker: Expected ticker symbol for validation |
| |
| Returns: |
| TradingSignal if parsing succeeds, None if output is unparseable |
| """ |
| return self._parser.parse(raw_output, ticker) |
|
|
| def _reground_if_drifted( |
| self, |
| ticker: str, |
| signal: TradingSignal, |
| raw_output: str, |
| ) -> TradingSignal: |
| r"""Anchor every signal's entry price to the live quote. |
| |
| Small LLMs frequently emit training-era prices (Qwen3-14B once |
| handed back \$203 for NVDA when it was trading at \$215) that |
| pass the parser because the BUY/SELL inequalities still hold |
| relative to the invented entry. The card then shows a stale |
| price — useless for a trading signal. |
| |
| The fix is absolute: we fetch the current quote via |
| :func:`get_price_change`, force ``entry_price`` to that value, |
| and **rescale** stop / target by the same ratio so the LLM's |
| risk / reward geometry is preserved. Example: LLM emits |
| entry=\$200, stop=\$194, target=\$210 for an NVDA card while |
| the live price is \$215 — we scale 1.075× so the card renders |
| entry=\$215, stop=\$208.52, target=\$225.75. The narrative |
| reasoning is kept as-is. |
| |
| When the parsed entry is missing, zero, or more than 50 % off |
| the live price we fall back to the deterministic tool-grounded |
| synthesis in :meth:`_synthesize_from_tools`, which picks a |
| BUY / SELL / HOLD from the day's % change and derives bands |
| from scratch. |
| """ |
| parsed_entry = signal.entry_price |
|
|
| |
| live_entry: Optional[float] = None |
| try: |
| market_tools = self._tools.get("market_scanner", []) |
| price_tool = next( |
| (t for t in market_tools |
| if getattr(t, "name", "") == "Get Price Change"), |
| None, |
| ) |
| if price_tool is not None: |
| price_fn = getattr(price_tool, "func", price_tool) |
| result = str(price_fn(ticker)) |
| m = re.search( |
| r"Current Price:\s*\$\s*([\d,]+\.?\d*)", result |
| ) |
| if m: |
| live_entry = float(m.group(1).replace(",", "")) |
| except Exception: |
| live_entry = None |
|
|
| |
| |
| if live_entry is None or live_entry <= 0: |
| return self._backfill_missing_prices(signal) |
|
|
| |
| |
| |
| |
| badly_drifted = ( |
| parsed_entry is None |
| or parsed_entry <= 0 |
| or abs(parsed_entry - live_entry) / live_entry > 0.50 |
| ) |
| if badly_drifted: |
| synthesized = self._synthesize_from_tools(ticker, raw_output) |
| if synthesized is not None: |
| return synthesized |
| |
| return self._backfill_missing_prices( |
| TradingSignal( |
| ticker=signal.ticker, |
| action=signal.action, |
| confidence=signal.confidence, |
| entry_price=live_entry, |
| stop_loss=None, |
| target_price=None, |
| reasoning=signal.reasoning, |
| ) |
| ) |
|
|
| |
| |
| |
| scale = live_entry / parsed_entry |
| stop = signal.stop_loss |
| target = signal.target_price |
|
|
| def _bad(v: Optional[float]) -> bool: |
| """Treat very-close-to-entry (< 0.5 %) stops / targets as |
| degenerate zero-risk numbers the model sometimes emits on |
| HOLD calls.""" |
| if v is None or v <= 0: |
| return True |
| return abs(v - parsed_entry) / parsed_entry < 0.005 |
|
|
| new_stop = None if _bad(stop) else round(stop * scale, 2) |
| new_target = None if _bad(target) else round(target * scale, 2) |
|
|
| |
| |
| |
| |
| |
| |
| stop_clamp = self._preferences.stop_clamp |
| target_clamp = self._preferences.target_clamp |
|
|
| def _stop_unreasonable(v: Optional[float]) -> bool: |
| if v is None: |
| return True |
| return abs(v - live_entry) / live_entry > stop_clamp |
|
|
| def _target_unreasonable(v: Optional[float]) -> bool: |
| if v is None: |
| return True |
| return abs(v - live_entry) / live_entry > target_clamp |
|
|
| if _stop_unreasonable(new_stop): |
| new_stop = None |
| if _target_unreasonable(new_target): |
| new_target = None |
|
|
| |
| |
| |
| |
| |
| |
| if ( |
| new_stop is not None |
| and new_target is not None |
| and signal.action in (Action.BUY, Action.SELL) |
| ): |
| stop_dist = abs(new_stop - live_entry) |
| target_dist = abs(new_target - live_entry) |
| if target_dist <= stop_dist: |
| new_target = None |
|
|
| rescaled = TradingSignal( |
| ticker=signal.ticker, |
| action=signal.action, |
| confidence=signal.confidence, |
| entry_price=round(live_entry, 2), |
| stop_loss=new_stop, |
| target_price=new_target, |
| reasoning=signal.reasoning, |
| ) |
| return self._backfill_missing_prices(rescaled) |
|
|
| def _backfill_missing_prices(self, signal: TradingSignal) -> TradingSignal: |
| """Back-fill stop-loss and target when the LLM emitted N/A or omitted them. |
| |
| This ensures the UI card always shows three numeric prices rather |
| than a mix of numbers and N/A placeholders. The band widths come |
| from the user's :class:`TradePreferences` so Conservative / Day |
| Trading produces tight short-horizon numbers and Aggressive / |
| Position Trading produces wider longer-horizon numbers. |
| """ |
| entry = signal.entry_price |
| if entry is None or entry <= 0: |
| return signal |
|
|
| stop = signal.stop_loss |
| target = signal.target_price |
| stop_pct = self._preferences.stop_pct |
| target_pct = self._preferences.target_pct |
| |
| |
| |
| hold_stop_pct = max(0.015, stop_pct * 0.5) |
| hold_target_pct = max(0.02, target_pct * 0.5) |
|
|
| if stop is None or stop <= 0: |
| if signal.action == Action.BUY: |
| stop = round(entry * (1 - stop_pct), 2) |
| elif signal.action == Action.SELL: |
| stop = round(entry * (1 + stop_pct), 2) |
| else: |
| stop = round(entry * (1 - hold_stop_pct), 2) |
|
|
| if target is None or target <= 0: |
| if signal.action == Action.BUY: |
| target = round(entry * (1 + target_pct), 2) |
| elif signal.action == Action.SELL: |
| target = round(entry * (1 - target_pct), 2) |
| else: |
| target = round(entry * (1 + hold_target_pct), 2) |
|
|
| return TradingSignal( |
| ticker=signal.ticker, |
| action=signal.action, |
| confidence=signal.confidence, |
| entry_price=entry, |
| stop_loss=stop, |
| target_price=target, |
| reasoning=signal.reasoning, |
| ) |
|
|
| def _synthesize_from_tools( |
| self, ticker: str, raw_output: str |
| ) -> Optional[TradingSignal]: |
| """Build a TradingSignal directly from tool outputs when LLM parsing fails. |
| |
| First we try to scan ``raw_output`` for the canonical |
| ``Current Price: $X.XX`` row emitted by ``tools.market_scanner`` |
| — that row leaks into the transcript when the agents call |
| ``get_price_change``. If the Strategist's final answer doesn't |
| include it (it often doesn't — the Strategist is the last agent |
| and only sees the prior agents' *summaries*), we call |
| ``get_price_change`` directly as a backstop so we still have a |
| live price to ground the card on. |
| |
| From the live price we derive: |
| |
| * **entry** = live price |
| * **action** = BUY if today's change is ≥ +1 %, SELL if ≤ −1 %, |
| otherwise HOLD |
| * **stop / target** = ± 3 % / ± 5 % of entry |
| * **confidence** = 50 baseline, + up to 25 scaled by |% change| |
| * **reasoning** = preserve the LLM's narrative (first ~800 chars) |
| |
| Returns ``None`` only if no live price can be retrieved at all. |
| """ |
| entry: Optional[float] = None |
| pct_change = 0.0 |
|
|
| |
| price_match = re.search( |
| r"Current Price:\s*\$\s*([\d,]+\.?\d*)", raw_output |
| ) |
| if price_match: |
| try: |
| entry = float(price_match.group(1).replace(",", "")) |
| except ValueError: |
| entry = None |
|
|
| pct_match = re.search( |
| r"Change:[^()]*\(([+-]?)([\d.]+)%\)", raw_output |
| ) |
| if pct_match: |
| sign = -1.0 if pct_match.group(1) == "-" else 1.0 |
| try: |
| pct_change = sign * float(pct_match.group(2)) |
| except ValueError: |
| pct_change = 0.0 |
|
|
| |
| |
| if entry is None: |
| try: |
| market_tools = self._tools.get("market_scanner", []) |
| price_tool = next( |
| (t for t in market_tools |
| if getattr(t, "name", "") == "Get Price Change"), |
| None, |
| ) |
| if price_tool is None: |
| return None |
| |
| price_fn = getattr(price_tool, "func", price_tool) |
| result = price_fn(ticker) |
| p = re.search( |
| r"Current Price:\s*\$\s*([\d,]+\.?\d*)", str(result) |
| ) |
| if not p: |
| return None |
| entry = float(p.group(1).replace(",", "")) |
| pct = re.search( |
| r"Change:[^()]*\(([+-]?)([\d.]+)%\)", str(result) |
| ) |
| if pct: |
| sign = -1.0 if pct.group(1) == "-" else 1.0 |
| try: |
| pct_change = sign * float(pct.group(2)) |
| except ValueError: |
| pct_change = 0.0 |
| except Exception: |
| return None |
|
|
| if entry is None or entry <= 0: |
| return None |
|
|
| |
| if pct_change >= 1.0: |
| action = Action.BUY |
| elif pct_change <= -1.0: |
| action = Action.SELL |
| else: |
| action = Action.HOLD |
|
|
| |
| stop_pct = self._preferences.stop_pct |
| target_pct = self._preferences.target_pct |
| hold_stop_pct = max(0.015, stop_pct * 0.5) |
| hold_target_pct = max(0.02, target_pct * 0.5) |
|
|
| if action == Action.BUY: |
| stop = round(entry * (1 - stop_pct), 2) |
| target = round(entry * (1 + target_pct), 2) |
| elif action == Action.SELL: |
| stop = round(entry * (1 + stop_pct), 2) |
| target = round(entry * (1 - target_pct), 2) |
| else: |
| stop = round(entry * (1 - hold_stop_pct), 2) |
| target = round(entry * (1 + hold_target_pct), 2) |
|
|
| |
| confidence = int( |
| 50 + min(25, round(abs(pct_change) * 5)) |
| ) |
|
|
| |
| |
| |
| |
| |
| if action == Action.BUY: |
| rationale = ( |
| f"Last close to last print {pct_change:+.2f}%. " |
| f"Short-term momentum favours a long entry at the live quote." |
| ) |
| elif action == Action.SELL: |
| rationale = ( |
| f"Last close to last print {pct_change:+.2f}%. " |
| f"Short-term momentum favours a short entry at the live quote." |
| ) |
| else: |
| rationale = ( |
| f"Change vs previous close {pct_change:+.2f}%. " |
| f"No directional conviction; HOLD and wait for a cleaner setup." |
| ) |
|
|
| stop_pct_shown = abs(stop - entry) / entry * 100 |
| target_pct_shown = abs(target - entry) / entry * 100 |
| risk_note = ( |
| f"Bands sized to your {self._preferences.risk_tolerance} / " |
| f"{self._preferences.trading_style} profile: " |
| f"stop ≈ {stop_pct_shown:.1f}% / target ≈ {target_pct_shown:.1f}%." |
| ) |
|
|
| reasoning = { |
| "Market": rationale, |
| "Fundamental": ( |
| "Fundamental view deferred — insufficient signal for a " |
| "high-conviction call on this window." |
| ), |
| "Technical": ( |
| "Entry anchored to live yfinance quote; stop and target " |
| "derived from the preference-aware default band." |
| ), |
| "Risk": risk_note, |
| } |
|
|
| return TradingSignal( |
| ticker=ticker, |
| action=action, |
| confidence=confidence, |
| entry_price=entry, |
| stop_loss=stop, |
| target_price=target, |
| reasoning=reasoning, |
| ) |
|
|