File size: 9,770 Bytes
07ff2cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ae549a
 
 
 
 
 
07ff2cb
 
 
 
eb4ef03
 
 
07ff2cb
 
 
 
 
 
 
 
0ae549a
 
 
 
 
 
 
 
 
 
 
 
eb4ef03
0ae549a
eb4ef03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
07ff2cb
eb4ef03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
07ff2cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""TradingSignal dataclass and parser for structured output handling."""

import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Action(str, Enum):
    """Trading signal actions."""

    BUY = "BUY"
    SELL = "SELL"
    HOLD = "HOLD"


@dataclass
class TradingSignal:
    """Structured trading signal output."""

    ticker: str
    action: Action
    confidence: int  # 0-100
    entry_price: Optional[float] = None
    stop_loss: Optional[float] = None
    target_price: Optional[float] = None
    reasoning: Optional[dict[str, str]] = None  # {agent_name: summary}

    @staticmethod
    def validate_confidence(value: int) -> int:
        """Clamp confidence to 0-100 range."""
        return max(0, min(100, value))


class TradingSignalParser:
    """Parses raw LLM output into structured TradingSignal objects."""

    # Primary format: "AAPL — BUY (Confidence: 75%)"
    PRIMARY_PATTERN = re.compile(
        r"([A-Z\-\.]+)\s*[—–-]\s*(BUY|SELL|HOLD)\s*\(Confidence:\s*(\d{1,3})%\)",
        re.IGNORECASE,
    )

    # Fallback patterns for less structured output
    ACTION_PATTERN = re.compile(r"\b(BUY|SELL|HOLD)\b", re.IGNORECASE)
    CONFIDENCE_PATTERN = re.compile(r"(\d{1,3})\s*%")
    PRICE_PATTERN = re.compile(r"\$\s*([\d,]+\.?\d*)")

    # Reasoning-block markers Qwen3 wraps its deliberation in. We strip
    # these at the entry point so the downstream regex patterns can
    # match against the clean final answer rather than the verbose
    # chain-of-thought that surrounds it.
    THINK_BLOCK = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)

    def parse(self, raw_output: str, ticker: str) -> Optional[TradingSignal]:
        """Parse raw output into a TradingSignal.

        Attempts primary pattern first, falls back to heuristic extraction.
        After parsing, prices are sanity-checked so the Strategist can't
        ship obviously-wrong numbers (e.g. $50 000 target on a $290 stock,
        or stop-loss on the wrong side of entry for the given action).

        Args:
            raw_output: Raw text from Chief Strategist agent
            ticker: Expected ticker symbol

        Returns:
            TradingSignal if parsing succeeds, None if output is unparseable
        """
        # Strip Qwen3's <think>...</think> reasoning blocks before
        # matching. Qwen will usually wrap its internal deliberation in
        # these tags and place the structured final answer *after* the
        # closing </think>, but a stray token or unclosed tag used to
        # push the whole output into the fallback path. Removing them
        # lets the primary pattern match reliably.
        cleaned = self.THINK_BLOCK.sub("", raw_output)
        # Also drop any leftover lone <think> or </think> tag so the
        # next regex doesn't accidentally anchor on them.
        cleaned = re.sub(r"</?think>", "", cleaned, flags=re.IGNORECASE)

        signal = self._parse_primary(cleaned, ticker)
        if signal is None:
            signal = self._parse_fallback(cleaned, ticker)
        if signal is None:
            return None
        return self._sanity_fix_prices(signal)

    @staticmethod
    def _sanity_fix_prices(signal: TradingSignal) -> TradingSignal:
        """Clamp runaway prices and enforce BUY/SELL inequality semantics.

        Small LLMs occasionally emit obvious absurdities at synthesis time
        (e.g. target that is 100× entry, or stop_loss on the wrong side
        of entry for a BUY). Rather than surface nonsense to the user,
        detect these and replace with conservative defaults derived from
        the entry price:

        * For BUY:  stop = entry × 0.97, target = entry × 1.05
        * For SELL: stop = entry × 1.03, target = entry × 0.95
        * For HOLD: defaults left as-is.

        We only override a field when it fails a plausibility test
        (>20 % away from entry, or on the wrong side of entry for the
        current action). Valid prices from the model are preserved.
        """
        entry = signal.entry_price
        # No entry price → nothing to clamp against.
        if entry is None or entry <= 0:
            return signal

        stop = signal.stop_loss
        target = signal.target_price
        action = signal.action

        def _out_of_band(value: Optional[float]) -> bool:
            """True if ``value`` is missing, non-positive, or >20 % from entry."""
            if value is None or value <= 0:
                return True
            return abs(value - entry) / entry > 0.20

        def _wrong_side_stop(value: Optional[float]) -> bool:
            """Stop on the wrong side of entry for the current action."""
            if value is None:
                return False
            if action == Action.BUY and value >= entry:
                return True
            if action == Action.SELL and value <= entry:
                return True
            return False

        def _wrong_side_target(value: Optional[float]) -> bool:
            """Target on the wrong side of entry for the current action."""
            if value is None:
                return False
            if action == Action.BUY and value <= entry:
                return True
            if action == Action.SELL and value >= entry:
                return True
            return False

        if action == Action.BUY:
            if _out_of_band(stop) or _wrong_side_stop(stop):
                stop = round(entry * 0.97, 2)
            if _out_of_band(target) or _wrong_side_target(target):
                target = round(entry * 1.05, 2)
        elif action == Action.SELL:
            if _out_of_band(stop) or _wrong_side_stop(stop):
                stop = round(entry * 1.03, 2)
            if _out_of_band(target) or _wrong_side_target(target):
                target = round(entry * 0.95, 2)
        # HOLD: leave as-is; the Strategist can describe a range.

        return TradingSignal(
            ticker=signal.ticker,
            action=signal.action,
            confidence=signal.confidence,
            entry_price=entry,
            stop_loss=stop,
            target_price=target,
            reasoning=signal.reasoning,
        )

    def _parse_primary(self, raw_output: str, ticker: str) -> Optional[TradingSignal]:
        """Attempt to parse using the primary structured format."""
        match = self.PRIMARY_PATTERN.search(raw_output)
        if not match:
            return None

        parsed_ticker = match.group(1).upper()
        action_str = match.group(2).upper()
        confidence_raw = int(match.group(3))

        action = Action(action_str)
        confidence = TradingSignal.validate_confidence(confidence_raw)

        prices = self._extract_prices(raw_output)
        reasoning = self._extract_reasoning(raw_output)

        return TradingSignal(
            ticker=parsed_ticker,
            action=action,
            confidence=confidence,
            entry_price=prices.get("entry"),
            stop_loss=prices.get("stop_loss"),
            target_price=prices.get("target"),
            reasoning=reasoning,
        )

    def _parse_fallback(self, raw_output: str, ticker: str) -> Optional[TradingSignal]:
        """Attempt heuristic extraction from unstructured output."""
        action_match = self.ACTION_PATTERN.search(raw_output)
        if not action_match:
            return None

        action = Action(action_match.group(1).upper())

        confidence_match = self.CONFIDENCE_PATTERN.search(raw_output)
        if confidence_match:
            confidence = TradingSignal.validate_confidence(int(confidence_match.group(1)))
        else:
            confidence = 50  # Default confidence when not specified

        prices = self._extract_prices(raw_output)

        return TradingSignal(
            ticker=ticker.upper(),
            action=action,
            confidence=confidence,
            entry_price=prices.get("entry"),
            stop_loss=prices.get("stop_loss"),
            target_price=prices.get("target"),
        )

    def _extract_prices(self, raw_output: str) -> dict[str, Optional[float]]:
        """Extract entry, stop-loss, and target prices from text.

        Finds all $XX.XX patterns and assigns:
        - First as entry price
        - Second as stop_loss
        - Third as target price
        """
        matches = self.PRICE_PATTERN.findall(raw_output)
        prices: dict[str, Optional[float]] = {
            "entry": None,
            "stop_loss": None,
            "target": None,
        }

        # Parse matched price strings, removing commas
        parsed = []
        for m in matches:
            cleaned = m.replace(",", "")
            try:
                parsed.append(float(cleaned))
            except ValueError:
                continue

        if len(parsed) >= 1:
            prices["entry"] = parsed[0]
        if len(parsed) >= 2:
            prices["stop_loss"] = parsed[1]
        if len(parsed) >= 3:
            prices["target"] = parsed[2]

        return prices

    def _extract_reasoning(self, raw_output: str) -> Optional[dict[str, str]]:
        """Extract per-agent reasoning summaries from text.

        Looks for lines like:
        - Market: ...
        - Fundamental: ...
        - Technical: ...
        - Risk: ...
        """
        reasoning_pattern = re.compile(
            r"^-\s*(Market|Fundamental|Technical|Risk)\s*:\s*(.+)$",
            re.MULTILINE | re.IGNORECASE,
        )
        matches = reasoning_pattern.findall(raw_output)

        if not matches:
            return None

        reasoning = {}
        for key, value in matches:
            reasoning[key.strip()] = value.strip()

        return reasoning if reasoning else None