Spaces:
Running
Running
"""
telemetry/metrics.py
─────────────────────
Prometheus metrics for the Code Review Agent API.

Metrics tracked:
  - code_agent_requests_total          Counter:   API requests by endpoint + status
  - code_agent_latency_seconds         Histogram: end-to-end latency per phase
  - code_agent_token_cost_total        Counter:   OpenAI tokens consumed
  - code_agent_resolved_total          Counter:   issues resolved vs failed
  - code_agent_attempts_histogram      Histogram: attempts per resolved issue
  - code_agent_localisation_recall     Gauge:     rolling recall@5 average
  - code_agent_cache_hits_total        Counter:   AST + embedding cache hits/misses
  - code_agent_active_tasks            Gauge:     currently running tasks
  - code_agent_failure_category_total  Counter:   failure categories breakdown

Usage:
    from telemetry.metrics import METRICS

    METRICS.record_request("solve", 200, elapsed=12.3)
    METRICS.record_token_cost(prompt_tokens=800, completion_tokens=200)
    METRICS.record_resolution(resolved=True, attempts=2)
"""
| from __future__ import annotations | |
| import logging | |
| import time | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass, field | |
| from typing import Literal | |
logger = logging.getLogger(__name__)

# ── Prometheus (graceful no-op if not installed) ──────────────────────────────
# prometheus_client is an optional dependency. When the import fails the module
# still loads: _PROM_AVAILABLE is set to False and a debug message is logged.
try:
    from prometheus_client import (
        Counter, Gauge, Histogram, Summary,
        CollectorRegistry, generate_latest, CONTENT_TYPE_LATEST,
    )
    _PROM_AVAILABLE = True
except ImportError:
    _PROM_AVAILABLE = False
    logger.debug("prometheus_client not installed — metrics disabled")
class _NoOpMetric:
    """Fallback metric that silently ignores all calls.

    Offers the same method names the rest of this module calls on real
    metrics (``labels``, ``inc``, ``dec``, ``set``, ``observe``) so calling
    code does not need to check whether prometheus_client is installed.
    """

    def labels(self, *args, **kwargs):
        """Accept label values positionally or by keyword and return ``self``.

        Returning ``self`` lets chained calls such as ``m.labels(...).inc()``
        work unchanged on the fallback.
        """
        return self

    def inc(self, n=1):
        """Ignore an increment of ``n``."""

    def dec(self, n=1):
        """Ignore a decrement of ``n``."""

    def set(self, v):
        """Ignore a request to set the value to ``v``."""

    def observe(self, v):
        """Ignore an observation of ``v``."""
def _make_counter(name, doc, labels=()):
    """Return a Prometheus ``Counter``, or a ``_NoOpMetric`` without the client."""
    if not _PROM_AVAILABLE:
        return _NoOpMetric()
    return Counter(name, doc, labels)
def _make_histogram(name, doc, labels=(), buckets=None):
    """Return a Prometheus ``Histogram``, or a ``_NoOpMetric`` without the client.

    ``buckets`` is forwarded only when it is truthy; otherwise the keyword is
    omitted from the ``Histogram`` call altogether.
    """
    if not _PROM_AVAILABLE:
        return _NoOpMetric()
    options = dict(labelnames=labels)
    if buckets:
        options["buckets"] = buckets
    return Histogram(name, doc, **options)
def _make_gauge(name, doc, labels=()):
    """Return a Prometheus ``Gauge``, or a ``_NoOpMetric`` without the client."""
    if not _PROM_AVAILABLE:
        return _NoOpMetric()
    return Gauge(name, doc, labels)
# ── Metric definitions ────────────────────────────────────────────────────────
# Module-level metric objects, created once at import time through the
# factories above. Each call passes the metric name, its help text and the
# label names that AgentMetrics fills in when recording.
_requests_total = _make_counter(
    "code_agent_requests_total",
    "Total API requests",
    labels=["endpoint", "status"],
)

_latency_seconds = _make_histogram(
    "code_agent_latency_seconds",
    "Request latency in seconds",
    labels=["phase"],
    buckets=[1, 5, 15, 30, 60, 120, 300],
)

_token_cost_total = _make_counter(
    "code_agent_token_cost_total",
    "Total OpenAI tokens consumed",
    labels=["token_type"],
)

_resolved_total = _make_counter(
    "code_agent_resolved_total",
    "Issues resolved vs failed",
    labels=["outcome"],
)

_attempts_histogram = _make_histogram(
    "code_agent_attempts_histogram",
    "Attempts per issue",
    labels=[],
    buckets=[1, 2, 3, 4, 5],
)

_localisation_recall = _make_gauge(
    "code_agent_localisation_recall",
    "Rolling recall@5 average",
    labels=["k"],
)

_cache_hits_total = _make_counter(
    "code_agent_cache_hits_total",
    "Cache hits and misses",
    labels=["cache_type", "result"],
)

_active_tasks = _make_gauge(
    "code_agent_active_tasks",
    "Currently running agent tasks",
    labels=[],
)

_failure_category_total = _make_counter(
    "code_agent_failure_category_total",
    "Failure categories",
    labels=["category"],
)
# ── High-level metrics interface ──────────────────────────────────────────────
class AgentMetrics:
    """
    High-level metrics interface — wraps the raw module-level metrics with
    domain-friendly methods. ``time_phase`` provides a context manager for
    timing a pipeline phase.
    """

    def record_request(self, endpoint: str, status_code: int, elapsed: float) -> None:
        """Count one API request and observe its latency under phase "request".

        The ``status`` label is the status class ("2xx", "4xx", "5xx", ...),
        so it takes one value per hundred-block rather than one per code.
        """
        status = f"{status_code // 100}xx"
        _requests_total.labels(endpoint=endpoint, status=status).inc()
        _latency_seconds.labels(phase="request").observe(elapsed)

    def record_phase_latency(self, phase: str, elapsed: float) -> None:
        """Record latency for a specific pipeline phase."""
        _latency_seconds.labels(phase=phase).observe(elapsed)

    def record_token_cost(self, prompt_tokens: int, completion_tokens: int) -> None:
        """Add prompt and completion token counts to the token counter."""
        _token_cost_total.labels(token_type="prompt").inc(prompt_tokens)
        _token_cost_total.labels(token_type="completion").inc(completion_tokens)

    def record_resolution(self, resolved: bool, attempts: int) -> None:
        """Count one issue as "resolved" or "failed" and observe its attempt count."""
        outcome = "resolved" if resolved else "failed"
        _resolved_total.labels(outcome=outcome).inc()
        _attempts_histogram.observe(attempts)

    def record_localisation_recall(self, recall_at_5: float, recall_at_10: float) -> None:
        """Set the recall gauge for k="5" and k="10"."""
        _localisation_recall.labels(k="5").set(recall_at_5)
        _localisation_recall.labels(k="10").set(recall_at_10)

    def record_cache_hit(self, cache_type: Literal["ast", "embedding", "repo"], hit: bool) -> None:
        """Count one cache lookup for ``cache_type`` as a "hit" or a "miss"."""
        result = "hit" if hit else "miss"
        _cache_hits_total.labels(cache_type=cache_type, result=result).inc()

    def record_failure_category(self, category: str) -> None:
        """Count one failure under the given category label."""
        _failure_category_total.labels(category=category).inc()

    def task_started(self) -> None:
        """Increment the active-tasks gauge."""
        _active_tasks.inc()

    def task_finished(self) -> None:
        """Decrement the active-tasks gauge."""
        _active_tasks.dec()

    @contextmanager
    def time_phase(self, phase: str):
        """Context manager: time a pipeline phase.

        Usage: ``with METRICS.time_phase("localise"): ...``. The elapsed time
        is recorded through ``record_phase_latency`` in a ``finally`` clause,
        so it is reported even when the body raises.
        """
        # monotonic() is unaffected by system clock adjustments.
        start = time.monotonic()
        try:
            yield
        finally:
            self.record_phase_latency(phase, time.monotonic() - start)

    def prometheus_output(self) -> tuple[bytes, str]:
        """Return (metrics_bytes, content_type) for the /metrics endpoint.

        Without prometheus_client a one-line plain-text placeholder is
        returned instead.
        """
        if _PROM_AVAILABLE:
            from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
            return generate_latest(), CONTENT_TYPE_LATEST
        return b"# prometheus_client not installed\n", "text/plain"
# Shared module-level instance; import this rather than building new ones.
METRICS: AgentMetrics = AgentMetrics()
# ── Cost tracker ──────────────────────────────────────────────────────────────
class CostTracker:
    """
    Per-issue cost tracker.

    Accumulates prompt, completion and embedding token counts and estimates
    the USD cost from the per-million-token rates defined on the class.

    Pricing (May 2025 approximate):
        GPT-4o:             $5.00/M input, $15.00/M output
        text-embedding-3s:  $0.02/M tokens
        DeepSeek-7B:        ~$0.14/M tokens (self-hosted on RunPod)
    """

    # Running token counts. These are class-level defaults of zero; the
    # ``+=`` in the add_* methods rebinds them on the instance, so each
    # tracker keeps its own totals.
    _prompt_tokens: int = 0
    _completion_tokens: int = 0
    _embedding_tokens: int = 0

    # USD per 1M tokens
    PROMPT_COST_PER_M: float = 5.00
    COMPLETION_COST_PER_M: float = 15.00
    EMBEDDING_COST_PER_M: float = 0.02

    def add_llm_tokens(self, prompt: int, completion: int) -> None:
        """Add LLM prompt and completion token counts to the running totals."""
        self._prompt_tokens += prompt
        self._completion_tokens += completion

    def add_embedding_tokens(self, n: int) -> None:
        """Add ``n`` embedding tokens to the running total."""
        self._embedding_tokens += n

    def total_tokens(self) -> int:
        """Return the sum of prompt, completion and embedding tokens."""
        return self._prompt_tokens + self._completion_tokens + self._embedding_tokens

    def estimated_usd(self) -> float:
        """Return the estimated cost in USD, rounded to 6 decimal places."""
        prompt_cost = self._prompt_tokens / 1e6 * self.PROMPT_COST_PER_M
        comp_cost = self._completion_tokens / 1e6 * self.COMPLETION_COST_PER_M
        embed_cost = self._embedding_tokens / 1e6 * self.EMBEDDING_COST_PER_M
        return round(prompt_cost + comp_cost + embed_cost, 6)

    def to_dict(self) -> dict:
        """Return the token counts and the cost estimate as a plain dict.

        ``total_tokens`` and ``estimated_usd`` are called here so the dict
        holds numbers rather than bound-method objects.
        """
        return {
            "prompt_tokens": self._prompt_tokens,
            "completion_tokens": self._completion_tokens,
            "embedding_tokens": self._embedding_tokens,
            "total_tokens": self.total_tokens(),
            "estimated_usd": self.estimated_usd(),
        }