Spaces:
Sleeping
Sleeping
File size: 4,552 Bytes
5837391 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | import math
from typing import Iterable
from database import get_redis
# Metric names this module accepts; other names are ignored by the functions below.
LATENCY_METRICS = ("stt_ms", "submit_ms", "gemini_ms")
# Prefix of the per-metric Redis keys, joined to the metric name with ":".
_LATENCY_PREFIX = "metrics:latency"
# Sample size used by get_latency_metrics when the caller does not pass one.
_DEFAULT_SAMPLE_SIZE = 500
# Upper bound applied to any requested sample size.
_MAX_SAMPLE_SIZE = 5000
# Default cap on how many samples record_latency keeps per metric list.
_MAX_STORED_ITEMS = 5000
# Default key expiry used by record_latency: seven days, expressed in seconds.
_METRICS_TTL_SECONDS = 7 * 24 * 60 * 60
def _metric_key(metric_name: str) -> str:
    """Return the Redis key that holds the samples for *metric_name*."""
    return "{}:{}".format(_LATENCY_PREFIX, metric_name)
def _normalize_metric_names(metric_names: Iterable[str] | None) -> list[str]:
    """Return the known metric names selected by *metric_names*.

    Each entry is stripped and lower-cased; entries that are not in
    ``LATENCY_METRICS`` are dropped and duplicates are removed while keeping
    first-seen order.

    Args:
        metric_names: Names to select. ``None`` or any other falsy value
            (for example an empty list) selects every known metric. A single
            bare string is treated as one name.

    Returns:
        A new list of valid metric names; it may be empty when names were
        supplied but none of them is known.
    """
    if not metric_names:
        return list(LATENCY_METRICS)
    if isinstance(metric_names, str):
        # A str is itself an iterable of one-character strings, so iterating
        # it would compare single letters against the metric names and
        # silently select nothing. Treat it as a single requested name.
        metric_names = (metric_names,)
    # A dict keeps insertion order, giving an ordered de-duplication.
    selected: dict[str, None] = {}
    for raw in metric_names:
        candidate = (raw or "").strip().lower()
        if candidate in LATENCY_METRICS:
            selected.setdefault(candidate, None)
    return list(selected)
def _normalize_sample_size(sample_size: int) -> int:
    """Coerce *sample_size* to an int between 1 and ``_MAX_SAMPLE_SIZE``.

    A value that ``int()`` cannot convert is replaced by
    ``_DEFAULT_SAMPLE_SIZE`` before the bounds are applied.
    """
    try:
        requested = int(sample_size)
    except Exception:
        requested = _DEFAULT_SAMPLE_SIZE
    if requested < 1:
        return 1
    if requested > _MAX_SAMPLE_SIZE:
        return _MAX_SAMPLE_SIZE
    return requested
def _safe_float(value) -> float | None:
try:
parsed = float(value)
except Exception:
return None
if math.isnan(parsed) or math.isinf(parsed) or parsed < 0:
return None
return parsed
def _percentile(sorted_values: list[float], percentile: float) -> float | None:
if not sorted_values:
return None
if len(sorted_values) == 1:
return sorted_values[0]
position = ((len(sorted_values) - 1) * percentile) / 100.0
lower = int(math.floor(position))
upper = int(math.ceil(position))
if lower == upper:
return sorted_values[lower]
weight = position - lower
return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * weight
def _round(value: float | None) -> float | None:
if value is None:
return None
return round(value, 2)
async def record_latency(
    metric_name: str,
    duration_ms: float,
    *,
    ttl_seconds: int = _METRICS_TTL_SECONDS,
    max_items: int = _MAX_STORED_ITEMS,
) -> None:
    """Store one latency sample for *metric_name* in Redis.

    The sample is pushed onto the head of the metric's list formatted with
    three decimal places, the list is trimmed to at most *max_items* entries
    (at least one entry is always kept), and the key's expiry is set to
    *ttl_seconds*.

    The call returns without doing anything when the stripped, lower-cased
    name is not in ``LATENCY_METRICS``, when *duration_ms* is not a finite
    non-negative number, or when ``get_redis()`` returns a falsy value.
    """
    metric = (metric_name or "").strip().lower()
    if metric not in LATENCY_METRICS:
        return

    sample = _safe_float(duration_ms)
    if sample is None:
        return

    client = get_redis()
    if not client:
        return

    list_key = _metric_key(metric)
    await client.lpush(list_key, format(sample, ".3f"))
    # Newest entries sit at the head, so keeping indices 0..N-1 drops the oldest.
    keep_through = max(0, int(max_items) - 1)
    await client.ltrim(list_key, 0, keep_through)
    await client.expire(list_key, int(ttl_seconds))
async def get_latency_metrics(
    *,
    metric_names: Iterable[str] | None = None,
    sample_size: int = _DEFAULT_SAMPLE_SIZE,
) -> dict:
    """Summarise the most recent latency samples for the selected metrics.

    Args:
        metric_names: Metrics to report, normalised by
            ``_normalize_metric_names``.
        sample_size: Maximum number of recent samples to read per metric,
            normalised by ``_normalize_sample_size``.

    Returns:
        A dict with the effective ``"sample_size"`` and a ``"metrics"``
        mapping of metric name to summary. When ``get_redis()`` returns a
        falsy value every summary is empty and a ``"message"`` entry is added.
    """
    selected = _normalize_metric_names(metric_names)
    limit = _normalize_sample_size(sample_size)

    client = get_redis()
    if not client:
        return {
            "sample_size": limit,
            "metrics": {name: _empty_summary() for name in selected},
            "message": "Redis is not available",
        }

    summaries: dict[str, dict] = {}
    for name in selected:
        entries = await client.lrange(_metric_key(name), 0, limit - 1)
        parsed = (_safe_float(entry) for entry in entries)
        # Redis returns newest-first; flip to oldest-first so the final
        # element is the latest sample, which the summary reports as last_ms.
        chronological = [sample for sample in parsed if sample is not None][::-1]
        summaries[name] = _build_summary(chronological)

    return {"sample_size": limit, "metrics": summaries}
async def reset_latency_metrics(metric_names: Iterable[str] | None = None) -> dict:
    """Delete the stored samples for the selected metrics.

    Args:
        metric_names: Metrics to clear, normalised by
            ``_normalize_metric_names``.

    Returns:
        ``{"cleared": [...]}`` listing the normalised metric names whose keys
        were passed to Redis for deletion. When ``get_redis()`` returns a
        falsy value the list is empty and a ``"message"`` entry is added.
    """
    selected = _normalize_metric_names(metric_names)

    client = get_redis()
    if not client:
        return {"cleared": [], "message": "Redis is not available"}

    if selected:
        await client.delete(*map(_metric_key, selected))
    return {"cleared": selected}
def _empty_summary() -> dict:
return {
"count": 0,
"min_ms": None,
"avg_ms": None,
"p50_ms": None,
"p95_ms": None,
"max_ms": None,
"last_ms": None,
}
def _build_summary(values: list[float]) -> dict:
    """Build the summary dict for a list of latency samples.

    Args:
        values: Samples in the order the caller wants ``last_ms`` taken from;
            the final element is reported as ``last_ms``.

    Returns:
        The empty summary when *values* is empty. Otherwise a dict with
        ``count`` plus min, average, 50th and 95th percentile, max and last
        value, each rounded to two decimal places.
    """
    if not values:
        return _empty_summary()

    ordered = sorted(values)
    total = len(ordered)
    stats = {
        "min_ms": ordered[0],
        "avg_ms": sum(ordered) / total,
        "p50_ms": _percentile(ordered, 50),
        "p95_ms": _percentile(ordered, 95),
        "max_ms": ordered[-1],
        # Taken from the unsorted input so it reflects arrival order.
        "last_ms": values[-1],
    }
    return {"count": total, **{field: _round(stat) for field, stat in stats.items()}}
|