Spaces:
Running
Running
File size: 7,495 Bytes
5f3e9f5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | """Performance metrics tracking for screenshot generation.
Thread-safe by way of a single ``threading.RLock``. Concurrent SSE workers
that finish at the same time would otherwise race on the underlying dicts
and silently drop entries. The recursive lock allows ``end()`` (which may
call ``get_metrics()`` indirectly) to nest without deadlocking.
Memory is bounded with a simple ordered eviction (``MAX_TRACKED_OPS``).
Without this every run leaves ~5 entries in memory forever.
"""
import threading
import time
from collections import OrderedDict
from datetime import datetime
# How many operations we keep before evicting the oldest. ~500 = a comfortable
# afternoon's worth of runs and only ~few hundred KB of RAM.
MAX_TRACKED_OPS = 500
class PerformanceMetrics:
"""Track and report performance metrics."""
def __init__(self, max_tracked: int = MAX_TRACKED_OPS):
self._lock = threading.RLock()
self._max_tracked = max_tracked
# OrderedDict preserves insertion order so ``popitem(last=False)``
# evicts the oldest tracked operation.
self.metrics: "OrderedDict[str, dict]" = OrderedDict()
self.start_times: "OrderedDict[str, float]" = OrderedDict()
def _evict_if_needed_locked(self) -> None:
"""Evict oldest entries while holding the lock."""
while len(self.metrics) > self._max_tracked:
oldest, _ = self.metrics.popitem(last=False)
self.start_times.pop(oldest, None)
def start(self, operation_id):
"""Start timing an operation."""
with self._lock:
self.start_times[operation_id] = time.time()
self.metrics[operation_id] = {
'start_time': datetime.now().isoformat(),
'status': 'running',
}
self._evict_if_needed_locked()
def end(self, operation_id, success=True, metadata=None):
"""End timing an operation."""
with self._lock:
if operation_id not in self.start_times:
return None
end_time = time.time()
duration = end_time - self.start_times[operation_id]
self.metrics[operation_id].update({
'end_time': datetime.now().isoformat(),
'duration_seconds': round(duration, 3),
'duration_ms': round(duration * 1000, 2),
'status': 'success' if success else 'failed',
'metadata': metadata or {},
})
return dict(self.metrics[operation_id])
def get_metrics(self, operation_id):
"""Get metrics for a specific operation."""
with self._lock:
entry = self.metrics.get(operation_id)
return dict(entry) if entry else None
def get_all_metrics(self):
"""Get all tracked metrics (snapshot copy)."""
with self._lock:
return {k: dict(v) for k, v in self.metrics.items()}
def clear(self):
"""Clear all metrics."""
with self._lock:
self.metrics.clear()
self.start_times.clear()
def format_duration(self, seconds):
"""Format duration in human-readable format."""
if seconds < 1:
return f"{seconds * 1000:.0f}ms"
elif seconds < 60:
return f"{seconds:.2f}s"
else:
minutes = int(seconds // 60)
secs = seconds % 60
return f"{minutes}m {secs:.0f}s"
def get_summary(self, operation_id):
"""Get formatted summary of operation metrics.
Works while an operation is still running — in that case we report
elapsed time instead of crashing on the missing duration_seconds
key (which is only set by end()).
"""
with self._lock:
metrics = self.metrics.get(operation_id)
if not metrics:
return None
if 'duration_seconds' not in metrics:
start = self.start_times.get(operation_id, time.time())
elapsed = time.time() - start
return {
'operation_id': operation_id,
'duration': self.format_duration(elapsed),
'duration_seconds': round(elapsed, 3),
'duration_ms': round(elapsed * 1000, 2),
'status': metrics.get('status', 'running'),
'start_time': metrics.get('start_time'),
'end_time': 'N/A',
'metadata': dict(metrics.get('metadata', {})),
}
summary = {
'operation_id': operation_id,
'duration': self.format_duration(metrics['duration_seconds']),
'duration_seconds': metrics['duration_seconds'],
'duration_ms': metrics['duration_ms'],
'status': metrics['status'],
'start_time': metrics['start_time'],
'end_time': metrics.get('end_time', 'N/A'),
}
if 'metadata' in metrics:
summary['metadata'] = dict(metrics['metadata'])
return summary
class ScreenshotMetrics(PerformanceMetrics):
"""Specialized metrics for screenshot operations."""
def _update_metadata(self, operation_id, extra: dict):
"""Merge ``extra`` into the operation's metadata under the lock.
Returns a snapshot of the resulting metric entry, or None if the
operation hasn't been started.
"""
with self._lock:
entry = self.metrics.get(operation_id)
if not entry:
return None
entry.setdefault('metadata', {}).update(extra)
return dict(entry)
def track_screenshot_generation(
self,
operation_id,
num_screenshots,
total_height,
viewport_size,
file_sizes=None,
):
"""Track screenshot-specific metrics."""
with self._lock:
entry = self.metrics.get(operation_id)
if not entry:
return None
duration = entry.get('duration_seconds', 0) or 0
metadata = {
'screenshot_count': num_screenshots,
'total_page_height': total_height,
'viewport_width': viewport_size[0],
'viewport_height': viewport_size[1],
'avg_time_per_screenshot': (
round(duration / num_screenshots, 3) if num_screenshots > 0 else 0
),
}
if file_sizes:
total_size = sum(file_sizes)
metadata['total_size_kb'] = round(total_size / 1024, 2)
metadata['avg_size_kb'] = round(total_size / len(file_sizes) / 1024, 2)
return self._update_metadata(operation_id, metadata)
def track_ai_request(self, operation_id, input_length, output_length, cached=False):
"""Track AI request metrics."""
with self._lock:
entry = self.metrics.get(operation_id)
if not entry:
return None
duration = entry.get('duration_seconds', 0) or 0
metadata = {
'input_length': input_length,
'output_length': output_length,
'cached': cached,
'tokens_per_second': (
round(output_length / duration, 2) if duration > 0 else 0
),
}
return self._update_metadata(operation_id, metadata)
# Global metrics instance
metrics_tracker = ScreenshotMetrics()
|