# MERGED: OpenCode (deep KV physics) + CC (surface coverage)
# All tests hermetic: no GPU, no TCP, no downloaded weights required
from __future__ import annotations

import json
import logging
import os
import subprocess
import sys

import httpx
import pytest

# Optional dep guard — skip entire module if CC benchmark functions not present
# (OpenCode has demo/benchmark.py but different functions than CC's benchmark)
try:
    import demo.benchmark as _bm

    _bm._aggregate  # AttributeError if CC's functions not present
    _bm._build_results
    _bm._run_async
    _bm._run_one
    _bm.cli
except (ImportError, AttributeError):
    pytest.skip(
        "CC benchmark functions (_aggregate, _build_results, etc.) not in OpenCode's demo/benchmark.py",
        allow_module_level=True,
    )

import demo.benchmark as benchmark
from demo.benchmark import (
    _aggregate,
    _build_results,
    _fetch_hardware,
    _preflight_mcp,
    _run_async,
    _run_one,
    cli,
)
from apohara_context_forge.serving.vllm_client import VLLMClient

PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# ---- Helpers --------------------------------------------------------------------------


def _per_agent_payload(
    *, completion_tokens: int = 10, prompt_tokens: int = 100
) -> dict[str, dict]:
    return {
        name: {
            "tokens": completion_tokens,
            "prompt_tokens": prompt_tokens,
            "total_tokens": completion_tokens + prompt_tokens,
            "ttft_ms": 50.0,
            "vram_peak_gb": 4.0,
        }
        for name in ("retriever", "reranker", "summarizer", "critic", "responder")
    }
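

# Local sanity check for the helper above — pure arithmetic over the fixture,
# no assumptions about demo.benchmark itself: five fixed agents, each with
# total_tokens = completion_tokens + prompt_tokens.
def test_per_agent_payload_shape() -> None:
    payload = _per_agent_payload(completion_tokens=10, prompt_tokens=100)
    assert set(payload) == {"retriever", "reranker", "summarizer", "critic", "responder"}
    assert all(agent["total_tokens"] == 110 for agent in payload.values())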


def _make_vllm_with_handler(handler) -> VLLMClient:
    return VLLMClient(
        base_url="http://vllm.test/v1",
        api_key="EMPTY",
        transport=httpx.MockTransport(handler),
    )


def _vllm_chat_handler(prompt_tokens: int):
    def handler(request: httpx.Request) -> httpx.Response:
        return httpx.Response(
            200,
            json={
                "id": "cmpl-1",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": "ok"}}
                ],
                "usage": {
                    "completion_tokens": 5,
                    "prompt_tokens": prompt_tokens,
                    "total_tokens": prompt_tokens + 5,
                },
            },
        )

    return handler
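

# Minimal sketch verifying the mocked usage block independently of
# VLLMClient: drive the handler through a plain httpx.Client. The URL path
# is arbitrary — the handler answers every route the same way.
def test_vllm_chat_handler_usage_block() -> None:
    transport = httpx.MockTransport(_vllm_chat_handler(prompt_tokens=100))
    with httpx.Client(base_url="http://vllm.test/v1", transport=transport) as client:
        usage = client.post("/chat/completions", json={}).json()["usage"]
    assert usage == {"completion_tokens": 5, "prompt_tokens": 100, "total_tokens": 105}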


def _mcp_handler_factory(*, gpu: str = "MI300X", health_status_code: int = 200):
    """MCP MockTransport handler covering the routes run_pipeline + benchmark hit."""
    counts: dict[str, int] = {}

    def handler(request: httpx.Request) -> httpx.Response:
        path = request.url.path
        counts[path] = counts.get(path, 0) + 1
        if path == "/health":
            if health_status_code != 200:
                return httpx.Response(health_status_code, text="degraded")
            return httpx.Response(200, json={"status": "ok", "gpu": gpu})
        if path == "/tools/register_context":
            return httpx.Response(
                200,
                json={
                    "agent_id": "x",
                    "context": "ctx",
                    "token_count": 1,
                    "created_at": "2026-05-09T12:00:00+00:00",
                    "expires_at": "2026-05-09T12:15:00+00:00",
                },
            )
        if path == "/tools/get_optimized_context":
            return httpx.Response(
                200,
                json={
                    "strategy": "compress",
                    "final_context": "OPT",
                    "shared_prefix": "",
                    "original_tokens": 10,
                    "final_tokens": 5,
                    "tokens_saved": 5,
                    "rationale": "test",
                },
            )
        if path == "/metrics/snapshot":
            return httpx.Response(
                200,
                json={
                    "vram_source": "psutil",
                    "compressor_model": "xlm-roberta-large",
                    "vram_used_gb": 2.0,
                    "vram_total_gb": 8.0,
                    "ttft_ms": 0.0,
                    "tokens_processed": 0,
                    "tokens_saved": 0,
                    "dedup_rate": 0.0,
                    "compression_ratio": 0.0,
                    "degradations": [],
                },
            )
        return httpx.Response(404)

    handler.counts = counts  # type: ignore[attr-defined]
    return handler
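

# Local check of the factory's `counts` side channel relied on by later
# tests: each request increments its path's counter, and unknown routes 404.
# No assumptions about demo.benchmark here — this exercises the fake alone.
def test_mcp_handler_counts_and_unknown_route() -> None:
    handler = _mcp_handler_factory()
    with httpx.Client(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    ) as client:
        assert client.get("/health").status_code == 200
        assert client.get("/not-a-route").status_code == 404
    assert handler.counts == {"/health": 1, "/not-a-route": 1}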


# ---- _run_one -------------------------------------------------------------------------


def test_run_one_aggregates_per_agent_dict() -> None:
    per_agent = {
        "a": {"tokens": 10, "prompt_tokens": 100, "ttft_ms": 40.0, "vram_peak_gb": 3.0},
        "b": {"tokens": 20, "prompt_tokens": 200, "ttft_ms": 60.0, "vram_peak_gb": 5.0},
    }
    out = _run_one(per_agent)
    assert out == {
        "tokens": 30,
        "prompt_tokens": 300,
        "ttft_ms": 50.0,
        "vram_peak_gb": 5.0,
    }


def test_run_one_handles_empty_dict() -> None:
    assert _run_one({}) == {
        "tokens": 0,
        "prompt_tokens": 0,
        "ttft_ms": 0.0,
        "vram_peak_gb": 0.0,
    }
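

# A sketch under the sum/mean/max semantics exercised above (assumption:
# _run_one applies no further normalization): a single agent's metrics
# should pass through unchanged, since the sum, mean, and max of one value
# are all that value itself.
def test_run_one_single_agent_passthrough() -> None:
    agent = {"tokens": 7, "prompt_tokens": 70, "ttft_ms": 42.0, "vram_peak_gb": 2.5}
    assert _run_one({"solo": dict(agent)}) == agent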


# ---- _aggregate -----------------------------------------------------------------------


def test_aggregate_computes_delta_pct() -> None:
    warm_on = [
        {"tokens": 5, "prompt_tokens": 800, "ttft_ms": 50.0, "vram_peak_gb": 4.0},
        {"tokens": 5, "prompt_tokens": 800, "ttft_ms": 50.0, "vram_peak_gb": 4.0},
    ]
    warm_off = [
        {"tokens": 5, "prompt_tokens": 1000, "ttft_ms": 60.0, "vram_peak_gb": 5.0},
        {"tokens": 5, "prompt_tokens": 1000, "ttft_ms": 60.0, "vram_peak_gb": 5.0},
    ]
    totals = _aggregate(warm_on, warm_off)
    assert totals["tokens_with"] == 800
    assert totals["tokens_without"] == 1000
    assert totals["delta_pct"] == 20.0
    assert totals["vram_with"] == 4.0
    assert totals["vram_without"] == 5.0
    assert totals["ttft_with"] == 50.0
    assert totals["ttft_without"] == 60.0


def test_aggregate_returns_null_delta_when_tokens_without_is_zero() -> None:
    warm_on = [
        {"tokens": 5, "prompt_tokens": 0, "ttft_ms": 50.0, "vram_peak_gb": 4.0},
    ]
    warm_off = [
        {"tokens": 5, "prompt_tokens": 0, "ttft_ms": 60.0, "vram_peak_gb": 5.0},
    ]
    totals = _aggregate(warm_on, warm_off)
    assert totals["delta_pct"] is None
    # JSON-serializable as null
    assert json.loads(json.dumps(totals))["delta_pct"] is None
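

# The cases above imply delta_pct = (tokens_without - tokens_with)
# / tokens_without * 100. A sketch of the regression direction, under the
# assumption that _aggregate reports a signed value rather than clamping at
# zero when ON consumes MORE prompt tokens than OFF:
def test_aggregate_delta_pct_signed_on_regression() -> None:
    warm_on = [{"tokens": 5, "prompt_tokens": 1200, "ttft_ms": 50.0, "vram_peak_gb": 4.0}]
    warm_off = [{"tokens": 5, "prompt_tokens": 1000, "ttft_ms": 60.0, "vram_peak_gb": 5.0}]
    assert _aggregate(warm_on, warm_off)["delta_pct"] == -20.0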


# ---- _build_results -------------------------------------------------------------------


def test_build_results_schema() -> None:
    cold = [{"tokens": 100, "prompt_tokens": 200, "ttft_ms": 50.0, "vram_peak_gb": 4.0}]
    warm = [
        {"tokens": 80, "prompt_tokens": 150, "ttft_ms": 40.0, "vram_peak_gb": 4.5},
        {"tokens": 70, "prompt_tokens": 140, "ttft_ms": 35.0, "vram_peak_gb": 4.5},
    ]
    off = [
        {"tokens": 100, "prompt_tokens": 300, "ttft_ms": 60.0, "vram_peak_gb": 5.0},
        {"tokens": 95, "prompt_tokens": 290, "ttft_ms": 55.0, "vram_peak_gb": 5.0},
        {"tokens": 90, "prompt_tokens": 280, "ttft_ms": 55.0, "vram_peak_gb": 5.0},
    ]
    config = {
        "runs": 3,
        "warmup": 1,
        "vllm_base_url": "http://x",
        "model": "m",
        "started_at": "t1",
        "completed_at": "t2",
    }
    result = _build_results("MI300X", config, cold, warm, off)
    expected_top = {
        "hardware",
        "config",
        "cold",
        "warm",
        "off",
        "cold_cache_baseline",
        "totals",
    }
    assert set(result.keys()) == expected_top
    expected_totals = {
        "tokens_with",
        "tokens_without",
        "vram_with",
        "vram_without",
        "ttft_with",
        "ttft_without",
        "delta_pct",
    }
    assert set(result["totals"].keys()) == expected_totals
    expected_baseline = {"tokens", "prompt_tokens", "ttft_ms", "vram_peak_gb"}
    assert set(result["cold_cache_baseline"].keys()) == expected_baseline
    # cold_cache_baseline is the per-run reduction of cold[0] (MEM007).
    assert result["cold_cache_baseline"] == cold[0]
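

# Assumption (consistent with the _run_async tests below, where totals
# compare ON-warm against OFF): _build_results derives "totals" by running
# _aggregate over the warm and off lists, so computing it directly should
# yield the same dict.
def test_build_results_totals_match_aggregate() -> None:
    cold = [{"tokens": 1, "prompt_tokens": 2, "ttft_ms": 1.0, "vram_peak_gb": 1.0}]
    warm = [{"tokens": 80, "prompt_tokens": 150, "ttft_ms": 40.0, "vram_peak_gb": 4.5}]
    off = [{"tokens": 100, "prompt_tokens": 300, "ttft_ms": 60.0, "vram_peak_gb": 5.0}]
    config = {
        "runs": 1,
        "warmup": 1,
        "vllm_base_url": "http://x",
        "model": "m",
        "started_at": "t1",
        "completed_at": "t2",
    }
    result = _build_results("MI300X", config, cold, warm, off)
    assert result["totals"] == _aggregate(warm, off)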


# ---- _fetch_hardware ------------------------------------------------------------------


async def test_hardware_field_from_health_response() -> None:
    handler = _mcp_handler_factory(gpu="MI300X")
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        gpu = await _fetch_hardware(mcp)
    finally:
        await mcp.aclose()
    assert gpu == "MI300X"


async def test_hardware_unknown_on_health_failure() -> None:
    handler = _mcp_handler_factory(health_status_code=503)
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        gpu = await _fetch_hardware(mcp)
    finally:
        await mcp.aclose()
    assert gpu == "unknown"
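

# Assumption: a 200 /health payload that simply lacks the "gpu" key degrades
# to "unknown" the same way an unhealthy response does, rather than raising.
async def test_hardware_unknown_when_gpu_key_missing() -> None:
    def handler(request: httpx.Request) -> httpx.Response:
        return httpx.Response(200, json={"status": "ok"})

    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        gpu = await _fetch_hardware(mcp)
    finally:
        await mcp.aclose()
    assert gpu == "unknown"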


# ---- _run_async: schema, run-counts, warmup discard, prompt-token math ----------------


async def test_runs_three_on_three_off(monkeypatch, tmp_path) -> None:
    invocations: list[bool] = []

    async def fake_run_pipeline(
        *,
        contextforge_enabled,
        mcp_client,
        vllm_client,
        shared_base,
        query,
    ):
        invocations.append(contextforge_enabled)
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        rc = await _run_async(
            runs=3,
            warmup=1,
            output_path=output_path,
            query="q",
            shared_base="s",
            mcp_client=mcp,
            vllm_client=vllm,
        )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    assert rc == 0
    assert len(invocations) == 6
    # ON first 3, then OFF next 3
    assert invocations[:3] == [True, True, True]
    assert invocations[3:] == [False, False, False]


async def test_warmup_discarded(monkeypatch, tmp_path) -> None:
    async def fake_run_pipeline(**_kwargs):
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        rc = await _run_async(
            runs=3,
            warmup=1,
            output_path=output_path,
            query="q",
            shared_base="s",
            mcp_client=mcp,
            vllm_client=vllm,
        )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    assert rc == 0
    payload = json.loads(output_path.read_text())
    assert len(payload["cold"]) == 1
    assert len(payload["warm"]) == 2
    assert len(payload["off"]) == 3


async def test_warm_tokens_with_lt_without(monkeypatch, tmp_path) -> None:
    """ON returns lower prompt_tokens than OFF → totals.tokens_with < tokens_without."""

    async def fake_run_pipeline(*, contextforge_enabled, **_kwargs):
        prompt = 200 if contextforge_enabled else 400
        return _per_agent_payload(prompt_tokens=prompt)

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        rc = await _run_async(
            runs=3,
            warmup=1,
            output_path=output_path,
            query="q",
            shared_base="s",
            mcp_client=mcp,
            vllm_client=vllm,
        )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    assert rc == 0
    payload = json.loads(output_path.read_text())
    totals = payload["totals"]
    assert totals["tokens_with"] < totals["tokens_without"]
    assert totals["delta_pct"] > 0


async def test_run_async_writes_full_schema(monkeypatch, tmp_path) -> None:
    """Every required top-level key + totals + cold_cache_baseline + hardware."""

    async def fake_run_pipeline(**_kwargs):
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory(gpu="MI300X")
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        rc = await _run_async(
            runs=3,
            warmup=1,
            output_path=output_path,
            query="q",
            shared_base="s",
            mcp_client=mcp,
            vllm_client=vllm,
        )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    assert rc == 0
    payload = json.loads(output_path.read_text())
    assert payload["hardware"] == "MI300X"
    assert {
        "runs",
        "warmup",
        "vllm_base_url",
        "model",
        "started_at",
        "completed_at",
    } <= set(payload["config"].keys())
    assert payload["config"]["runs"] == 3
    assert payload["config"]["warmup"] == 1


async def test_run_async_caller_owned_clients_survive(monkeypatch, tmp_path) -> None:
    """MEM048: caller-injected mcp/vllm clients must NOT be closed by _run_async."""

    async def fake_run_pipeline(**_kwargs):
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        await _run_async(
            runs=2,
            warmup=1,
            output_path=output_path,
            query="q",
            shared_base="s",
            mcp_client=mcp,
            vllm_client=vllm,
        )
        assert not mcp.is_closed
        # VLLMClient probe: an HTTP call succeeds only while the client is still open.
        resp = await vllm._client.get("/models")
        assert resp.status_code in (200, 404)
    finally:
        await mcp.aclose()
        await vllm.aclose()


async def test_run_async_does_not_write_partial_json_on_failure(
    monkeypatch, tmp_path
) -> None:
    """Mid-run exception → file write never happens; output file does NOT appear."""
    call_count = {"n": 0}

    async def fake_run_pipeline(**_kwargs):
        call_count["n"] += 1
        if call_count["n"] >= 3:
            raise RuntimeError("simulated mid-run failure")
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "should_not_exist.json"
    try:
        with pytest.raises(RuntimeError):
            await _run_async(
                runs=3,
                warmup=1,
                output_path=output_path,
                query="q",
                shared_base="s",
                mcp_client=mcp,
                vllm_client=vllm,
            )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    assert not output_path.exists()


# ---- Structured logging ---------------------------------------------------------------


async def test_structured_log_per_run(monkeypatch, tmp_path, caplog) -> None:
    async def fake_run_pipeline(**_kwargs):
        return _per_agent_payload()

    monkeypatch.setattr(benchmark, "run_pipeline", fake_run_pipeline)
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    vllm = _make_vllm_with_handler(_vllm_chat_handler(prompt_tokens=100))
    output_path = tmp_path / "out.json"
    try:
        with caplog.at_level(logging.INFO, logger="demo.benchmark"):
            await _run_async(
                runs=3,
                warmup=1,
                output_path=output_path,
                query="q",
                shared_base="s",
                mcp_client=mcp,
                vllm_client=vllm,
            )
    finally:
        await mcp.aclose()
        await vllm.aclose()
    records = [r for r in caplog.records if r.name == "demo.benchmark"]
    # 3 ON + 3 OFF = 6 "benchmark run done" records
    assert len(records) == 6
    modes = [getattr(r, "mode", None) for r in records]
    assert modes == ["cold", "warm", "warm", "off", "off", "off"]
    for r in records:
        assert getattr(r, "component", None) == "demo.benchmark"
        assert isinstance(getattr(r, "run_index", None), int)


# ---- D009 verbatim fail-fast ----------------------------------------------------------


def test_d009_vllm_unreachable_smoke() -> None:
    env = {**os.environ, "VLLM_BASE_URL": "http://127.0.0.1:1"}
    result = subprocess.run(
        [sys.executable, "-m", "demo.benchmark"],
        env=env,
        cwd=PROJECT_ROOT,
        capture_output=True,
        timeout=30,
    )
    assert result.returncode == 1
    expected = (
        b"vLLM unreachable at http://127.0.0.1:1. "
        b"Start the vLLM server first or set VLLM_BASE_URL."
    )
    assert expected in result.stderr


def test_d009_mcp_unreachable_smoke(monkeypatch, capsys, tmp_path) -> None:
    """In-process gate per the slice plan (running a separate fake vLLM for a subprocess test is impractical)."""

    async def fake_vllm_ok(client):
        return True

    async def fake_mcp_fail(client):
        return False

    monkeypatch.setattr(benchmark, "_preflight_vllm", fake_vllm_ok)
    monkeypatch.setattr(benchmark, "_preflight_mcp", fake_mcp_fail)
    monkeypatch.setattr(
        sys,
        "argv",
        ["demo.benchmark", "--output", str(tmp_path / "out.json")],
    )
    rc = cli()
    captured = capsys.readouterr()
    expected = (
        "ContextForge MCP unreachable at http://127.0.0.1:8001. "
        "Start the MCP server first or set CONTEXTFORGE_HOST/CONTEXTFORGE_PORT."
    )
    assert rc == 1
    assert expected in captured.err


# ---- Preflight unit ------------------------------------------------------------------


async def test_preflight_mcp_returns_true_on_ok_health() -> None:
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        ok = await _preflight_mcp(mcp)
    finally:
        await mcp.aclose()
    assert ok is True


async def test_preflight_mcp_returns_false_on_non_2xx() -> None:
    handler = _mcp_handler_factory(health_status_code=503)
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        ok = await _preflight_mcp(mcp)
    finally:
        await mcp.aclose()
    assert ok is False
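

# Assumption: _preflight_mcp issues exactly one GET /health and touches no
# other route; the factory's counts side channel makes that observable.
async def test_preflight_mcp_hits_health_once() -> None:
    handler = _mcp_handler_factory()
    mcp = httpx.AsyncClient(
        base_url="http://mcp.test", transport=httpx.MockTransport(handler)
    )
    try:
        await _preflight_mcp(mcp)
    finally:
        await mcp.aclose()
    assert handler.counts == {"/health": 1}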


# ---- Preflight failure logging --------------------------------------------------------


def test_vllm_preflight_failure_logs_warning_before_stderr(
    monkeypatch, capsys, caplog, tmp_path
) -> None:
    """logger.warning fires BEFORE the verbatim stderr print (MEM053 extension)."""

    async def fake_vllm_fail(client):
        return False

    monkeypatch.setattr(benchmark, "_preflight_vllm", fake_vllm_fail)
    monkeypatch.setattr(
        sys,
        "argv",
        ["demo.benchmark", "--output", str(tmp_path / "out.json")],
    )
    with caplog.at_level(logging.WARNING, logger="demo.benchmark"):
        rc = cli()
    captured = capsys.readouterr()
    assert rc == 1
    warn_records = [r for r in caplog.records if r.name == "demo.benchmark"]
    assert any(
        getattr(r, "component", None) == "demo.benchmark"
        and getattr(r, "base_url", None) is not None
        for r in warn_records
    )
    assert "vLLM unreachable at" in captured.err