Spaces:
Running
Running
| """Performance metrics tracking for screenshot generation. | |
| Thread-safe by way of a single ``threading.RLock``. Concurrent SSE workers | |
| that finish at the same time would otherwise race on the underlying dicts | |
| and silently drop entries. The lock is re-entrant so that methods which | |
| already hold it (e.g. the ``ScreenshotMetrics.track_*`` helpers, which call | |
| ``_update_metadata()``) can nest without deadlocking. | |
| Memory is bounded with a simple ordered eviction (``MAX_TRACKED_OPS``). | |
| Without this every run leaves ~5 entries in memory forever. | |
| """ | |
| import threading | |
| import time | |
| from collections import OrderedDict | |
| from datetime import datetime | |
# Default cap on how many operations we keep before evicting the oldest.
# ~500 = a comfortable afternoon's worth of runs and only a few hundred KB
# of RAM.
MAX_TRACKED_OPS = 500


class PerformanceMetrics:
    """Track and report performance metrics.

    Every public method takes ``self._lock`` (a re-entrant lock), so the two
    ordered dicts below are always read and written together. Values handed
    back to callers are copies, including the nested ``metadata`` dict, so
    callers can never mutate tracked state by accident.
    """

    def __init__(self, max_tracked: int = MAX_TRACKED_OPS):
        """Create an empty tracker.

        Args:
            max_tracked: Maximum number of operations kept; once exceeded,
                the oldest ones are evicted.
        """
        self._lock = threading.RLock()
        self._max_tracked = max_tracked
        # OrderedDict preserves insertion order so ``popitem(last=False)``
        # evicts the oldest tracked operation.
        self.metrics: "OrderedDict[str, dict]" = OrderedDict()
        self.start_times: "OrderedDict[str, float]" = OrderedDict()

    @staticmethod
    def _snapshot(entry):
        """Return a copy of ``entry`` whose ``metadata`` dict is copied too."""
        snapshot = dict(entry)
        if 'metadata' in snapshot:
            snapshot['metadata'] = dict(snapshot['metadata'])
        return snapshot

    def _evict_if_needed_locked(self) -> None:
        """Evict oldest entries while holding the lock."""
        # The emptiness guard keeps a negative cap from calling ``popitem``
        # on an empty dict (which raises KeyError).
        while self.metrics and len(self.metrics) > self._max_tracked:
            oldest, _ = self.metrics.popitem(last=False)
            self.start_times.pop(oldest, None)

    def start(self, operation_id):
        """Start (or restart) timing an operation."""
        with self._lock:
            self.start_times[operation_id] = time.time()
            self.metrics[operation_id] = {
                'start_time': datetime.now().isoformat(),
                'status': 'running',
            }
            # Assigning to an existing key keeps its old position, so a
            # restarted operation would otherwise be evicted as "oldest".
            self.metrics.move_to_end(operation_id)
            self.start_times.move_to_end(operation_id)
            self._evict_if_needed_locked()

    def end(self, operation_id, success=True, metadata=None):
        """End timing an operation.

        Args:
            operation_id: Id previously passed to ``start()``.
            success: Recorded as status ``'success'`` or ``'failed'``.
            metadata: Optional mapping stored with the entry. It is copied,
                so later metadata updates never touch the caller's object.

        Returns:
            A snapshot of the finished entry, or ``None`` if the operation
            is not being tracked.
        """
        with self._lock:
            if operation_id not in self.start_times:
                return None
            duration = time.time() - self.start_times[operation_id]
            entry = self.metrics[operation_id]
            entry.update({
                'end_time': datetime.now().isoformat(),
                'duration_seconds': round(duration, 3),
                'duration_ms': round(duration * 1000, 2),
                'status': 'success' if success else 'failed',
                'metadata': dict(metadata) if metadata else {},
            })
            return self._snapshot(entry)

    def get_metrics(self, operation_id):
        """Get a snapshot of one operation's metrics, or ``None``."""
        with self._lock:
            entry = self.metrics.get(operation_id)
            return self._snapshot(entry) if entry else None

    def get_all_metrics(self):
        """Get all tracked metrics (snapshot copy)."""
        with self._lock:
            return {
                op_id: self._snapshot(entry)
                for op_id, entry in self.metrics.items()
            }

    def clear(self):
        """Clear all metrics."""
        with self._lock:
            self.metrics.clear()
            self.start_times.clear()

    def format_duration(self, seconds):
        """Format a duration in seconds as ``"NNNms"``, ``"S.SSs"`` or ``"Mm Ss"``."""
        if seconds < 1:
            return f"{seconds * 1000:.0f}ms"
        if seconds < 60:
            return f"{seconds:.2f}s"
        # Round to whole seconds *before* splitting into minutes; rounding
        # the remainder afterwards can yield "1m 60s" (e.g. for 119.7).
        minutes, secs = divmod(round(seconds), 60)
        return f"{minutes}m {secs}s"

    def get_summary(self, operation_id):
        """Get formatted summary of operation metrics.

        Works while an operation is still running: in that case the elapsed
        time so far is reported and ``end_time`` is ``'N/A'``, because
        ``duration_seconds`` is only set by ``end()``.

        Returns:
            A summary dict, or ``None`` if the operation is not tracked.
        """
        with self._lock:
            metrics = self.metrics.get(operation_id)
            if not metrics:
                return None

            if 'duration_seconds' not in metrics:
                start = self.start_times.get(operation_id, time.time())
                elapsed = time.time() - start
                return {
                    'operation_id': operation_id,
                    'duration': self.format_duration(elapsed),
                    'duration_seconds': round(elapsed, 3),
                    'duration_ms': round(elapsed * 1000, 2),
                    'status': metrics.get('status', 'running'),
                    'start_time': metrics.get('start_time'),
                    'end_time': 'N/A',
                    'metadata': dict(metrics.get('metadata', {})),
                }

            summary = {
                'operation_id': operation_id,
                'duration': self.format_duration(metrics['duration_seconds']),
                'duration_seconds': metrics['duration_seconds'],
                'duration_ms': metrics['duration_ms'],
                'status': metrics['status'],
                'start_time': metrics['start_time'],
                'end_time': metrics.get('end_time', 'N/A'),
            }
            if 'metadata' in metrics:
                summary['metadata'] = dict(metrics['metadata'])
            return summary
class ScreenshotMetrics(PerformanceMetrics):
    """Specialized metrics for screenshot operations.

    The ``track_*`` helpers derive extra figures from an operation's recorded
    ``duration_seconds`` and merge them into its ``metadata``. The duration is
    only set by ``end()``; before that it is treated as 0, so rate-style
    figures come out as 0.
    """

    def _update_metadata(self, operation_id, extra: dict):
        """Merge ``extra`` into the operation's metadata under the lock.

        Returns a snapshot of the resulting metric entry, or None if the
        operation hasn't been started.
        """
        with self._lock:
            entry = self.metrics.get(operation_id)
            if not entry:
                return None
            # Build a fresh dict instead of updating in place: the stored
            # dict may be shared with the caller of ``end()`` or with
            # snapshots handed out earlier, and those must not change.
            merged = {**(entry.get('metadata') or {}), **extra}
            entry['metadata'] = merged
            snapshot = dict(entry)
            # Copy the nested dict too so the caller cannot mutate our state.
            snapshot['metadata'] = dict(merged)
            return snapshot

    def track_screenshot_generation(
        self,
        operation_id,
        num_screenshots,
        total_height,
        viewport_size,
        file_sizes=None,
    ):
        """Track screenshot-specific metrics.

        Args:
            operation_id: Id of a tracked operation.
            num_screenshots: Number of screenshots produced.
            total_height: Stored as ``total_page_height``.
            viewport_size: Indexable pair; ``[0]`` is stored as the width and
                ``[1]`` as the height.
            file_sizes: Optional iterable of sizes in bytes; when non-empty,
                total and average size in KB are recorded.

        Returns:
            A snapshot of the updated entry, or ``None`` if the operation is
            not tracked.
        """
        # Hold the lock across read + merge so the duration we read and the
        # entry we update are the same one (the lock is re-entrant).
        with self._lock:
            entry = self.metrics.get(operation_id)
            if not entry:
                return None

            duration = entry.get('duration_seconds', 0) or 0
            if num_screenshots > 0:
                avg_time = round(duration / num_screenshots, 3)
            else:
                avg_time = 0

            metadata = {
                'screenshot_count': num_screenshots,
                'total_page_height': total_height,
                'viewport_width': viewport_size[0],
                'viewport_height': viewport_size[1],
                'avg_time_per_screenshot': avg_time,
            }

            # Materialise once so generators and array-likes work as well
            # as lists (truth-testing / len() on those would fail).
            sizes = list(file_sizes) if file_sizes is not None else []
            if sizes:
                total_size = sum(sizes)
                metadata['total_size_kb'] = round(total_size / 1024, 2)
                metadata['avg_size_kb'] = round(total_size / len(sizes) / 1024, 2)

            return self._update_metadata(operation_id, metadata)

    def track_ai_request(self, operation_id, input_length, output_length, cached=False):
        """Track AI request metrics.

        Records the input/output lengths, the ``cached`` flag, and
        ``tokens_per_second`` computed as ``output_length / duration``
        (0 when no positive duration has been recorded).

        Returns:
            A snapshot of the updated entry, or ``None`` if the operation is
            not tracked.
        """
        with self._lock:
            entry = self.metrics.get(operation_id)
            if not entry:
                return None

            duration = entry.get('duration_seconds', 0) or 0
            rate = round(output_length / duration, 2) if duration > 0 else 0

            metadata = {
                'input_length': input_length,
                'output_length': output_length,
                'cached': cached,
                'tokens_per_second': rate,
            }
            return self._update_metadata(operation_id, metadata)
| # Global metrics instance shared by importers of this module. Constructed with | |
| # no arguments, so it uses the default capacity (``MAX_TRACKED_OPS``). | |
| metrics_tracker = ScreenshotMetrics() | |