# interviewbot / backend / services / latency_service.py
# sajith-0701's picture
# v3.1
# 5837391
import math
from typing import Iterable
from database import get_redis
# Metric names this module accepts; any other name is ignored by the
# normalisation and recording functions below.
LATENCY_METRICS = (
    "stt_ms",
    "submit_ms",
    "gemini_ms",
)

# Common prefix of the per-metric keys built by _metric_key().
_LATENCY_PREFIX = "metrics:latency"

# Sample count used when the caller gives none or gives an unparsable value.
_DEFAULT_SAMPLE_SIZE = 500
# Upper bound applied to any requested sample count.
_MAX_SAMPLE_SIZE = 5_000

# Default cap on how many samples are kept per metric when recording.
_MAX_STORED_ITEMS = 5_000
# Default expiry applied to a metric key on every write: seven days.
_METRICS_TTL_SECONDS = 60 * 60 * 24 * 7
def _metric_key(metric_name: str) -> str:
    """Return the storage key for ``metric_name``: ``<prefix>:<metric_name>``."""
    return "{}:{}".format(_LATENCY_PREFIX, metric_name)
def _normalize_metric_names(metric_names: Iterable[str] | None) -> list[str]:
if not metric_names:
return list(LATENCY_METRICS)
normalized: list[str] = []
for metric in metric_names:
name = (metric or "").strip().lower()
if name in LATENCY_METRICS and name not in normalized:
normalized.append(name)
return normalized
def _normalize_sample_size(sample_size: int) -> int:
    """Coerce ``sample_size`` to an int within ``1 .. _MAX_SAMPLE_SIZE``.

    A value that ``int()`` cannot convert falls back to
    ``_DEFAULT_SAMPLE_SIZE`` before the bounds are applied.
    """
    try:
        requested = int(sample_size)
    except Exception:
        requested = _DEFAULT_SAMPLE_SIZE

    if requested < 1:
        return 1
    if requested > _MAX_SAMPLE_SIZE:
        return _MAX_SAMPLE_SIZE
    return requested
def _safe_float(value) -> float | None:
try:
parsed = float(value)
except Exception:
return None
if math.isnan(parsed) or math.isinf(parsed) or parsed < 0:
return None
return parsed
def _percentile(sorted_values: list[float], percentile: float) -> float | None:
if not sorted_values:
return None
if len(sorted_values) == 1:
return sorted_values[0]
position = ((len(sorted_values) - 1) * percentile) / 100.0
lower = int(math.floor(position))
upper = int(math.ceil(position))
if lower == upper:
return sorted_values[lower]
weight = position - lower
return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * weight
def _round(value: float | None) -> float | None:
if value is None:
return None
return round(value, 2)
async def record_latency(
    metric_name: str,
    duration_ms: float,
    *,
    ttl_seconds: int = _METRICS_TTL_SECONDS,
    max_items: int = _MAX_STORED_ITEMS,
) -> None:
    """Push one latency sample onto the list stored for ``metric_name``.

    The call is a silent no-op when the name (after strip and lower-casing)
    is not in ``LATENCY_METRICS``, when ``duration_ms`` is rejected by
    ``_safe_float``, or when ``get_redis()`` returns a falsy value.

    Args:
        metric_name: Metric to record against.
        duration_ms: Sample value; stored formatted with three decimals.
        ttl_seconds: Expiry set on the key after every write.
        max_items: Number of newest entries kept after trimming; at least
            one entry is always kept.
    """
    metric = (metric_name or "").strip().lower()
    if metric not in LATENCY_METRICS:
        return

    sample = _safe_float(duration_ms)
    if sample is None:
        return

    client = get_redis()
    if not client:
        return

    list_key = _metric_key(metric)

    # Newest sample goes to the head of the list.
    await client.lpush(list_key, format(sample, ".3f"))

    # Keep only the head of the list: indices 0 .. max_items - 1.
    last_kept_index = max(0, int(max_items) - 1)
    await client.ltrim(list_key, 0, last_kept_index)

    # Refresh the expiry on every write.
    await client.expire(list_key, int(ttl_seconds))
async def get_latency_metrics(
    *,
    metric_names: Iterable[str] | None = None,
    sample_size: int = _DEFAULT_SAMPLE_SIZE,
) -> dict:
    """Summarise the most recent samples of the requested latency metrics.

    Args:
        metric_names: Metrics to report, normalised by
            ``_normalize_metric_names``.
        sample_size: Maximum number of entries read per metric, normalised
            by ``_normalize_sample_size``.

    Returns:
        A dict with ``"sample_size"`` (the normalised size) and
        ``"metrics"`` (metric name -> summary dict). When ``get_redis()``
        returns a falsy value every summary is empty and a ``"message"``
        entry is added.
    """
    selected = _normalize_metric_names(metric_names)
    limit = _normalize_sample_size(sample_size)

    client = get_redis()
    if not client:
        placeholders = {name: _empty_summary() for name in selected}
        return {
            "sample_size": limit,
            "metrics": placeholders,
            "message": "Redis is not available",
        }

    summaries: dict[str, dict] = {}
    for name in selected:
        entries = await client.lrange(_metric_key(name), 0, limit - 1)

        # Entries that do not parse to a valid sample are dropped.
        parsed = (_safe_float(entry) for entry in entries)
        samples = [sample for sample in parsed if sample is not None]

        # Stored newest-first in Redis; reverse to chronological for last_ms.
        samples.reverse()
        summaries[name] = _build_summary(samples)

    return {"sample_size": limit, "metrics": summaries}
async def reset_latency_metrics(metric_names: Iterable[str] | None = None) -> dict:
    """Delete the stored samples of the requested latency metrics.

    Args:
        metric_names: Metrics to clear, normalised by
            ``_normalize_metric_names``.

    Returns:
        ``{"cleared": [...]}`` listing the normalised metric names whose
        keys were passed to ``delete``; whether each key existed is not
        checked. When ``get_redis()`` returns a falsy value the list is
        empty and a ``"message"`` entry is added.
    """
    selected = _normalize_metric_names(metric_names)

    client = get_redis()
    if not client:
        return {"cleared": [], "message": "Redis is not available"}

    if selected:
        # One delete call covering every selected key.
        await client.delete(*(_metric_key(name) for name in selected))

    return {"cleared": selected}
def _empty_summary() -> dict:
    """Return a fresh summary for a metric with no samples.

    ``"count"`` is 0 and every statistic field is ``None``.
    """
    stat_fields = ("min_ms", "avg_ms", "p50_ms", "p95_ms", "max_ms", "last_ms")
    summary: dict = {"count": 0}
    summary.update(dict.fromkeys(stat_fields))
    return summary
def _build_summary(values: list[float]) -> dict:
    """Compute summary statistics for ``values``.

    Args:
        values: Samples in the order the caller wants ``last_ms`` taken
            from; the final element is reported as ``"last_ms"``.

    Returns:
        The empty summary when ``values`` is empty. Otherwise a dict with
        ``count`` plus min, average, 50th and 95th percentile, max and last
        value, each rounded by ``_round``.
    """
    if not values:
        return _empty_summary()

    ordered = sorted(values)
    total = len(ordered)

    # (field, raw statistic) pairs in output order; rounding is applied
    # uniformly when the dict is filled.
    raw_stats = (
        ("min_ms", ordered[0]),
        ("avg_ms", sum(ordered) / total),
        ("p50_ms", _percentile(ordered, 50)),
        ("p95_ms", _percentile(ordered, 95)),
        ("max_ms", ordered[-1]),
        # Taken from the unsorted input so it reflects the caller's ordering.
        ("last_ms", values[-1]),
    )

    summary: dict = {"count": total}
    summary.update((field, _round(stat)) for field, stat in raw_stats)
    return summary