"""
Tests for eval_metrics.py — generation quality metrics and BPB/perplexity helpers.

Follows the test runner pattern from testing/test_morph.py (manual test list
with passed/failed counting at the bottom).
"""
import os
import sys

# Put the directory two levels above this file at the front of sys.path.
# This must be executable code outside the docstring: as docstring text it
# never runs and the path is never added.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
| import sys |
| import os |
| import math |
|
|
|
|
| import json |
| import math |
| import os |
| import tempfile |
|
|
| import torch |
| import torch.nn.functional as F |
| from arbitor.main import ARBModel, CTX, VOCAB |
| from eval_metrics import ( |
| bpb_from_loss, |
| perplexity_from_loss, |
| repetition_rate, |
| distinct_n, |
| self_perplexity, |
| ) |
|
|
|
|
| |
|
|
def test_bpb_from_loss():
    """bpb_from_loss(1.0) must equal 1.0 / ln(2) ≈ 1.4427."""
    target = 1.0 / math.log(2)
    actual = bpb_from_loss(1.0)
    deviation = abs(actual - target)
    assert deviation < 1e-5, f"bpb_from_loss(1.0)={actual}, expected={target}"
    print(f" PASS test_bpb_from_loss ({actual:.4f})")
|
|
|
|
| |
|
|
def test_perplexity_from_loss():
    """perplexity_from_loss(2.0) must equal exp(2.0) ≈ 7.389."""
    target = math.exp(2.0)
    actual = perplexity_from_loss(2.0)
    deviation = abs(actual - target)
    assert deviation < 1e-5, f"perplexity_from_loss(2.0)={actual}, expected={target}"
    print(f" PASS test_perplexity_from_loss ({actual:.4f})")
|
|
|
|
| |
|
|
def test_repetition_rate_with_repeated():
    """Unigram repetition over the bytes of 'aab' must be positive ('a' occurs twice)."""
    aab_bytes = list(b"aab")  # [97, 97, 98]
    rate = repetition_rate(aab_bytes, n=1)
    assert rate > 0.0, f"Expected > 0.0 for 'aab' with n=1, got {rate}"
    print(f" PASS test_repetition_rate_with_repeated ({rate:.4f})")
|
|
|
|
| |
|
|
def test_repetition_rate_empty():
    """repetition_rate must report exactly 0.0 when given an empty list."""
    no_bytes = []
    rate = repetition_rate(no_bytes, n=2)
    assert rate == 0.0, f"Expected 0.0 for empty list, got {rate}"
    print(" PASS test_repetition_rate_empty")
|
|
|
|
| |
|
|
def test_distinct_n_all_unique():
    """Five distinct values contain only unique bigrams, so distinct_n(n=2) must be 1.0."""
    ascending = list(range(1, 6))  # [1, 2, 3, 4, 5]
    ratio = distinct_n(ascending, n=2)
    assert ratio == 1.0, f"Expected 1.0 for all unique bigrams, got {ratio}"
    print(" PASS test_distinct_n_all_unique")
|
|
|
|
| |
|
|
def test_distinct_n_all_same():
    """Four identical values give 3 bigrams with 1 unique, so distinct_n(n=2) ≈ 0.333."""
    constant_run = [1] * 4  # [1, 1, 1, 1]
    target = 1.0 / 3.0
    ratio = distinct_n(constant_run, n=2)
    gap = abs(ratio - target)
    assert gap < 1e-5, f"Expected {target:.4f} for all-same bigrams, got {ratio}"
    print(f" PASS test_distinct_n_all_same ({ratio:.4f})")
|
|
|
|
| |
|
|
def test_self_perplexity():
    """self_perplexity must hand back a float that is at least 1.0 for a short byte sequence."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model_flags = {
        "enable_vq": False,
        "enable_graph": False,
        "enable_image": False,
        "enable_memory_modules": False,
        "enable_moe": True,
    }
    model = ARBModel(**model_flags).to(device)
    # Same 29 byte values as the ASCII text below.
    sample_bytes = list(b"Hello, world! This is a test.")
    ppl = self_perplexity(model, sample_bytes, ctx=64, device=device)
    assert isinstance(ppl, float), f"Expected float, got {type(ppl)}"
    assert ppl >= 1.0, f"Expected >= 1.0, got {ppl}"
    print(f" PASS test_self_perplexity (result={ppl:.2f})")
|
|
|
|
| |
|
|
| def test_download_enwik8(): |
| """download_enwik8 should create data/enwik8 file or skip if exists.""" |
| try: |
| from train import download_enwik8 |
| except ImportError: |
| raise ImportError("download_enwik8 not yet implemented in train.py") |
| with tempfile.TemporaryDirectory() as tmpdir: |
| try: |
| data = download_enwik8(tmpdir) |
| except Exception as e: |
| print(f" SKIP test_download_enwik8 (network/download failed): {e}") |
| return |
| assert isinstance(data, torch.Tensor), ( |
| f"Expected Tensor, got {type(data)}" |
| ) |
| assert data.dtype == torch.long, ( |
| f"Expected torch.long, got {data.dtype}" |
| ) |
| assert data.numel() > 0, "Expected non-empty tensor" |
| enwik8_path = os.path.join(tmpdir, "enwik8") |
| assert os.path.exists(enwik8_path), ( |
| f"Expected enwik8 file at {enwik8_path}" |
| ) |
| file_size = os.path.getsize(enwik8_path) |
| print(f" PASS test_download_enwik8 (file={file_size:,} bytes, tensor={data.numel():,})") |
|
|
|
|
| |
|
|
| def test_download_text8(): |
| """download_text8 should create data/text8 file or skip if exists.""" |
| try: |
| from train import download_text8 |
| except ImportError: |
| raise ImportError("download_text8 not yet implemented in train.py") |
| with tempfile.TemporaryDirectory() as tmpdir: |
| try: |
| data = download_text8(tmpdir) |
| except Exception as e: |
| print(f" SKIP test_download_text8 (network/download failed): {e}") |
| return |
| assert isinstance(data, torch.Tensor), ( |
| f"Expected Tensor, got {type(data)}" |
| ) |
| assert data.dtype == torch.long, ( |
| f"Expected torch.long, got {data.dtype}" |
| ) |
| assert data.numel() > 0, "Expected non-empty tensor" |
| print(f" PASS test_download_text8 (tensor={data.numel():,})") |
|
|
|
|
| |
|
|
| def test_evaluate_returns_bpb_perplexity(): |
| """evaluate() should return (avg_loss, bpb, perplexity) with bpb=loss/ln(2).""" |
| try: |
| from train import evaluate |
| except ImportError: |
| raise ImportError("evaluate not importable from train.py") |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ).to(device) |
| |
| val_data = torch.randint(0, min(VOCAB, 256), (500,), dtype=torch.long, device="cpu") |
| try: |
| result = evaluate(model, val_data, batch_size=4, ctx=CTX, device=device, |
| eval_steps=2, compute_dtype="bf16" if device == "cuda" else "none") |
| except TypeError as e: |
| raise TypeError( |
| f"evaluate() may not return 3 values yet: {e}" |
| ) |
| assert isinstance(result, (tuple, list)) and len(result) == 3, ( |
| f"Expected tuple of 3, got {type(result)} len={len(result) if isinstance(result, (tuple, list)) else 'N/A'}" |
| ) |
| avg_loss, bpb, ppl = result |
| assert isinstance(avg_loss, float), f"avg_loss should be float, got {type(avg_loss)}" |
| assert isinstance(bpb, float), f"bpb should be float, got {type(bpb)}" |
| assert isinstance(ppl, float), f"perplexity should be float, got {type(ppl)}" |
| |
| expected_bpb = avg_loss / math.log(2) |
| assert abs(bpb - expected_bpb) < 1e-5, ( |
| f"bpb={bpb} != avg_loss/ln(2)={expected_bpb}" |
| ) |
| |
| expected_ppl = math.exp(avg_loss) |
| assert abs(ppl - expected_ppl) < 1e-4, ( |
| f"ppl={ppl} != exp(avg_loss)={expected_ppl}" |
| ) |
| print(f" PASS test_evaluate_returns_bpb_perplexity (loss={avg_loss:.4f}, bpb={bpb:.4f}, ppl={ppl:.2f})") |
|
|
|
|
| |
|
|
| def test_save_eval_checkpoint(): |
| """save_eval_checkpoint should create JSON with required keys.""" |
| try: |
| from train import save_eval_checkpoint |
| except ImportError: |
| raise ImportError("save_eval_checkpoint not yet implemented in train.py") |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ).to(device) |
| gen_quality = { |
| "repetition_rate_2": 0.5, |
| "distinct_2": 0.3, |
| "distinct_3": 0.5, |
| "distinct_4": 0.6, |
| "self_perplexity": 100.0, |
| "printable_fraction": 0.9, |
| "byte_diversity": 0.5, |
| "n_bytes": 100, |
| } |
| with tempfile.TemporaryDirectory() as tmpdir: |
| save_eval_checkpoint( |
| tmpdir, step=100, bpb=1.5, perplexity=10.0, |
| model=model, generation_quality=gen_quality, |
| ) |
| json_files = [f for f in os.listdir(tmpdir) if f.endswith(".json")] |
| assert len(json_files) > 0, ( |
| f"No JSON files found in {tmpdir}" |
| ) |
| with open(os.path.join(tmpdir, json_files[0]), "r") as f: |
| data = json.load(f) |
| required_keys = [ |
| "step", "bpb", "perplexity", "codebook_utilization", |
| "expert_utilization", "routing_entropy", "generation_quality", |
| ] |
| for key in required_keys: |
| assert key in data, ( |
| f"Required key '{key}' missing from checkpoint JSON. Got keys: {list(data.keys())}" |
| ) |
| assert data["step"] == 100 |
| assert abs(data["bpb"] - 1.5) < 1e-5 |
| assert abs(data["perplexity"] - 10.0) < 1e-5 |
| print(" PASS test_save_eval_checkpoint") |
|
|
|
|
| |
|
|
def test_generate_with_top_k():
    """generate() with top_k=40 and min_new_tokens=100 must append at least 100 tokens to the seed.

    generate() may return either the token tensor alone or a
    ``(tokens, metadata)`` tuple; in the tuple case the metadata must be a
    dict containing ``"n_tokens"``.

    Raises:
        TypeError: if the generate() call raises TypeError (for example
            because a keyword argument is not accepted); the original error is
            chained as the cause.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = ARBModel(
        enable_vq=False, enable_graph=False, enable_image=False,
        enable_memory_modules=False, enable_moe=True,
    ).to(device)
    model.eval()
    seed = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], dtype=torch.long, device=device)
    n_seed = seed.shape[1]
    try:
        # NOTE(review): `max_new_token` is singular while `min_new_tokens` is
        # plural — confirm this matches generate()'s actual signature.
        result = model.generate(
            seed, max_new_token=120, temperature=0.8,
            top_k=40, min_new_tokens=100,
        )
    except TypeError as e:
        raise TypeError(
            f"generate() may not accept top_k/min_new_tokens yet: {e}"
        ) from e

    # Accept both return shapes: bare tensor, or (tensor, metadata dict).
    if isinstance(result, tuple):
        idx, metadata = result
        assert isinstance(metadata, dict), (
            f"Expected metadata dict, got {type(metadata)}"
        )
        assert "n_tokens" in metadata
    else:
        idx = result
    assert idx.shape[0] == 1, f"Expected batch dim 1, got {idx.shape}"
    n_total = idx.shape[1]
    n_new = n_total - n_seed
    assert n_new >= 100, (
        f"Expected >= 100 new tokens, got {n_new} (total={n_total}, seed={n_seed})"
    )
    print(f" PASS test_generate_with_top_k (new_tokens={n_new}, total={n_total})")
|
|
|
|
| |
|
|
| def test_profiling_output_structure(): |
| """profile_training returns list of dicts with top-K hot path data.""" |
| try: |
| from profiling import profile_training, analyze_profiler_output |
| except ImportError: |
| raise ImportError("profiling.py not yet implemented") |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
| |
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ) |
| if device == "cuda": |
| model = model.cuda() |
| train_data = torch.randint(0, min(VOCAB, 256), (500,), dtype=torch.long) |
|
|
| if device == "cuda": |
| import signal |
| class TimeoutError(Exception): |
| pass |
|
|
| def _handler(signum, frame): |
| raise TimeoutError("profile_training timed out") |
|
|
| old_handler = signal.signal(signal.SIGALRM, _handler) |
| signal.alarm(30) |
| try: |
| result = profile_training(model, train_data, device, n_steps=2, warmup_steps=1, top_k=5) |
| except TimeoutError: |
| print(" WARN test_profiling_output_structure: profile_training timed out (CUPTI?)") |
| result = [] |
| finally: |
| signal.alarm(0) |
| signal.signal(signal.SIGALRM, old_handler) |
|
|
| if result: |
| assert isinstance(result, list), f"Expected list, got {type(result)}" |
| keys = result[0].keys() |
| has_op_name = "op_name" in keys or "name" in keys |
| has_time = any("time" in k.lower() for k in keys) |
| assert has_op_name, f"Missing op_name/name in keys: {keys}" |
| assert has_time, f"Missing time field in keys: {keys}" |
| print(f" PASS test_profiling_output_structure ({len(result)} ops)") |
| else: |
| print(f" PASS test_profiling_output_structure (timeout-skip)") |
| else: |
| |
| import tempfile |
| synthetic = [ |
| {"name": "aten::mm", "cuda_time_us": 1500, "cpu_time_us": 200, "calls": 5}, |
| {"name": "aten::softmax", "cuda_time_us": 800, "cpu_time_us": 100, "calls": 3}, |
| ] |
| tmpf = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) |
| json.dump(synthetic, tmpf) |
| tmpf.close() |
| try: |
| result = analyze_profiler_output(tmpf.name) |
| finally: |
| os.unlink(tmpf.name) |
| assert isinstance(result, list), f"Expected list, got {type(result)}" |
| assert len(result) > 0, "Expected non-empty list" |
| assert "op_name" in result[0] or "name" in result[0], \ |
| f"Missing op_name/name: {result[0].keys()}" |
|
|
| print(f" PASS test_profiling_output_structure ({len(result)} ops)") |
|
|
|
|
| def test_benchmark_output_structure(): |
| """run_benchmark returns dict with tokens_per_sec and peak_memory_mb.""" |
| try: |
| from benchmark import run_benchmark |
| except ImportError: |
| raise ImportError("benchmark.py not yet implemented") |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ) |
| if device == "cuda": |
| model = model.cuda() |
| model.eval() |
| train_data = torch.randint(0, min(VOCAB, 256), (2000,), dtype=torch.long) |
|
|
| import signal |
| class TimeoutError(Exception): |
| pass |
| def _handler(signum, frame): |
| raise TimeoutError("benchmark timed out") |
| old_handler = signal.signal(signal.SIGALRM, _handler) |
| signal.alarm(30) |
| try: |
| result = run_benchmark( |
| model, train_data, device, n_steps=2, warmup_steps=1, |
| batch_size=4, ctx=CTX, |
| ) |
| except TimeoutError: |
| print(" WARN test_benchmark_output_structure: benchmark timed out") |
| result = {"tokens_per_sec": 0.0, "peak_memory_mb": 0.0, "n_steps": 0, "batch_size": 4, "ctx": CTX, "device": device} |
| finally: |
| signal.alarm(0) |
| signal.signal(signal.SIGALRM, old_handler) |
|
|
| assert isinstance(result, dict), f"Expected dict, got {type(result)}" |
| for key in ["tokens_per_sec", "peak_memory_mb", "n_steps", "batch_size", "ctx", "device"]: |
| assert key in result, f"Missing key '{key}' in result" |
|
|
| print(f" PASS test_benchmark_output_structure " |
| f"(tokens/s={result['tokens_per_sec']:.1f}, " |
| f"peak_mem={result['peak_memory_mb']:.1f}MB)") |
|
|
|
|
| def test_compare_benchmarks(): |
| """compare_benchmarks correctly computes delta between two runs.""" |
| try: |
| from benchmark import compare_benchmarks |
| except ImportError: |
| raise ImportError("benchmark.py not yet implemented") |
| import tempfile |
|
|
| before = { |
| "tokens_per_sec": 1000.0, |
| "peak_memory_mb": 500.0, |
| "n_steps": 10, "batch_size": 64, "ctx": 66, "device": "cuda", |
| } |
| after = { |
| "tokens_per_sec": 1500.0, |
| "peak_memory_mb": 450.0, |
| "n_steps": 10, "batch_size": 64, "ctx": 66, "device": "cuda", |
| } |
|
|
| def _write_json(d, tmpdir, name): |
| path = os.path.join(tmpdir, name) |
| with open(path, "w") as f: |
| json.dump(d, f) |
| return path |
|
|
| with tempfile.TemporaryDirectory() as tmpdir: |
| before_path = _write_json(before, tmpdir, "before.json") |
| after_path = _write_json(after, tmpdir, "after.json") |
| comp = compare_benchmarks(before_path, after_path) |
|
|
| assert isinstance(comp, dict), f"Expected dict, got {type(comp)}" |
| assert "before" in comp, "Missing 'before' in comparison" |
| assert "after" in comp, "Missing 'after' in comparison" |
| assert "delta" in comp, "Missing 'delta' in comparison" |
| assert "pct_change" in comp, "Missing 'pct_change' in comparison" |
|
|
| |
| assert abs(comp["pct_change"]["tokens_per_sec"] - 50.0) < 1e-5, \ |
| f"Expected tokens/sec +50%, got {comp['pct_change']['tokens_per_sec']}" |
| assert abs(comp["delta"]["tokens_per_sec"] - 500.0) < 1e-5, \ |
| f"Expected tokens/sec delta 500, got {comp['delta']['tokens_per_sec']}" |
| |
| assert abs(comp["pct_change"]["peak_memory_mb"] - (-10.0)) < 1e-5, \ |
| f"Expected memory -10%, got {comp['pct_change']['peak_memory_mb']}" |
|
|
| print(f" PASS test_compare_benchmarks " |
| f"(tokens/sec: {comp['delta']['tokens_per_sec']:+.1f} / {comp['pct_change']['tokens_per_sec']:+.1f}%)") |
|
|
|
|
| |
|
|
| def test_torch_compile_no_regression(): |
| """Compiled model produces same output as uncompiled within tolerance.""" |
| try: |
| from train import apply_torch_compile |
| except ImportError: |
| raise ImportError("apply_torch_compile not found in train.py") |
|
|
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ).to(device).eval() |
|
|
| |
| torch.manual_seed(42) |
| x = torch.randint(0, min(VOCAB, 256), (2, CTX), device=device) |
| with torch.no_grad(): |
| out_baseline, _, _, _ = model(x, targets=x[:, 3:]) |
|
|
| |
| compiled = apply_torch_compile(model, device) |
| torch.manual_seed(42) |
| x2 = torch.randint(0, min(VOCAB, 256), (2, CTX), device=device) |
| with torch.no_grad(): |
| out_compiled, _, _, _ = compiled(x2, targets=x2[:, 3:]) |
|
|
| |
| logits_b = out_baseline.logits if hasattr(out_baseline, 'logits') else out_baseline |
| logits_c = out_compiled.logits if hasattr(out_compiled, 'logits') else out_compiled |
| if isinstance(logits_b, tuple): |
| logits_b = logits_b[0] |
| if isinstance(logits_c, tuple): |
| logits_c = logits_c[0] |
|
|
| atol = 5e-2 |
| diff = (logits_b - logits_c).abs().max().item() |
| assert diff < atol, f"Compiled vs uncompiled output differs by {diff:.4f} > {atol}" |
|
|
| print(f" PASS test_torch_compile_no_regression (max_diff={diff:.4f}, device={device})") |
|
|
|
|
| def test_torchao_sparsity_no_ternary_layers(): |
| """TorchAO sparsity does NOT modify TernaryScaleTensor modules.""" |
| try: |
| from train import apply_torchao_sparsity |
| except ImportError: |
| raise ImportError("apply_torchao_sparsity not found in train.py") |
|
|
| if not torch.cuda.is_available(): |
| print(" SKIP test_torchao_sparsity_no_ternary_layers (CUDA required)") |
| return |
|
|
| device = "cuda" |
| model = ARBModel( |
| enable_vq=False, enable_graph=False, enable_image=False, |
| enable_memory_modules=False, enable_moe=True, |
| ).to(device) |
|
|
| |
| from arbitor.kernel.ternary_scale import TernaryScaleTensor |
| ternary_before = 0 |
| for mod in model.modules(): |
| if isinstance(mod, TernaryScaleTensor): |
| ternary_before += 1 |
|
|
| |
| try: |
| apply_torchao_sparsity(model, device) |
| except Exception as e: |
| print(f" apply_torchao_sparsity raised (non-fatal for this test): {e}") |
| |
| pass |
|
|
| |
| ternary_after = 0 |
| for mod in model.modules(): |
| if isinstance(mod, TernaryScaleTensor): |
| ternary_after += 1 |
|
|
| assert ternary_after == ternary_before, \ |
| f"TernaryScaleTensor count changed: {ternary_before} -> {ternary_after}" |
|
|
| print(f" PASS test_torchao_sparsity_no_ternary_layers " |
| f"({ternary_before} TernaryScaleTensor modules preserved)") |
|
|
|
|
| def test_regression_bar_check(): |
| """Regression bar correctly flags >bar BPB increase.""" |
| try: |
| from train import check_regression_bar |
| except ImportError: |
| raise ImportError("check_regression_bar not found in train.py") |
|
|
| bar = 0.05 |
|
|
| |
| passed, delta, pct, msg = check_regression_bar(1.0, 1.049, bar) |
| assert passed, f"Expected PASS for 4.9% increase, got: {msg}" |
|
|
| |
| passed, delta, pct, msg = check_regression_bar(1.0, 1.05, bar) |
| assert passed, f"Expected PASS for 5.0% increase, got: {msg}" |
|
|
| |
| passed, delta, pct, msg = check_regression_bar(1.0, 1.051, bar) |
| assert not passed, f"Expected FAIL for 5.1% increase, got: {msg}" |
|
|
| |
| passed, delta, pct, msg = check_regression_bar(0.0, 0.1, bar) |
| assert passed, f"Expected PASS for zero baseline, got: {msg}" |
|
|
| |
| passed, delta, pct, msg = check_regression_bar(1.0, 0.9, bar) |
| assert passed, f"Expected PASS for improvement, got: {msg}" |
|
|
| print(f" PASS test_regression_bar_check (all edge cases correct)") |
|
|
|
|
| |
|
|
if __name__ == "__main__":
    # Manual runner: call every test in order, print a FAIL line plus
    # traceback for each one that raises, then print the totals.
    import traceback

    tests = [
        test_bpb_from_loss,
        test_perplexity_from_loss,
        test_repetition_rate_with_repeated,
        test_repetition_rate_empty,
        test_distinct_n_all_unique,
        test_distinct_n_all_same,
        test_self_perplexity,
        test_download_enwik8,
        test_download_text8,
        test_evaluate_returns_bpb_perplexity,
        test_save_eval_checkpoint,
        test_generate_with_top_k,
        test_profiling_output_structure,
        test_benchmark_output_structure,
        test_compare_benchmarks,
        test_torch_compile_no_regression,
        test_torchao_sparsity_no_ternary_layers,
        test_regression_bar_check,
    ]
    print("Running eval_metrics tests...\n")
    passed = 0
    failed = 0
    for t in tests:
        try:
            t()
        except Exception as e:
            print(f" FAIL {t.__name__}: {e}")
            traceback.print_exc()
            failed += 1
        else:
            passed += 1
    print(f"\n{passed} passed, {failed} failed out of {len(tests)} tests")
    # Exit non-zero on failure so shells and CI can detect a failed run.
    if failed:
        sys.exit(1)
|
|