"""ActivityFeedCallback implementation for real-time UI updates.""" from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from typing import TYPE_CHECKING, Callable, Optional if TYPE_CHECKING: from crew.signals import TradingSignal class EventType(str, Enum): """Types of activity feed events.""" TICKER_START = "ticker_start" TICKER_COMPLETE = "ticker_complete" TASK_START = "task_start" TASK_COMPLETE = "task_complete" TASK_FAILED = "task_failed" AGENT_OUTPUT = "agent_output" CREW_ERROR = "crew_error" @dataclass class ActivityEvent: """Structured payload for activity feed callbacks.""" event_type: EventType agent_name: str ticker: str message: str timestamp: datetime class ActivityFeedCallback: """Manages activity feed event dispatch to the Gradio UI.""" def __init__(self, handler: Callable[[ActivityEvent], None]) -> None: """Initialize with a handler function that receives ActivityEvent payloads. Args: handler: Function that receives ActivityEvent payloads. Typically connected to a Gradio state update. """ self._handler = handler def on_ticker_start(self, ticker: str) -> None: """Emit event when a ticker analysis begins.""" event = ActivityEvent( event_type=EventType.TICKER_START, agent_name="system", ticker=ticker, message=f"Starting analysis for {ticker}", timestamp=datetime.now(timezone.utc), ) self._emit(event) def on_ticker_complete( self, ticker: str, signal: Optional[TradingSignal] = None ) -> None: """Emit event when a ticker analysis completes.""" if signal is not None: message = f"Analysis complete for {ticker}: {signal.action.value} (Confidence: {signal.confidence}%)" else: message = f"Analysis complete for {ticker}" event = ActivityEvent( event_type=EventType.TICKER_COMPLETE, agent_name="system", ticker=ticker, message=message, timestamp=datetime.now(timezone.utc), ) self._emit(event) def on_task_start(self, agent_name: str, ticker: str) -> None: """Emit event when an agent task begins execution.""" event = ActivityEvent( event_type=EventType.TASK_START, agent_name=agent_name, ticker=ticker, message=f"{agent_name} started task for {ticker}", timestamp=datetime.now(timezone.utc), ) self._emit(event) def on_task_complete( self, agent_name: str, ticker: str, output_summary: str ) -> None: """Emit event when an agent task completes successfully.""" event = ActivityEvent( event_type=EventType.TASK_COMPLETE, agent_name=agent_name, ticker=ticker, message=output_summary, timestamp=datetime.now(timezone.utc), ) self._emit(event) def on_task_failed(self, agent_name: str, ticker: str, error: str) -> None: """Emit event when an agent task fails.""" event = ActivityEvent( event_type=EventType.TASK_FAILED, agent_name=agent_name, ticker=ticker, message=f"{agent_name} failed for {ticker}: {error}", timestamp=datetime.now(timezone.utc), ) self._emit(event) def on_agent_output(self, agent_name: str, ticker: str, output: str) -> None: """Emit event for intermediate agent output.""" event = ActivityEvent( event_type=EventType.AGENT_OUTPUT, agent_name=agent_name, ticker=ticker, message=output, timestamp=datetime.now(timezone.utc), ) self._emit(event) def _emit(self, event: ActivityEvent) -> None: """Dispatch event to the registered handler.""" self._handler(event)