Spaces:
Running
Running
File size: 5,091 Bytes
5f3e9f5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | """Progress tracking system for PowerPoint automation operations.
This module provides real-time progress monitoring and event broadcasting
for long-running PowerPoint operations like presentation creation and video export.
"""
from dataclasses import dataclass, asdict
from datetime import datetime
from queue import Queue
from typing import Dict, Any, Callable, List, Optional
import json
@dataclass
class ProgressEvent:
"""Progress update event for SSE streaming."""
event_type: str # "slide_inserted", "video_export_started", etc.
operation_id: str
timestamp: datetime
data: Dict[str, Any] # Event-specific data
def to_sse_format(self) -> str:
"""Convert to Server-Sent Events format.
Returns:
Formatted SSE string with event type and JSON data
"""
# Convert datetime to ISO format string for JSON serialization
event_dict = {
'event_type': self.event_type,
'operation_id': self.operation_id,
'timestamp': self.timestamp.isoformat(),
'data': self.data
}
# SSE format: event: <type>\ndata: <json>\n\n
return f"event: {self.event_type}\ndata: {json.dumps(self.data)}\n\n"
class ProgressTracker:
"""Track and broadcast operation progress.
This class manages progress events for PowerPoint operations,
allowing multiple listeners to receive real-time updates via
callbacks or event queues.
"""
def __init__(self, operation_id: str):
"""Initialize progress tracker for an operation.
Args:
operation_id: Unique identifier for the operation being tracked
"""
self.operation_id = operation_id
self.event_queue: Queue = Queue()
self.listeners: List[Callable[[ProgressEvent], None]] = []
self._events_history: List[ProgressEvent] = []
def emit(self, event_type: str, data: Dict[str, Any]) -> None:
"""Emit a progress event.
Args:
event_type: Type of event (e.g., "slide_inserted", "completed")
data: Event-specific data dictionary
"""
event = ProgressEvent(
event_type=event_type,
operation_id=self.operation_id,
timestamp=datetime.now(),
data=data
)
# Store in history
self._events_history.append(event)
# Add to queue for SSE streaming
self.event_queue.put(event)
# Notify all registered listeners
for listener in self.listeners:
try:
listener(event)
except Exception as e:
# Don't let listener errors break the tracker
print(f"Error in progress listener: {e}")
def add_listener(self, callback: Callable[[ProgressEvent], None]) -> None:
"""Add a progress listener callback.
Args:
callback: Function to call when events are emitted
"""
self.listeners.append(callback)
def remove_listener(self, callback: Callable[[ProgressEvent], None]) -> None:
"""Remove a progress listener callback.
Args:
callback: Function to remove from listeners
"""
if callback in self.listeners:
self.listeners.remove(callback)
def get_events(self) -> List[ProgressEvent]:
"""Get all events emitted so far.
Returns:
List of all progress events in chronological order
"""
return self._events_history.copy()
def get_latest_event(self) -> Optional[ProgressEvent]:
"""Get the most recent event.
Returns:
Latest progress event or None if no events emitted
"""
return self._events_history[-1] if self._events_history else None
def clear_history(self) -> None:
"""Clear the event history."""
self._events_history.clear()
# Global registry of active progress trackers
_active_trackers: Dict[str, ProgressTracker] = {}
def get_tracker(operation_id: str) -> Optional[ProgressTracker]:
"""Get an existing progress tracker by operation ID.
Args:
operation_id: Unique identifier for the operation
Returns:
ProgressTracker instance or None if not found
"""
return _active_trackers.get(operation_id)
def create_tracker(operation_id: str) -> ProgressTracker:
"""Create and register a new progress tracker.
Args:
operation_id: Unique identifier for the operation
Returns:
New ProgressTracker instance
"""
tracker = ProgressTracker(operation_id)
_active_trackers[operation_id] = tracker
return tracker
def remove_tracker(operation_id: str) -> None:
"""Remove a progress tracker from the registry.
Args:
operation_id: Unique identifier for the operation
"""
if operation_id in _active_trackers:
del _active_trackers[operation_id]
|