File size: 5,091 Bytes
5f3e9f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""Progress tracking system for PowerPoint automation operations.

This module provides real-time progress monitoring and event broadcasting
for long-running PowerPoint operations like presentation creation and video export.
"""

from dataclasses import dataclass, asdict
from datetime import datetime
from queue import Queue
from typing import Dict, Any, Callable, List, Optional
import json


@dataclass
class ProgressEvent:
    """Progress update event for SSE streaming."""
    
    event_type: str  # "slide_inserted", "video_export_started", etc.
    operation_id: str
    timestamp: datetime
    data: Dict[str, Any]  # Event-specific data
    
    def to_sse_format(self) -> str:
        """Convert to Server-Sent Events format.
        
        Returns:
            Formatted SSE string with event type and JSON data
        """
        # Convert datetime to ISO format string for JSON serialization
        event_dict = {
            'event_type': self.event_type,
            'operation_id': self.operation_id,
            'timestamp': self.timestamp.isoformat(),
            'data': self.data
        }
        
        # SSE format: event: <type>\ndata: <json>\n\n
        return f"event: {self.event_type}\ndata: {json.dumps(self.data)}\n\n"


class ProgressTracker:
    """Track and broadcast operation progress.
    
    This class manages progress events for PowerPoint operations,
    allowing multiple listeners to receive real-time updates via
    callbacks or event queues.
    """
    
    def __init__(self, operation_id: str):
        """Initialize progress tracker for an operation.
        
        Args:
            operation_id: Unique identifier for the operation being tracked
        """
        self.operation_id = operation_id
        self.event_queue: Queue = Queue()
        self.listeners: List[Callable[[ProgressEvent], None]] = []
        self._events_history: List[ProgressEvent] = []
    
    def emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """Emit a progress event.
        
        Args:
            event_type: Type of event (e.g., "slide_inserted", "completed")
            data: Event-specific data dictionary
        """
        event = ProgressEvent(
            event_type=event_type,
            operation_id=self.operation_id,
            timestamp=datetime.now(),
            data=data
        )
        
        # Store in history
        self._events_history.append(event)
        
        # Add to queue for SSE streaming
        self.event_queue.put(event)
        
        # Notify all registered listeners
        for listener in self.listeners:
            try:
                listener(event)
            except Exception as e:
                # Don't let listener errors break the tracker
                print(f"Error in progress listener: {e}")
    
    def add_listener(self, callback: Callable[[ProgressEvent], None]) -> None:
        """Add a progress listener callback.
        
        Args:
            callback: Function to call when events are emitted
        """
        self.listeners.append(callback)
    
    def remove_listener(self, callback: Callable[[ProgressEvent], None]) -> None:
        """Remove a progress listener callback.
        
        Args:
            callback: Function to remove from listeners
        """
        if callback in self.listeners:
            self.listeners.remove(callback)
    
    def get_events(self) -> List[ProgressEvent]:
        """Get all events emitted so far.
        
        Returns:
            List of all progress events in chronological order
        """
        return self._events_history.copy()
    
    def get_latest_event(self) -> Optional[ProgressEvent]:
        """Get the most recent event.
        
        Returns:
            Latest progress event or None if no events emitted
        """
        return self._events_history[-1] if self._events_history else None
    
    def clear_history(self) -> None:
        """Clear the event history."""
        self._events_history.clear()


# Global registry of active progress trackers
_active_trackers: Dict[str, ProgressTracker] = {}


def get_tracker(operation_id: str) -> Optional[ProgressTracker]:
    """Get an existing progress tracker by operation ID.
    
    Args:
        operation_id: Unique identifier for the operation
        
    Returns:
        ProgressTracker instance or None if not found
    """
    return _active_trackers.get(operation_id)


def create_tracker(operation_id: str) -> ProgressTracker:
    """Create and register a new progress tracker.
    
    Args:
        operation_id: Unique identifier for the operation
        
    Returns:
        New ProgressTracker instance
    """
    tracker = ProgressTracker(operation_id)
    _active_trackers[operation_id] = tracker
    return tracker


def remove_tracker(operation_id: str) -> None:
    """Remove a progress tracker from the registry.
    
    Args:
        operation_id: Unique identifier for the operation
    """
    if operation_id in _active_trackers:
        del _active_trackers[operation_id]