File size: 4,103 Bytes
07ff2cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""ActivityFeedCallback implementation for real-time UI updates."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Callable, Optional

if TYPE_CHECKING:
    from crew.signals import TradingSignal


class EventType(str, Enum):
    """Types of activity feed events."""

    TICKER_START = "ticker_start"
    TICKER_COMPLETE = "ticker_complete"
    TASK_START = "task_start"
    TASK_COMPLETE = "task_complete"
    TASK_FAILED = "task_failed"
    AGENT_OUTPUT = "agent_output"
    CREW_ERROR = "crew_error"


@dataclass
class ActivityEvent:
    """Structured payload for activity feed callbacks."""

    event_type: EventType
    agent_name: str
    ticker: str
    message: str
    timestamp: datetime


class ActivityFeedCallback:
    """Manages activity feed event dispatch to the Gradio UI."""

    def __init__(self, handler: Callable[[ActivityEvent], None]) -> None:
        """Initialize with a handler function that receives ActivityEvent payloads.

        Args:
            handler: Function that receives ActivityEvent payloads.
                     Typically connected to a Gradio state update.
        """
        self._handler = handler

    def on_ticker_start(self, ticker: str) -> None:
        """Emit event when a ticker analysis begins."""
        event = ActivityEvent(
            event_type=EventType.TICKER_START,
            agent_name="system",
            ticker=ticker,
            message=f"Starting analysis for {ticker}",
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def on_ticker_complete(
        self, ticker: str, signal: Optional[TradingSignal] = None
    ) -> None:
        """Emit event when a ticker analysis completes."""
        if signal is not None:
            message = f"Analysis complete for {ticker}: {signal.action.value} (Confidence: {signal.confidence}%)"
        else:
            message = f"Analysis complete for {ticker}"
        event = ActivityEvent(
            event_type=EventType.TICKER_COMPLETE,
            agent_name="system",
            ticker=ticker,
            message=message,
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def on_task_start(self, agent_name: str, ticker: str) -> None:
        """Emit event when an agent task begins execution."""
        event = ActivityEvent(
            event_type=EventType.TASK_START,
            agent_name=agent_name,
            ticker=ticker,
            message=f"{agent_name} started task for {ticker}",
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def on_task_complete(
        self, agent_name: str, ticker: str, output_summary: str
    ) -> None:
        """Emit event when an agent task completes successfully."""
        event = ActivityEvent(
            event_type=EventType.TASK_COMPLETE,
            agent_name=agent_name,
            ticker=ticker,
            message=output_summary,
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def on_task_failed(self, agent_name: str, ticker: str, error: str) -> None:
        """Emit event when an agent task fails."""
        event = ActivityEvent(
            event_type=EventType.TASK_FAILED,
            agent_name=agent_name,
            ticker=ticker,
            message=f"{agent_name} failed for {ticker}: {error}",
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def on_agent_output(self, agent_name: str, ticker: str, output: str) -> None:
        """Emit event for intermediate agent output."""
        event = ActivityEvent(
            event_type=EventType.AGENT_OUTPUT,
            agent_name=agent_name,
            ticker=ticker,
            message=output,
            timestamp=datetime.now(timezone.utc),
        )
        self._emit(event)

    def _emit(self, event: ActivityEvent) -> None:
        """Dispatch event to the registered handler."""
        self._handler(event)