"""
telemetry/metrics.py
─────────────────────
Prometheus metrics for the Code Review Agent API.
Metrics tracked:
- code_agent_requests_total Counter: API requests by endpoint + status
- code_agent_latency_seconds Histogram: end-to-end latency per phase
- code_agent_token_cost_total Counter: OpenAI tokens consumed
- code_agent_resolved_total Counter: issues resolved vs failed
- code_agent_attempts_histogram Histogram: attempts per resolved issue
- code_agent_localisation_recall Gauge: rolling recall@k average (labelled k=5 and k=10)
- code_agent_cache_hits_total Counter: AST + embedding cache hits/misses
- code_agent_active_tasks Gauge: currently running tasks
- code_agent_failure_category_total Counter: failure categories breakdown
Usage:
from telemetry.metrics import METRICS
METRICS.record_request("solve", 200, elapsed=12.3)
METRICS.record_token_cost(prompt_tokens=800, completion_tokens=200)
METRICS.record_resolution(resolved=True, attempts=2)
"""
from __future__ import annotations
import logging
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Literal
logger = logging.getLogger(__name__)
# ── Prometheus (graceful no-op if not installed) ─────────────────────────────
# Optional dependency: when prometheus_client cannot be imported the module
# still loads, and the _make_* factories below hand out no-op metrics instead.
try:
    from prometheus_client import (
        Counter, Gauge, Histogram, Summary,
        CollectorRegistry, generate_latest, CONTENT_TYPE_LATEST,
    )
    _PROM_AVAILABLE = True
except ImportError:
    # Flag checked by the metric factories and by prometheus_output().
    _PROM_AVAILABLE = False
    logger.debug("prometheus_client not installed — metrics disabled")
class _NoOpMetric:
    """Fallback metric that silently ignores all calls.

    Stands in for a Counter, Gauge, or Histogram when prometheus_client is
    not installed, so call sites never need to branch on availability.
    """

    def labels(self, *args, **kwargs):
        """Accept label values positionally or by keyword; return ``self``.

        Returning the same object keeps chained calls such as
        ``metric.labels(...).inc()`` working. Positional values are accepted
        as well as keywords so that a call that is valid against a real
        prometheus_client metric does not raise ``TypeError`` here.
        """
        return self

    def inc(self, n=1):
        """Ignore an increment by ``n``."""

    def dec(self, n=1):
        """Ignore a decrement by ``n``."""

    def set(self, v):
        """Ignore an assignment of value ``v``."""

    def observe(self, v):
        """Ignore an observation of value ``v``."""
def _make_counter(name, doc, labels=()):
    """Build a Counter, or a no-op stand-in when Prometheus is unavailable.

    Args:
        name: Metric name.
        doc: Help text for the metric.
        labels: Label names, passed through to ``Counter``.
    """
    if not _PROM_AVAILABLE:
        return _NoOpMetric()
    return Counter(name, doc, labels)
def _make_histogram(name, doc, labels=(), buckets=None):
    """Build a Histogram, or a no-op stand-in when Prometheus is unavailable.

    Args:
        name: Metric name.
        doc: Help text for the metric.
        labels: Label names, passed as ``labelnames``.
        buckets: Bucket boundaries. A falsy value (``None`` or an empty
            sequence) is not forwarded, so ``Histogram`` uses its own default.
    """
    if not _PROM_AVAILABLE:
        return _NoOpMetric()

    options = {"labelnames": labels}
    if buckets:
        options["buckets"] = buckets
    return Histogram(name, doc, **options)
def _make_gauge(name, doc, labels=()):
    """Build a Gauge, or a no-op stand-in when Prometheus is unavailable.

    Args:
        name: Metric name.
        doc: Help text for the metric.
        labels: Label names, passed through to ``Gauge``.
    """
    if not _PROM_AVAILABLE:
        return _NoOpMetric()
    return Gauge(name, doc, labels)
# ── Metric definitions ───────────────────────────────────────────────────────
# Requests, labelled by endpoint and status class.
_requests_total = _make_counter(
    "code_agent_requests_total",
    "Total API requests",
    labels=["endpoint", "status"],
)

# Latency in seconds, labelled by pipeline phase.
_latency_seconds = _make_histogram(
    "code_agent_latency_seconds",
    "Request latency in seconds",
    labels=["phase"],
    buckets=[1, 5, 15, 30, 60, 120, 300],
)

# Token usage, labelled by token type.
_token_cost_total = _make_counter(
    "code_agent_token_cost_total",
    "Total OpenAI tokens consumed",
    labels=["token_type"],
)

# Issue outcomes, labelled by outcome.
_resolved_total = _make_counter(
    "code_agent_resolved_total",
    "Issues resolved vs failed",
    labels=["outcome"],
)

# Attempts per issue; no labels.
_attempts_histogram = _make_histogram(
    "code_agent_attempts_histogram",
    "Attempts per issue",
    labels=[],
    buckets=[1, 2, 3, 4, 5],
)

# Localisation recall, labelled by k.
_localisation_recall = _make_gauge(
    "code_agent_localisation_recall",
    "Rolling recall@5 average",
    labels=["k"],
)

# Cache lookups, labelled by cache type and result.
_cache_hits_total = _make_counter(
    "code_agent_cache_hits_total",
    "Cache hits and misses",
    labels=["cache_type", "result"],
)

# Number of tasks currently running; no labels.
_active_tasks = _make_gauge(
    "code_agent_active_tasks",
    "Currently running agent tasks",
    labels=[],
)

# Failures, labelled by category.
_failure_category_total = _make_counter(
    "code_agent_failure_category_total",
    "Failure categories",
    labels=["category"],
)
# ── High-level metrics interface ─────────────────────────────────────────────
class AgentMetrics:
    """
    Domain-level facade over the module's raw metric objects.

    Each ``record_*`` method turns an application event into the matching
    counter, gauge, or histogram update. ``time_phase`` provides a context
    manager for timing a pipeline phase.
    """

    def record_request(self, endpoint: str, status_code: int, elapsed: float) -> None:
        """Count one API request and observe its latency under phase "request".

        The status code is collapsed into a class label such as "2xx" or "5xx".
        """
        if 200 <= status_code < 300:
            status_class = "2xx"
        else:
            status_class = f"{status_code // 100}xx"
        _requests_total.labels(endpoint=endpoint, status=status_class).inc()
        _latency_seconds.labels(phase="request").observe(elapsed)

    def record_phase_latency(self, phase: str, elapsed: float) -> None:
        """Observe ``elapsed`` seconds against the given pipeline phase."""
        phase_histogram = _latency_seconds.labels(phase=phase)
        phase_histogram.observe(elapsed)

    def record_token_cost(self, prompt_tokens: int, completion_tokens: int) -> None:
        """Add prompt and completion token counts to the token counter."""
        usage = (
            ("prompt", prompt_tokens),
            ("completion", completion_tokens),
        )
        for token_type, count in usage:
            _token_cost_total.labels(token_type=token_type).inc(count)

    def record_resolution(self, resolved: bool, attempts: int) -> None:
        """Count an issue as resolved or failed and observe its attempt count."""
        if resolved:
            outcome = "resolved"
        else:
            outcome = "failed"
        _resolved_total.labels(outcome=outcome).inc()
        _attempts_histogram.observe(attempts)

    def record_localisation_recall(self, recall_at_5: float, recall_at_10: float) -> None:
        """Set the recall gauge for k=5 and k=10."""
        readings = (
            ("5", recall_at_5),
            ("10", recall_at_10),
        )
        for k, value in readings:
            _localisation_recall.labels(k=k).set(value)

    def record_cache_hit(self, cache_type: Literal["ast", "embedding", "repo"], hit: bool) -> None:
        """Count one cache lookup as a "hit" or a "miss" for ``cache_type``."""
        if hit:
            result = "hit"
        else:
            result = "miss"
        _cache_hits_total.labels(cache_type=cache_type, result=result).inc()

    def record_failure_category(self, category: str) -> None:
        """Increment the failure counter for ``category``."""
        category_counter = _failure_category_total.labels(category=category)
        category_counter.inc()

    def task_started(self) -> None:
        """Increment the active-task gauge."""
        _active_tasks.inc()

    def task_finished(self) -> None:
        """Decrement the active-task gauge."""
        _active_tasks.dec()

    @contextmanager
    def time_phase(self, phase: str):
        """Context manager: time a pipeline phase.

        The latency is recorded in a ``finally`` clause, so it is captured
        even when the wrapped code raises.
        """
        began = time.monotonic()
        try:
            yield
        finally:
            duration = time.monotonic() - began
            self.record_phase_latency(phase, duration)

    def prometheus_output(self) -> tuple[bytes, str]:
        """Return (metrics_bytes, content_type) for the /metrics endpoint.

        Without prometheus_client, a one-line placeholder comment is returned
        as ``text/plain``.
        """
        if not _PROM_AVAILABLE:
            return b"# prometheus_client not installed\n", "text/plain"

        from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
        payload = generate_latest()
        return payload, CONTENT_TYPE_LATEST
# Singleton: shared module-level AgentMetrics instance. Callers import METRICS
# and invoke its methods (see the usage example in the module docstring).
METRICS = AgentMetrics()
# ── Cost tracker ─────────────────────────────────────────────────────────────
@dataclass
class CostTracker:
    """
    Per-issue cost tracker.

    Accumulates prompt, completion, and embedding token counts and turns
    them into an approximate USD figure using the per-million-token rates
    stored on the instance.

    Pricing (May 2025 approximate):
        GPT-4o:            $5.00/M input, $15.00/M output
        text-embedding-3s: $0.02/M tokens
        DeepSeek-7B:       ~$0.14/M tokens (self-hosted on RunPod)

    The DeepSeek figure is listed for reference only; no field here uses it.
    """

    # Running token totals.
    _prompt_tokens: int = 0
    _completion_tokens: int = 0
    _embedding_tokens: int = 0

    # USD per 1M tokens
    PROMPT_COST_PER_M: float = 5.00
    COMPLETION_COST_PER_M: float = 15.00
    EMBEDDING_COST_PER_M: float = 0.02

    def add_llm_tokens(self, prompt: int, completion: int) -> None:
        """Add the prompt and completion token counts of one LLM call."""
        self._prompt_tokens += prompt
        self._completion_tokens += completion

    def add_embedding_tokens(self, n: int) -> None:
        """Add ``n`` embedding tokens to the running total."""
        self._embedding_tokens += n

    @property
    def total_tokens(self) -> int:
        """Sum of prompt, completion, and embedding tokens."""
        llm_tokens = self._prompt_tokens + self._completion_tokens
        return llm_tokens + self._embedding_tokens

    @property
    def estimated_usd(self) -> float:
        """Estimated cost in USD, rounded to six decimal places."""
        per_million = 1e6
        usd = (
            self._prompt_tokens / per_million * self.PROMPT_COST_PER_M
            + self._completion_tokens / per_million * self.COMPLETION_COST_PER_M
            + self._embedding_tokens / per_million * self.EMBEDDING_COST_PER_M
        )
        return round(usd, 6)

    def to_dict(self) -> dict:
        """Return the token counts and estimated cost as a plain dict."""
        return dict(
            prompt_tokens=self._prompt_tokens,
            completion_tokens=self._completion_tokens,
            embedding_tokens=self._embedding_tokens,
            total_tokens=self.total_tokens,
            estimated_usd=self.estimated_usd,
        )
|