File size: 23,330 Bytes
07ff2cb 98060b8 07ff2cb 1e840aa 07ff2cb 98060b8 07ff2cb 1e840aa 07ff2cb 1e840aa 07ff2cb 1e840aa 07ff2cb 98060b8 7c0742a 98060b8 07ff2cb 1e840aa 07ff2cb 1e840aa 07ff2cb 98060b8 7c0742a b64ba5a 7c0742a b64ba5a 7c0742a b64ba5a 7c0742a b64ba5a 7c0742a 99a1635 b64ba5a 99a1635 b64ba5a 1e840aa 9d444de 1e840aa 9d444de b64ba5a 1e840aa b64ba5a 9d444de b64ba5a 9d444de b64ba5a 1553537 b64ba5a 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 1e840aa 7c0742a 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 2ac24b1 98060b8 1e840aa 98060b8 1e840aa 98060b8 1e840aa 98060b8 1e840aa 98060b8 0ae549a 0d2f0f9 0ae549a 0d2f0f9 0ae549a 0d2f0f9 0ae549a 0d2f0f9 0ae549a 0d2f0f9 0ae549a 0d2f0f9 98060b8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 | """FinAgentCrew class β main orchestrator for the multi-agent analysis pipeline."""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Optional
from crewai import Crew, Process
from crew.config import OrchestratorConfig, TradePreferences
from crew.agents import (
create_llm,
create_market_scanner,
create_fundamental_analyst,
create_technical_analyst,
create_risk_manager,
create_chief_strategist,
)
from crew.tasks import (
create_market_scan_task,
create_fundamental_task,
create_technical_task,
create_risk_task,
create_strategy_task,
)
from crew.signals import Action, TradingSignal, TradingSignalParser
from crew.callbacks import ActivityFeedCallback
@dataclass
class CrewResult:
"""Result of a single ticker crew execution."""
ticker: str
signal: Optional[TradingSignal]
raw_output: str
success: bool
error: Optional[str] = None
class FinAgentCrew:
"""Orchestrates the multi-agent analysis pipeline for a single ticker."""
def __init__(
self,
config: OrchestratorConfig,
tools: dict[str, list],
callback: Optional[ActivityFeedCallback] = None,
preferences: Optional[TradePreferences] = None,
):
"""Initialize the crew orchestrator.
Args:
config: Full orchestrator configuration
tools: Dict mapping agent names to their tool lists:
{
"market_scanner": [search_news, get_price_change, get_volume],
"fundamental_analyst": [get_financials, get_earnings, get_peers],
"technical_analyst": [get_price_history, calculate_indicators],
"risk_manager": [calculate_position_size, set_stop_loss],
}
callback: Optional activity feed callback for real-time UI updates
preferences: User trading preferences that shape the final
signal's stop / target bands. Defaults to Moderate /
Swing Trading / $10k when not supplied.
"""
self._config = config
self._tools = tools
self._callback = callback
self._preferences = preferences or TradePreferences()
self._parser = TradingSignalParser()
def run(self, ticker: str) -> CrewResult:
"""Execute the full analysis pipeline for a single ticker.
Builds the crew, kicks off execution, parses the output into a
TradingSignal, and returns a CrewResult. Emits callback events
at task start, completion, and failure points.
Args:
ticker: Stock ticker symbol to analyze (e.g., "AAPL")
Returns:
CrewResult with parsed TradingSignal on success,
or error details on failure.
"""
try:
if self._callback:
self._callback.on_task_start("Crew", ticker)
crew = self._build_crew(ticker)
result = crew.kickoff()
raw_output = str(result)
signal = self._parse_output(raw_output, ticker)
# If the parser couldn't extract a structured signal from the
# Strategist's prose, fall back to a deterministic synthesis
# that grounds the signal in the upstream tools (live price
# from yfinance + a heuristic BUY/SELL/HOLD from the recent
# trend). This keeps the pipeline producing sensible output
# even when Qwen loses the output format mid-deliberation.
if signal is None:
signal = self._synthesize_from_tools(ticker, raw_output)
else:
# Cross-check the parsed entry against the live tool
# price. Small LLMs occasionally emit completely
# fabricated prices (e.g. $10.00 for a $215 stock) that
# happen to pass the relative-ordering sanity check
# because stop/target are on the right side of entry.
# Re-synthesize from tools whenever the parsed entry is
# wildly off from reality.
signal = self._reground_if_drifted(ticker, signal, raw_output)
if signal is not None:
if self._callback:
self._callback.on_task_complete(
"Chief Strategist",
ticker,
f"{signal.action.value} ({signal.confidence}%)",
)
return CrewResult(
ticker=ticker,
signal=signal,
raw_output=raw_output,
success=True,
)
else:
if self._callback:
self._callback.on_task_failed(
"Chief Strategist",
ticker,
"Failed to parse trading signal",
)
return CrewResult(
ticker=ticker,
signal=None,
raw_output=raw_output,
success=False,
error="Failed to parse trading signal from crew output",
)
except Exception as e:
error_msg = str(e)
if self._callback:
self._callback.on_task_failed("Crew", ticker, error_msg)
return CrewResult(
ticker=ticker,
signal=None,
raw_output="",
success=False,
error=error_msg,
)
def _build_crew(self, ticker: str) -> Crew:
"""Assemble the Crew with agents, tasks, and process configuration.
Creates all five agents with their respective tools, builds tasks
with proper dependency chains, and returns a configured Crew instance.
Args:
ticker: Stock ticker symbol for task descriptions
Returns:
Configured Crew instance ready for kickoff
"""
llm = create_llm(self._config.llm)
# Create agents with their assigned tools
market_scanner = create_market_scanner(
llm, self._tools.get("market_scanner", [])
)
fundamental_analyst = create_fundamental_analyst(
llm, self._tools.get("fundamental_analyst", [])
)
technical_analyst = create_technical_analyst(
llm, self._tools.get("technical_analyst", [])
)
risk_manager = create_risk_manager(
llm, self._tools.get("risk_manager", [])
)
chief_strategist = create_chief_strategist(llm)
# Create tasks with dependency chain
market_task = create_market_scan_task(market_scanner, ticker)
fundamental_task = create_fundamental_task(fundamental_analyst, ticker)
technical_task = create_technical_task(technical_analyst, ticker)
risk_task = create_risk_task(
risk_manager, ticker, [technical_task], self._preferences
)
strategy_task = create_strategy_task(
chief_strategist,
ticker,
[market_task, fundamental_task, technical_task, risk_task],
self._preferences,
)
return Crew(
agents=[
market_scanner,
fundamental_analyst,
technical_analyst,
risk_manager,
chief_strategist,
],
tasks=[
market_task,
fundamental_task,
technical_task,
risk_task,
strategy_task,
],
process=Process.sequential,
verbose=self._config.crew.verbose,
)
def _parse_output(
self, raw_output: str, ticker: str
) -> Optional[TradingSignal]:
"""Parse crew output into a TradingSignal.
Delegates to TradingSignalParser which attempts primary structured
format first, then falls back to heuristic extraction.
Args:
raw_output: Raw text output from the crew execution
ticker: Expected ticker symbol for validation
Returns:
TradingSignal if parsing succeeds, None if output is unparseable
"""
return self._parser.parse(raw_output, ticker)
def _reground_if_drifted(
self,
ticker: str,
signal: TradingSignal,
raw_output: str,
) -> TradingSignal:
r"""Anchor every signal's entry price to the live quote.
Small LLMs frequently emit training-era prices (Qwen3-14B once
handed back \$203 for NVDA when it was trading at \$215) that
pass the parser because the BUY/SELL inequalities still hold
relative to the invented entry. The card then shows a stale
price β useless for a trading signal.
The fix is absolute: we fetch the current quote via
:func:`get_price_change`, force ``entry_price`` to that value,
and **rescale** stop / target by the same ratio so the LLM's
risk / reward geometry is preserved. Example: LLM emits
entry=\$200, stop=\$194, target=\$210 for an NVDA card while
the live price is \$215 β we scale 1.075Γ so the card renders
entry=\$215, stop=\$208.52, target=\$225.75. The narrative
reasoning is kept as-is.
When the parsed entry is missing, zero, or more than 50 % off
the live price we fall back to the deterministic tool-grounded
synthesis in :meth:`_synthesize_from_tools`, which picks a
BUY / SELL / HOLD from the day's % change and derives bands
from scratch.
"""
parsed_entry = signal.entry_price
# Get the live price (best-effort; don't block on tool errors).
live_entry: Optional[float] = None
try:
market_tools = self._tools.get("market_scanner", [])
price_tool = next(
(t for t in market_tools
if getattr(t, "name", "") == "Get Price Change"),
None,
)
if price_tool is not None:
price_fn = getattr(price_tool, "func", price_tool)
result = str(price_fn(ticker))
m = re.search(
r"Current Price:\s*\$\s*([\d,]+\.?\d*)", result
)
if m:
live_entry = float(m.group(1).replace(",", ""))
except Exception:
live_entry = None
# If we couldn't fetch a live price, leave the signal as-is β
# the sanity-fix already clamped relative ordering.
if live_entry is None or live_entry <= 0:
return self._backfill_missing_prices(signal)
# Parsed entry is missing or grossly off (>50 %): the model
# probably fabricated the entire signal, so re-synthesise from
# tools. Below 50 % drift we keep the LLM's narrative but
# rescale the numbers.
badly_drifted = (
parsed_entry is None
or parsed_entry <= 0
or abs(parsed_entry - live_entry) / live_entry > 0.50
)
if badly_drifted:
synthesized = self._synthesize_from_tools(ticker, raw_output)
if synthesized is not None:
return synthesized
# If synthesis failed too, at least swap the entry.
return self._backfill_missing_prices(
TradingSignal(
ticker=signal.ticker,
action=signal.action,
confidence=signal.confidence,
entry_price=live_entry,
stop_loss=None,
target_price=None,
reasoning=signal.reasoning,
)
)
# Anchor to live price and rescale stop / target to preserve
# the LLM's risk-reward geometry. If either side is missing or
# degenerate, let the back-fill derive a fresh band.
scale = live_entry / parsed_entry
stop = signal.stop_loss
target = signal.target_price
def _bad(v: Optional[float]) -> bool:
"""Treat very-close-to-entry (< 0.5 %) stops / targets as
degenerate zero-risk numbers the model sometimes emits on
HOLD calls."""
if v is None or v <= 0:
return True
return abs(v - parsed_entry) / parsed_entry < 0.005
new_stop = None if _bad(stop) else round(stop * scale, 2)
new_target = None if _bad(target) else round(target * scale, 2)
# Extra guard: after rescaling, stop / target should still be
# within a reasonable band of the new entry. Bounds come from
# the user's preferences β Conservative / Day Trading locks
# everything down tight, Aggressive / Position Trading opens
# it up. If the LLM emitted something implausibly wide we let
# the back-fill replace it with the profile default band.
stop_clamp = self._preferences.stop_clamp
target_clamp = self._preferences.target_clamp
def _stop_unreasonable(v: Optional[float]) -> bool:
if v is None:
return True
return abs(v - live_entry) / live_entry > stop_clamp
def _target_unreasonable(v: Optional[float]) -> bool:
if v is None:
return True
return abs(v - live_entry) / live_entry > target_clamp
if _stop_unreasonable(new_stop):
new_stop = None
if _target_unreasonable(new_target):
new_target = None
# R:R guard: a BUY/SELL card with target closer to entry than
# stop is an inverted risk-reward setup (you'd risk more than
# you could make). Qwen occasionally emits this when it picks
# a conservative target that's still within the absolute clamp
# but not sensible relative to the stop it chose. Force the
# target back to the profile default band in that case.
if (
new_stop is not None
and new_target is not None
and signal.action in (Action.BUY, Action.SELL)
):
stop_dist = abs(new_stop - live_entry)
target_dist = abs(new_target - live_entry)
if target_dist <= stop_dist:
new_target = None
rescaled = TradingSignal(
ticker=signal.ticker,
action=signal.action,
confidence=signal.confidence,
entry_price=round(live_entry, 2),
stop_loss=new_stop,
target_price=new_target,
reasoning=signal.reasoning,
)
return self._backfill_missing_prices(rescaled)
def _backfill_missing_prices(self, signal: TradingSignal) -> TradingSignal:
"""Back-fill stop-loss and target when the LLM emitted N/A or omitted them.
This ensures the UI card always shows three numeric prices rather
than a mix of numbers and N/A placeholders. The band widths come
from the user's :class:`TradePreferences` so Conservative / Day
Trading produces tight short-horizon numbers and Aggressive /
Position Trading produces wider longer-horizon numbers.
"""
entry = signal.entry_price
if entry is None or entry <= 0:
return signal
stop = signal.stop_loss
target = signal.target_price
stop_pct = self._preferences.stop_pct
target_pct = self._preferences.target_pct
# HOLD cards use a half-width band so the card still renders
# with numeric stop / target without implying a directional
# trade the Strategist wasn't making.
hold_stop_pct = max(0.015, stop_pct * 0.5)
hold_target_pct = max(0.02, target_pct * 0.5)
if stop is None or stop <= 0:
if signal.action == Action.BUY:
stop = round(entry * (1 - stop_pct), 2)
elif signal.action == Action.SELL:
stop = round(entry * (1 + stop_pct), 2)
else: # HOLD
stop = round(entry * (1 - hold_stop_pct), 2)
if target is None or target <= 0:
if signal.action == Action.BUY:
target = round(entry * (1 + target_pct), 2)
elif signal.action == Action.SELL:
target = round(entry * (1 - target_pct), 2)
else: # HOLD
target = round(entry * (1 + hold_target_pct), 2)
return TradingSignal(
ticker=signal.ticker,
action=signal.action,
confidence=signal.confidence,
entry_price=entry,
stop_loss=stop,
target_price=target,
reasoning=signal.reasoning,
)
def _synthesize_from_tools(
self, ticker: str, raw_output: str
) -> Optional[TradingSignal]:
"""Build a TradingSignal directly from tool outputs when LLM parsing fails.
First we try to scan ``raw_output`` for the canonical
``Current Price: $X.XX`` row emitted by ``tools.market_scanner``
β that row leaks into the transcript when the agents call
``get_price_change``. If the Strategist's final answer doesn't
include it (it often doesn't β the Strategist is the last agent
and only sees the prior agents' *summaries*), we call
``get_price_change`` directly as a backstop so we still have a
live price to ground the card on.
From the live price we derive:
* **entry** = live price
* **action** = BUY if today's change is β₯ +1 %, SELL if β€ β1 %,
otherwise HOLD
* **stop / target** = Β± 3 % / Β± 5 % of entry
* **confidence** = 50 baseline, + up to 25 scaled by |% change|
* **reasoning** = preserve the LLM's narrative (first ~800 chars)
Returns ``None`` only if no live price can be retrieved at all.
"""
entry: Optional[float] = None
pct_change = 0.0
# Try the transcript first (cheap β no extra network call).
price_match = re.search(
r"Current Price:\s*\$\s*([\d,]+\.?\d*)", raw_output
)
if price_match:
try:
entry = float(price_match.group(1).replace(",", ""))
except ValueError:
entry = None
pct_match = re.search(
r"Change:[^()]*\(([+-]?)([\d.]+)%\)", raw_output
)
if pct_match:
sign = -1.0 if pct_match.group(1) == "-" else 1.0
try:
pct_change = sign * float(pct_match.group(2))
except ValueError:
pct_change = 0.0
# Backstop: call get_price_change directly if the Strategist's
# response did not carry the price row through from upstream.
if entry is None:
try:
market_tools = self._tools.get("market_scanner", [])
price_tool = next(
(t for t in market_tools
if getattr(t, "name", "") == "Get Price Change"),
None,
)
if price_tool is None:
return None
# crewai wraps tools; the underlying function is .func.
price_fn = getattr(price_tool, "func", price_tool)
result = price_fn(ticker)
p = re.search(
r"Current Price:\s*\$\s*([\d,]+\.?\d*)", str(result)
)
if not p:
return None
entry = float(p.group(1).replace(",", ""))
pct = re.search(
r"Change:[^()]*\(([+-]?)([\d.]+)%\)", str(result)
)
if pct:
sign = -1.0 if pct.group(1) == "-" else 1.0
try:
pct_change = sign * float(pct.group(2))
except ValueError:
pct_change = 0.0
except Exception:
return None
if entry is None or entry <= 0:
return None
# Choose action from today's trend.
if pct_change >= 1.0:
action = Action.BUY
elif pct_change <= -1.0:
action = Action.SELL
else:
action = Action.HOLD
# Price bands derived from user preferences.
stop_pct = self._preferences.stop_pct
target_pct = self._preferences.target_pct
hold_stop_pct = max(0.015, stop_pct * 0.5)
hold_target_pct = max(0.02, target_pct * 0.5)
if action == Action.BUY:
stop = round(entry * (1 - stop_pct), 2)
target = round(entry * (1 + target_pct), 2)
elif action == Action.SELL:
stop = round(entry * (1 + stop_pct), 2)
target = round(entry * (1 - target_pct), 2)
else: # HOLD
stop = round(entry * (1 - hold_stop_pct), 2)
target = round(entry * (1 + hold_target_pct), 2)
# Confidence = 50 baseline + up to 25 more for stronger moves.
confidence = int(
50 + min(25, round(abs(pct_change) * 5))
)
# Clean, structured reasoning β this fallback path runs when
# the Strategist's final output can't be parsed into the
# expected schema. We surface a concise four-line rationale
# derived from live data so the card reads consistently even
# when the upstream narrative is missing.
if action == Action.BUY:
rationale = (
f"Last close to last print {pct_change:+.2f}%. "
f"Short-term momentum favours a long entry at the live quote."
)
elif action == Action.SELL:
rationale = (
f"Last close to last print {pct_change:+.2f}%. "
f"Short-term momentum favours a short entry at the live quote."
)
else:
rationale = (
f"Change vs previous close {pct_change:+.2f}%. "
f"No directional conviction; HOLD and wait for a cleaner setup."
)
stop_pct_shown = abs(stop - entry) / entry * 100
target_pct_shown = abs(target - entry) / entry * 100
risk_note = (
f"Bands sized to your {self._preferences.risk_tolerance} / "
f"{self._preferences.trading_style} profile: "
f"stop β {stop_pct_shown:.1f}% / target β {target_pct_shown:.1f}%."
)
reasoning = {
"Market": rationale,
"Fundamental": (
"Fundamental view deferred β insufficient signal for a "
"high-conviction call on this window."
),
"Technical": (
"Entry anchored to live yfinance quote; stop and target "
"derived from the preference-aware default band."
),
"Risk": risk_note,
}
return TradingSignal(
ticker=ticker,
action=action,
confidence=confidence,
entry_price=entry,
stop_loss=stop,
target_price=target,
reasoning=reasoning,
)
|