""" tests/test_phase8_9_telemetry_benchmark.py ─────────────────────────────────────────── Tests for Phase 8 (Telemetry) and Phase 9 (Benchmarking). All tests run without external services (Prometheus, Redis, SWE-bench). Run with: pytest tests/test_phase8_9_telemetry_benchmark.py -v """ from __future__ import annotations import json import time from pathlib import Path from unittest.mock import MagicMock, patch import pytest # ══════════════════════════════════════════════════════════════════════ # Phase 8 — Telemetry # ══════════════════════════════════════════════════════════════════════ class TestCostTracker: def test_initial_state(self): from telemetry.metrics import CostTracker ct = CostTracker() assert ct.total_tokens == 0 assert ct.estimated_usd == 0.0 def test_add_llm_tokens(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_llm_tokens(prompt=800, completion=200) assert ct.total_tokens == 1000 def test_add_embedding_tokens(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_embedding_tokens(5000) assert ct.total_tokens == 5000 def test_cost_estimation_positive(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_llm_tokens(1_000_000, 500_000) # 1M prompt + 500K completion usd = ct.estimated_usd # 1M prompt @ $5 = $5 + 500K completion @ $15 = $7.50 → total $12.50 assert 10.0 < usd < 15.0 def test_embedding_cost_is_cheap(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_embedding_tokens(1_000_000) # 1M embedding tokens # $0.02/M → $0.02 assert ct.estimated_usd < 0.1 def test_to_dict_has_expected_keys(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_llm_tokens(100, 50) d = ct.to_dict() assert "prompt_tokens" in d assert "completion_tokens" in d assert "total_tokens" in d assert "estimated_usd" in d def test_total_tokens_sum(self): from telemetry.metrics import CostTracker ct = CostTracker() ct.add_llm_tokens(500, 300) ct.add_embedding_tokens(200) assert ct.total_tokens == 1000 class TestAgentMetrics: def test_time_phase_context_manager(self): from telemetry.metrics import AgentMetrics m = AgentMetrics() with m.time_phase("localisation"): time.sleep(0.01) # Should not raise def test_record_resolution_no_error(self): from telemetry.metrics import AgentMetrics m = AgentMetrics() m.record_resolution(resolved=True, attempts=2) m.record_resolution(resolved=False, attempts=3) def test_record_cache_hit_no_error(self): from telemetry.metrics import AgentMetrics m = AgentMetrics() m.record_cache_hit("ast", hit=True) m.record_cache_hit("embedding", hit=False) def test_prometheus_output_returns_bytes(self): from telemetry.metrics import AgentMetrics m = AgentMetrics() content, content_type = m.prometheus_output() assert isinstance(content, bytes) assert isinstance(content_type, str) def test_task_started_finished(self): from telemetry.metrics import AgentMetrics m = AgentMetrics() m.task_started() m.task_finished() # Should not raise class TestSlidingWindowRateLimiter: def test_allows_within_limit(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=5, window_seconds=60) for _ in range(5): assert lim.is_allowed("user_1") def test_blocks_over_limit(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=3, window_seconds=60) for _ in range(3): lim.is_allowed("user_x") assert not lim.is_allowed("user_x") def test_different_keys_independent(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=2, window_seconds=60) assert lim.is_allowed("alice") assert lim.is_allowed("alice") assert not lim.is_allowed("alice") # Bob's quota is independent assert lim.is_allowed("bob") def test_remaining_decreases(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=10, window_seconds=60) r0 = lim.remaining("u") lim.is_allowed("u") r1 = lim.remaining("u") assert r1 == r0 - 1 def test_reset_clears_bucket(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=2, window_seconds=60) lim.is_allowed("u"); lim.is_allowed("u") assert not lim.is_allowed("u") lim.reset_for("u") assert lim.is_allowed("u") # back to full quota def test_stats_returns_dict(self): from telemetry.rate_limiter import SlidingWindowRateLimiter lim = SlidingWindowRateLimiter(requests=5, window_seconds=60) stats = lim.stats() assert stats["limit"] == 5 assert stats["window_seconds"] == 60 class TestQueueDepthMonitor: def test_initial_state(self): from telemetry.rate_limiter import QueueDepthMonitor m = QueueDepthMonitor(max_concurrent=3) snap = m.snapshot() assert snap["running"] == 0 assert snap["queued"] == 0 def test_task_accepted_under_capacity(self): from telemetry.rate_limiter import QueueDepthMonitor m = QueueDepthMonitor(max_concurrent=3) assert m.task_queued() is True def test_task_rejected_at_capacity(self): from telemetry.rate_limiter import QueueDepthMonitor m = QueueDepthMonitor(max_concurrent=2) m.task_queued(); m.task_started() m.task_queued(); m.task_started() assert m.is_at_capacity assert m.task_queued() is False def test_task_lifecycle(self): from telemetry.rate_limiter import QueueDepthMonitor m = QueueDepthMonitor(max_concurrent=5) m.task_queued() m.task_started() m.task_finished() snap = m.snapshot() assert snap["completed"] == 1 assert snap["running"] == 0 def test_utilisation_pct(self): from telemetry.rate_limiter import QueueDepthMonitor m = QueueDepthMonitor(max_concurrent=4) m.task_queued(); m.task_started() m.task_queued(); m.task_started() snap = m.snapshot() assert snap["utilisation_pct"] == 50.0 class TestStructuredLogging: def test_get_logger_returns_logger(self): from telemetry.structured_logging import get_logger log = get_logger("test.module") assert log is not None def test_configure_logging_no_error(self): from telemetry.structured_logging import configure_logging configure_logging(level="WARNING", json_output=False) def test_request_context_no_error(self): from telemetry.structured_logging import RequestContext with RequestContext(task_id="test-123", repo="django/django"): pass # Should not raise # ══════════════════════════════════════════════════════════════════════ # Phase 9 — Benchmarking # ══════════════════════════════════════════════════════════════════════ class TestBenchmarkReport: def _make_report(self, n_resolved: int, n_total: int, variant: str = "test") -> object: from experiments.benchmark import BenchmarkReport results = [] for i in range(n_total): results.append({ "instance_id": f"inst-{i}", "repo": "django/django", "resolved": i < n_resolved, "attempts": 1 if i < n_resolved else 3, "failure_category": "success" if i < n_resolved else "assertion_error", "total_tokens": 2000, "patch": "--- a/f.py\n+++b/f.py\n", "variant": variant, }) return BenchmarkReport(variant=variant, results=results) def test_pct_resolved(self): report = self._make_report(30, 100) assert abs(report.pct_resolved - 0.30) < 1e-6 def test_avg_attempts(self): report = self._make_report(50, 100) # 50 at 1 attempt + 50 at 3 attempts = (50 + 150)/100 = 2.0 assert abs(report.avg_attempts - 2.0) < 1e-6 def test_avg_tokens(self): report = self._make_report(10, 50) assert report.avg_tokens == 2000.0 def test_failure_breakdown(self): report = self._make_report(10, 30) bd = report.failure_breakdown assert "success" in bd assert bd["success"] == 10 def test_save_and_load(self, tmp_path): from experiments.benchmark import BenchmarkReport report = self._make_report(20, 100) path = tmp_path / "report.json" report.save(path) assert path.exists() loaded = BenchmarkReport.load(path) assert loaded.n_total == 100 assert loaded.n_resolved == 20 assert abs(loaded.pct_resolved - 0.20) < 1e-6 def test_summary_dict_keys(self): report = self._make_report(10, 50) d = report.summary_dict() assert "variant" in d assert "pct_resolved" in d assert "avg_attempts" in d assert "failure_breakdown" in d def test_empty_report(self): from experiments.benchmark import BenchmarkReport report = BenchmarkReport(variant="empty", results=[]) assert report.n_total == 0 assert report.pct_resolved == 0.0 assert report.avg_attempts == 0.0 class TestAblationTable: def test_build_from_results_dir(self, tmp_path): from experiments.benchmark import BenchmarkReport, build_ablation_table # Create a fake report file report = BenchmarkReport(variant="with_reflection", results=[ { "instance_id": "i1", "repo": "r", "resolved": True, "attempts": 2, "failure_category": "success", "total_tokens": 3000, "patch": "", "variant": "with_reflection" } ]) report.save(tmp_path / "report_with_reflection.json") table = build_ablation_table(tmp_path) assert isinstance(table, str) assert "Devin" in table assert "System Variant" in table def test_table_includes_published_baselines(self, tmp_path): from experiments.benchmark import build_ablation_table # Empty results dir — should still have baselines table = build_ablation_table(tmp_path) assert "Devin" in table or "SWE-agent" in table def test_ablation_md_file_created(self, tmp_path): from experiments.benchmark import build_ablation_table build_ablation_table(tmp_path) assert (tmp_path / "ablation_table.md").exists() def test_ablation_json_file_created(self, tmp_path): from experiments.benchmark import build_ablation_table build_ablation_table(tmp_path) assert (tmp_path / "ablation_table.json").exists() class TestBenchmarkRunner: def _make_runner(self, tmp_path, variant="with_reflection"): from experiments.benchmark import BenchmarkRunner runner = BenchmarkRunner( variant=variant, output_dir=tmp_path, max_instances=5, ) return runner def _make_instances(self, n=3): return [ { "instance_id": f"django__django-{i}", "repo": "django/django", "problem_statement": "Fix the bug in query filtering logic", "base_commit": "abc123", "FAIL_TO_PASS": ["tests/test_query.py::test_filter"], "PASS_TO_PASS": [], } for i in range(n) ] def test_runner_initialisation(self, tmp_path): runner = self._make_runner(tmp_path) assert runner.variant == "with_reflection" assert runner.max_instances == 5 def test_results_path_includes_variant(self, tmp_path): runner = self._make_runner(tmp_path, "baseline_gpt4o") assert "baseline_gpt4o" in str(runner.results_path) def test_error_result_format(self, tmp_path): runner = self._make_runner(tmp_path) instance = {"instance_id": "test-1", "repo": "r"} result = runner._error_result(instance, "boom") assert result["resolved"] is False assert result["failure_category"] == "run_error" assert "boom" in result["error"] def test_summary_dict_completeness(self, tmp_path): from experiments.benchmark import BenchmarkReport results = [ {"instance_id": "i1", "resolved": True, "attempts": 1, "failure_category": "success", "total_tokens": 1000, "patch": "", "repo": "r", "variant": "v"} ] report = BenchmarkReport("v", results) d = report.summary_dict() required_keys = {"variant", "n_total", "n_resolved", "pct_resolved", "avg_attempts", "avg_token_cost", "failure_breakdown"} assert required_keys.issubset(d.keys())