| """TradingSignal dataclass and parser for structured output handling.""" |
|
|
| import re |
| from dataclasses import dataclass |
| from enum import Enum |
| from typing import Optional |
|
|
|
|
| class Action(str, Enum): |
| """Trading signal actions.""" |
|
|
| BUY = "BUY" |
| SELL = "SELL" |
| HOLD = "HOLD" |
|
|
|
|
| @dataclass |
| class TradingSignal: |
| """Structured trading signal output.""" |
|
|
| ticker: str |
| action: Action |
| confidence: int |
| entry_price: Optional[float] = None |
| stop_loss: Optional[float] = None |
| target_price: Optional[float] = None |
| reasoning: Optional[dict[str, str]] = None |
|
|
| @staticmethod |
| def validate_confidence(value: int) -> int: |
| """Clamp confidence to 0-100 range.""" |
| return max(0, min(100, value)) |
|
|
|
|
| class TradingSignalParser: |
| """Parses raw LLM output into structured TradingSignal objects.""" |
|
|
| |
| PRIMARY_PATTERN = re.compile( |
| r"([A-Z\-\.]+)\s*[—–-]\s*(BUY|SELL|HOLD)\s*\(Confidence:\s*(\d{1,3})%\)", |
| re.IGNORECASE, |
| ) |
|
|
| |
| ACTION_PATTERN = re.compile(r"\b(BUY|SELL|HOLD)\b", re.IGNORECASE) |
| CONFIDENCE_PATTERN = re.compile(r"(\d{1,3})\s*%") |
| PRICE_PATTERN = re.compile(r"\$\s*([\d,]+\.?\d*)") |
|
|
| |
| |
| |
| |
| THINK_BLOCK = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE) |
|
|
| def parse(self, raw_output: str, ticker: str) -> Optional[TradingSignal]: |
| """Parse raw output into a TradingSignal. |
| |
| Attempts primary pattern first, falls back to heuristic extraction. |
| After parsing, prices are sanity-checked so the Strategist can't |
| ship obviously-wrong numbers (e.g. $50 000 target on a $290 stock, |
| or stop-loss on the wrong side of entry for the given action). |
| |
| Args: |
| raw_output: Raw text from Chief Strategist agent |
| ticker: Expected ticker symbol |
| |
| Returns: |
| TradingSignal if parsing succeeds, None if output is unparseable |
| """ |
| |
| |
| |
| |
| |
| |
| cleaned = self.THINK_BLOCK.sub("", raw_output) |
| |
| |
| cleaned = re.sub(r"</?think>", "", cleaned, flags=re.IGNORECASE) |
|
|
| signal = self._parse_primary(cleaned, ticker) |
| if signal is None: |
| signal = self._parse_fallback(cleaned, ticker) |
| if signal is None: |
| return None |
| return self._sanity_fix_prices(signal) |
|
|
| @staticmethod |
| def _sanity_fix_prices(signal: TradingSignal) -> TradingSignal: |
| """Clamp runaway prices and enforce BUY/SELL inequality semantics. |
| |
| Small LLMs occasionally emit obvious absurdities at synthesis time |
| (e.g. target that is 100× entry, or stop_loss on the wrong side |
| of entry for a BUY). Rather than surface nonsense to the user, |
| detect these and replace with conservative defaults derived from |
| the entry price: |
| |
| * For BUY: stop = entry × 0.97, target = entry × 1.05 |
| * For SELL: stop = entry × 1.03, target = entry × 0.95 |
| * For HOLD: defaults left as-is. |
| |
| We only override a field when it fails a plausibility test |
| (>20 % away from entry, or on the wrong side of entry for the |
| current action). Valid prices from the model are preserved. |
| """ |
| entry = signal.entry_price |
| |
| if entry is None or entry <= 0: |
| return signal |
|
|
| stop = signal.stop_loss |
| target = signal.target_price |
| action = signal.action |
|
|
| def _out_of_band(value: Optional[float]) -> bool: |
| """True if ``value`` is missing, non-positive, or >20 % from entry.""" |
| if value is None or value <= 0: |
| return True |
| return abs(value - entry) / entry > 0.20 |
|
|
| def _wrong_side_stop(value: Optional[float]) -> bool: |
| """Stop on the wrong side of entry for the current action.""" |
| if value is None: |
| return False |
| if action == Action.BUY and value >= entry: |
| return True |
| if action == Action.SELL and value <= entry: |
| return True |
| return False |
|
|
| def _wrong_side_target(value: Optional[float]) -> bool: |
| """Target on the wrong side of entry for the current action.""" |
| if value is None: |
| return False |
| if action == Action.BUY and value <= entry: |
| return True |
| if action == Action.SELL and value >= entry: |
| return True |
| return False |
|
|
| if action == Action.BUY: |
| if _out_of_band(stop) or _wrong_side_stop(stop): |
| stop = round(entry * 0.97, 2) |
| if _out_of_band(target) or _wrong_side_target(target): |
| target = round(entry * 1.05, 2) |
| elif action == Action.SELL: |
| if _out_of_band(stop) or _wrong_side_stop(stop): |
| stop = round(entry * 1.03, 2) |
| if _out_of_band(target) or _wrong_side_target(target): |
| target = round(entry * 0.95, 2) |
| |
|
|
| return TradingSignal( |
| ticker=signal.ticker, |
| action=signal.action, |
| confidence=signal.confidence, |
| entry_price=entry, |
| stop_loss=stop, |
| target_price=target, |
| reasoning=signal.reasoning, |
| ) |
|
|
| def _parse_primary(self, raw_output: str, ticker: str) -> Optional[TradingSignal]: |
| """Attempt to parse using the primary structured format.""" |
| match = self.PRIMARY_PATTERN.search(raw_output) |
| if not match: |
| return None |
|
|
| parsed_ticker = match.group(1).upper() |
| action_str = match.group(2).upper() |
| confidence_raw = int(match.group(3)) |
|
|
| action = Action(action_str) |
| confidence = TradingSignal.validate_confidence(confidence_raw) |
|
|
| prices = self._extract_prices(raw_output) |
| reasoning = self._extract_reasoning(raw_output) |
|
|
| return TradingSignal( |
| ticker=parsed_ticker, |
| action=action, |
| confidence=confidence, |
| entry_price=prices.get("entry"), |
| stop_loss=prices.get("stop_loss"), |
| target_price=prices.get("target"), |
| reasoning=reasoning, |
| ) |
|
|
| def _parse_fallback(self, raw_output: str, ticker: str) -> Optional[TradingSignal]: |
| """Attempt heuristic extraction from unstructured output.""" |
| action_match = self.ACTION_PATTERN.search(raw_output) |
| if not action_match: |
| return None |
|
|
| action = Action(action_match.group(1).upper()) |
|
|
| confidence_match = self.CONFIDENCE_PATTERN.search(raw_output) |
| if confidence_match: |
| confidence = TradingSignal.validate_confidence(int(confidence_match.group(1))) |
| else: |
| confidence = 50 |
|
|
| prices = self._extract_prices(raw_output) |
|
|
| return TradingSignal( |
| ticker=ticker.upper(), |
| action=action, |
| confidence=confidence, |
| entry_price=prices.get("entry"), |
| stop_loss=prices.get("stop_loss"), |
| target_price=prices.get("target"), |
| ) |
|
|
| def _extract_prices(self, raw_output: str) -> dict[str, Optional[float]]: |
| """Extract entry, stop-loss, and target prices from text. |
| |
| Finds all $XX.XX patterns and assigns: |
| - First as entry price |
| - Second as stop_loss |
| - Third as target price |
| """ |
| matches = self.PRICE_PATTERN.findall(raw_output) |
| prices: dict[str, Optional[float]] = { |
| "entry": None, |
| "stop_loss": None, |
| "target": None, |
| } |
|
|
| |
| parsed = [] |
| for m in matches: |
| cleaned = m.replace(",", "") |
| try: |
| parsed.append(float(cleaned)) |
| except ValueError: |
| continue |
|
|
| if len(parsed) >= 1: |
| prices["entry"] = parsed[0] |
| if len(parsed) >= 2: |
| prices["stop_loss"] = parsed[1] |
| if len(parsed) >= 3: |
| prices["target"] = parsed[2] |
|
|
| return prices |
|
|
| def _extract_reasoning(self, raw_output: str) -> Optional[dict[str, str]]: |
| """Extract per-agent reasoning summaries from text. |
| |
| Looks for lines like: |
| - Market: ... |
| - Fundamental: ... |
| - Technical: ... |
| - Risk: ... |
| """ |
| reasoning_pattern = re.compile( |
| r"^-\s*(Market|Fundamental|Technical|Risk)\s*:\s*(.+)$", |
| re.MULTILINE | re.IGNORECASE, |
| ) |
| matches = reasoning_pattern.findall(raw_output) |
|
|
| if not matches: |
| return None |
|
|
| reasoning = {} |
| for key, value in matches: |
| reasoning[key.strip()] = value.strip() |
|
|
| return reasoning if reasoning else None |
|
|