ARBS / testing /eval /test_eval.py
CLIWorks's picture
Upload folder using huggingface_hub
d8bc908 verified
"""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
Tests for eval_metrics.py β€” generation quality metrics and BPB/perplexity helpers.
Follows the test runner pattern from testing/test_morph.py (manual test list
with passed/failed counting at the bottom).
"""
import sys
import os
import math
import json
import math
import os
import tempfile
import torch
import torch.nn.functional as F
from arbitor.main import ARBModel, CTX, VOCAB
from eval_metrics import (
bpb_from_loss,
perplexity_from_loss,
repetition_rate,
distinct_n,
self_perplexity,
)
# ── Test 1: bpb_from_loss ─────────────────────────────────────────────
def test_bpb_from_loss():
"""BPB of loss=1.0 should be 1.0 / ln(2) β‰ˆ 1.4427."""
result = bpb_from_loss(1.0)
expected = 1.0 / math.log(2)
assert abs(result - expected) < 1e-5, (
f"bpb_from_loss(1.0)={result}, expected={expected}"
)
print(f" PASS test_bpb_from_loss ({result:.4f})")
# ── Test 2: perplexity_from_loss ──────────────────────────────────────
def test_perplexity_from_loss():
"""Perplexity of loss=2.0 should be exp(2.0) β‰ˆ 7.389."""
result = perplexity_from_loss(2.0)
expected = math.exp(2.0)
assert abs(result - expected) < 1e-5, (
f"perplexity_from_loss(2.0)={result}, expected={expected}"
)
print(f" PASS test_perplexity_from_loss ({result:.4f})")
# ── Test 3: repetition_rate with repeated unigrams ───────────────────
def test_repetition_rate_with_repeated():
"""'aab' byte list with n=1 should have > 0.0 repetition (repeated 'a')."""
byte_list = [97, 97, 98] # "aab"
result = repetition_rate(byte_list, n=1)
assert result > 0.0, (
f"Expected > 0.0 for 'aab' with n=1, got {result}"
)
print(f" PASS test_repetition_rate_with_repeated ({result:.4f})")
# ── Test 4: repetition_rate empty list ────────────────────────────────
def test_repetition_rate_empty():
"""Empty list should return 0.0."""
result = repetition_rate([], n=2)
assert result == 0.0, (
f"Expected 0.0 for empty list, got {result}"
)
print(" PASS test_repetition_rate_empty")
# ── Test 5: distinct_n all unique bigrams ─────────────────────────────
def test_distinct_n_all_unique():
"""[1,2,3,4,5] with n=2 should return 1.0 (all unique bigrams)."""
byte_list = [1, 2, 3, 4, 5]
result = distinct_n(byte_list, n=2)
assert result == 1.0, (
f"Expected 1.0 for all unique bigrams, got {result}"
)
print(" PASS test_distinct_n_all_unique")
# ── Test 6: distinct_n all same bigrams ───────────────────────────────
def test_distinct_n_all_same():
"""[1,1,1,1] with n=2 should return ~0.333 (1 unique / 3 total)."""
byte_list = [1, 1, 1, 1]
result = distinct_n(byte_list, n=2)
expected = 1.0 / 3.0 # 1 unique bigram out of 3 total
assert abs(result - expected) < 1e-5, (
f"Expected {expected:.4f} for all-same bigrams, got {result}"
)
print(f" PASS test_distinct_n_all_same ({result:.4f})")
# ── Test 7: self_perplexity ───────────────────────────────────────────
def test_self_perplexity():
"""self_perplexity should return a float >= 1.0 for any model + sequence."""
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False,
enable_graph=False,
enable_image=False,
enable_memory_modules=False,
enable_moe=True,
).to(device)
byte_list = [72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33,
32, 84, 104, 105, 115, 32, 105, 115, 32, 97, 32, 116, 101,
115, 116, 46] # "Hello, world! This is a test."
result = self_perplexity(model, byte_list, ctx=64, device=device)
assert isinstance(result, float), (
f"Expected float, got {type(result)}"
)
assert result >= 1.0, (
f"Expected >= 1.0, got {result}"
)
print(f" PASS test_self_perplexity (result={result:.2f})")
# ── Test 8: download_enwik8 ──────────────────────────────────────────
def test_download_enwik8():
"""download_enwik8 should create data/enwik8 file or skip if exists."""
try:
from train import download_enwik8
except ImportError:
raise ImportError("download_enwik8 not yet implemented in train.py")
with tempfile.TemporaryDirectory() as tmpdir:
try:
data = download_enwik8(tmpdir)
except Exception as e:
print(f" SKIP test_download_enwik8 (network/download failed): {e}")
return
assert isinstance(data, torch.Tensor), (
f"Expected Tensor, got {type(data)}"
)
assert data.dtype == torch.long, (
f"Expected torch.long, got {data.dtype}"
)
assert data.numel() > 0, "Expected non-empty tensor"
enwik8_path = os.path.join(tmpdir, "enwik8")
assert os.path.exists(enwik8_path), (
f"Expected enwik8 file at {enwik8_path}"
)
file_size = os.path.getsize(enwik8_path)
print(f" PASS test_download_enwik8 (file={file_size:,} bytes, tensor={data.numel():,})")
# ── Test 9: download_text8 ───────────────────────────────────────────
def test_download_text8():
"""download_text8 should create data/text8 file or skip if exists."""
try:
from train import download_text8
except ImportError:
raise ImportError("download_text8 not yet implemented in train.py")
with tempfile.TemporaryDirectory() as tmpdir:
try:
data = download_text8(tmpdir)
except Exception as e:
print(f" SKIP test_download_text8 (network/download failed): {e}")
return
assert isinstance(data, torch.Tensor), (
f"Expected Tensor, got {type(data)}"
)
assert data.dtype == torch.long, (
f"Expected torch.long, got {data.dtype}"
)
assert data.numel() > 0, "Expected non-empty tensor"
print(f" PASS test_download_text8 (tensor={data.numel():,})")
# ── Test 10: evaluate returns (avg_loss, bpb, perplexity) ────────────
def test_evaluate_returns_bpb_perplexity():
"""evaluate() should return (avg_loss, bpb, perplexity) with bpb=loss/ln(2)."""
try:
from train import evaluate
except ImportError:
raise ImportError("evaluate not importable from train.py")
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
).to(device)
# Create tiny validation data
val_data = torch.randint(0, min(VOCAB, 256), (500,), dtype=torch.long, device="cpu")
try:
result = evaluate(model, val_data, batch_size=4, ctx=CTX, device=device,
eval_steps=2, compute_dtype="bf16" if device == "cuda" else "none")
except TypeError as e:
raise TypeError(
f"evaluate() may not return 3 values yet: {e}"
)
assert isinstance(result, (tuple, list)) and len(result) == 3, (
f"Expected tuple of 3, got {type(result)} len={len(result) if isinstance(result, (tuple, list)) else 'N/A'}"
)
avg_loss, bpb, ppl = result
assert isinstance(avg_loss, float), f"avg_loss should be float, got {type(avg_loss)}"
assert isinstance(bpb, float), f"bpb should be float, got {type(bpb)}"
assert isinstance(ppl, float), f"perplexity should be float, got {type(ppl)}"
# Verify bpb β‰ˆ avg_loss / ln(2)
expected_bpb = avg_loss / math.log(2)
assert abs(bpb - expected_bpb) < 1e-5, (
f"bpb={bpb} != avg_loss/ln(2)={expected_bpb}"
)
# Verify perplexity β‰ˆ exp(avg_loss)
expected_ppl = math.exp(avg_loss)
assert abs(ppl - expected_ppl) < 1e-4, (
f"ppl={ppl} != exp(avg_loss)={expected_ppl}"
)
print(f" PASS test_evaluate_returns_bpb_perplexity (loss={avg_loss:.4f}, bpb={bpb:.4f}, ppl={ppl:.2f})")
# ── Test 11: save_eval_checkpoint ─────────────────────────────────═══
def test_save_eval_checkpoint():
"""save_eval_checkpoint should create JSON with required keys."""
try:
from train import save_eval_checkpoint
except ImportError:
raise ImportError("save_eval_checkpoint not yet implemented in train.py")
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
).to(device)
gen_quality = {
"repetition_rate_2": 0.5,
"distinct_2": 0.3,
"distinct_3": 0.5,
"distinct_4": 0.6,
"self_perplexity": 100.0,
"printable_fraction": 0.9,
"byte_diversity": 0.5,
"n_bytes": 100,
}
with tempfile.TemporaryDirectory() as tmpdir:
save_eval_checkpoint(
tmpdir, step=100, bpb=1.5, perplexity=10.0,
model=model, generation_quality=gen_quality,
)
json_files = [f for f in os.listdir(tmpdir) if f.endswith(".json")]
assert len(json_files) > 0, (
f"No JSON files found in {tmpdir}"
)
with open(os.path.join(tmpdir, json_files[0]), "r") as f:
data = json.load(f)
required_keys = [
"step", "bpb", "perplexity", "codebook_utilization",
"expert_utilization", "routing_entropy", "generation_quality",
]
for key in required_keys:
assert key in data, (
f"Required key '{key}' missing from checkpoint JSON. Got keys: {list(data.keys())}"
)
assert data["step"] == 100
assert abs(data["bpb"] - 1.5) < 1e-5
assert abs(data["perplexity"] - 10.0) < 1e-5
print(" PASS test_save_eval_checkpoint")
# ── Test 12: generate() with top_k and min_new_tokens ────────────────
def test_generate_with_top_k():
"""generate() with top_k=40 and min_new_tokens=100 produces >= 100 new tokens."""
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
).to(device)
model.eval()
seed = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], dtype=torch.long, device=device)
n_seed = seed.shape[1]
try:
result = model.generate(
seed, max_new_token=120, temperature=0.8,
top_k=40, min_new_tokens=100,
)
except TypeError as e:
raise TypeError(
f"generate() may not accept top_k/min_new_tokens yet: {e}"
)
# result could be (idx, metadata) tuple or just idx
if isinstance(result, tuple):
idx, metadata = result
assert isinstance(metadata, dict), (
f"Expected metadata dict, got {type(metadata)}"
)
assert "n_tokens" in metadata
else:
idx = result
assert idx.shape[0] == 1, f"Expected batch dim 1, got {idx.shape}"
n_total = idx.shape[1]
n_new = n_total - n_seed
assert n_new >= 100, (
f"Expected >= 100 new tokens, got {n_new} (total={n_total}, seed={n_seed})"
)
print(f" PASS test_generate_with_top_k (new_tokens={n_new}, total={n_total})")
# ── Profiling & Benchmark Tests ───────────────────────────────────────
def test_profiling_output_structure():
"""profile_training returns list of dicts with top-K hot path data."""
try:
from profiling import profile_training, analyze_profiler_output
except ImportError:
raise ImportError("profiling.py not yet implemented")
device = "cuda" if torch.cuda.is_available() else "cpu"
# lightweight model for profiling
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
)
if device == "cuda":
model = model.cuda()
train_data = torch.randint(0, min(VOCAB, 256), (500,), dtype=torch.long)
if device == "cuda":
import signal
class TimeoutError(Exception):
pass
def _handler(signum, frame):
raise TimeoutError("profile_training timed out")
old_handler = signal.signal(signal.SIGALRM, _handler)
signal.alarm(30)
try:
result = profile_training(model, train_data, device, n_steps=2, warmup_steps=1, top_k=5)
except TimeoutError:
print(" WARN test_profiling_output_structure: profile_training timed out (CUPTI?)")
result = []
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
if result:
assert isinstance(result, list), f"Expected list, got {type(result)}"
keys = result[0].keys()
has_op_name = "op_name" in keys or "name" in keys
has_time = any("time" in k.lower() for k in keys)
assert has_op_name, f"Missing op_name/name in keys: {keys}"
assert has_time, f"Missing time field in keys: {keys}"
print(f" PASS test_profiling_output_structure ({len(result)} ops)")
else:
print(f" PASS test_profiling_output_structure (timeout-skip)")
else:
# CPU: test analyze_profiler_output with a synthetic JSON file
import tempfile
synthetic = [
{"name": "aten::mm", "cuda_time_us": 1500, "cpu_time_us": 200, "calls": 5},
{"name": "aten::softmax", "cuda_time_us": 800, "cpu_time_us": 100, "calls": 3},
]
tmpf = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
json.dump(synthetic, tmpf)
tmpf.close()
try:
result = analyze_profiler_output(tmpf.name)
finally:
os.unlink(tmpf.name)
assert isinstance(result, list), f"Expected list, got {type(result)}"
assert len(result) > 0, "Expected non-empty list"
assert "op_name" in result[0] or "name" in result[0], \
f"Missing op_name/name: {result[0].keys()}"
print(f" PASS test_profiling_output_structure ({len(result)} ops)")
def test_benchmark_output_structure():
"""run_benchmark returns dict with tokens_per_sec and peak_memory_mb."""
try:
from benchmark import run_benchmark
except ImportError:
raise ImportError("benchmark.py not yet implemented")
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
)
if device == "cuda":
model = model.cuda()
model.eval()
train_data = torch.randint(0, min(VOCAB, 256), (2000,), dtype=torch.long)
import signal
class TimeoutError(Exception):
pass
def _handler(signum, frame):
raise TimeoutError("benchmark timed out")
old_handler = signal.signal(signal.SIGALRM, _handler)
signal.alarm(30)
try:
result = run_benchmark(
model, train_data, device, n_steps=2, warmup_steps=1,
batch_size=4, ctx=CTX,
)
except TimeoutError:
print(" WARN test_benchmark_output_structure: benchmark timed out")
result = {"tokens_per_sec": 0.0, "peak_memory_mb": 0.0, "n_steps": 0, "batch_size": 4, "ctx": CTX, "device": device}
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
assert isinstance(result, dict), f"Expected dict, got {type(result)}"
for key in ["tokens_per_sec", "peak_memory_mb", "n_steps", "batch_size", "ctx", "device"]:
assert key in result, f"Missing key '{key}' in result"
print(f" PASS test_benchmark_output_structure "
f"(tokens/s={result['tokens_per_sec']:.1f}, "
f"peak_mem={result['peak_memory_mb']:.1f}MB)")
def test_compare_benchmarks():
"""compare_benchmarks correctly computes delta between two runs."""
try:
from benchmark import compare_benchmarks
except ImportError:
raise ImportError("benchmark.py not yet implemented")
import tempfile
before = {
"tokens_per_sec": 1000.0,
"peak_memory_mb": 500.0,
"n_steps": 10, "batch_size": 64, "ctx": 66, "device": "cuda",
}
after = {
"tokens_per_sec": 1500.0,
"peak_memory_mb": 450.0,
"n_steps": 10, "batch_size": 64, "ctx": 66, "device": "cuda",
}
def _write_json(d, tmpdir, name):
path = os.path.join(tmpdir, name)
with open(path, "w") as f:
json.dump(d, f)
return path
with tempfile.TemporaryDirectory() as tmpdir:
before_path = _write_json(before, tmpdir, "before.json")
after_path = _write_json(after, tmpdir, "after.json")
comp = compare_benchmarks(before_path, after_path)
assert isinstance(comp, dict), f"Expected dict, got {type(comp)}"
assert "before" in comp, "Missing 'before' in comparison"
assert "after" in comp, "Missing 'after' in comparison"
assert "delta" in comp, "Missing 'delta' in comparison"
assert "pct_change" in comp, "Missing 'pct_change' in comparison"
# Verify math: tokens/sec delta = 1500 - 1000 = 500; pct = 500/1000 * 100 = 50%
assert abs(comp["pct_change"]["tokens_per_sec"] - 50.0) < 1e-5, \
f"Expected tokens/sec +50%, got {comp['pct_change']['tokens_per_sec']}"
assert abs(comp["delta"]["tokens_per_sec"] - 500.0) < 1e-5, \
f"Expected tokens/sec delta 500, got {comp['delta']['tokens_per_sec']}"
# Memory delta = 450 - 500 = -50
assert abs(comp["pct_change"]["peak_memory_mb"] - (-10.0)) < 1e-5, \
f"Expected memory -10%, got {comp['pct_change']['peak_memory_mb']}"
print(f" PASS test_compare_benchmarks "
f"(tokens/sec: {comp['delta']['tokens_per_sec']:+.1f} / {comp['pct_change']['tokens_per_sec']:+.1f}%)")
# ── Optimization Tests ────────────────────────────────────────────────
def test_torch_compile_no_regression():
"""Compiled model produces same output as uncompiled within tolerance."""
try:
from train import apply_torch_compile
except ImportError:
raise ImportError("apply_torch_compile not found in train.py")
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
).to(device).eval()
# Baseline forward pass
torch.manual_seed(42)
x = torch.randint(0, min(VOCAB, 256), (2, CTX), device=device)
with torch.no_grad():
out_baseline, _, _, _ = model(x, targets=x[:, 3:])
# Compiled forward pass
compiled = apply_torch_compile(model, device)
torch.manual_seed(42)
x2 = torch.randint(0, min(VOCAB, 256), (2, CTX), device=device)
with torch.no_grad():
out_compiled, _, _, _ = compiled(x2, targets=x2[:, 3:])
# Compare logits within tolerance
logits_b = out_baseline.logits if hasattr(out_baseline, 'logits') else out_baseline
logits_c = out_compiled.logits if hasattr(out_compiled, 'logits') else out_compiled
if isinstance(logits_b, tuple):
logits_b = logits_b[0]
if isinstance(logits_c, tuple):
logits_c = logits_c[0]
atol = 5e-2 # relaxed tolerance for compilation differences
diff = (logits_b - logits_c).abs().max().item()
assert diff < atol, f"Compiled vs uncompiled output differs by {diff:.4f} > {atol}"
print(f" PASS test_torch_compile_no_regression (max_diff={diff:.4f}, device={device})")
def test_torchao_sparsity_no_ternary_layers():
"""TorchAO sparsity does NOT modify TernaryScaleTensor modules."""
try:
from train import apply_torchao_sparsity
except ImportError:
raise ImportError("apply_torchao_sparsity not found in train.py")
if not torch.cuda.is_available():
print(" SKIP test_torchao_sparsity_no_ternary_layers (CUDA required)")
return
device = "cuda"
model = ARBModel(
enable_vq=False, enable_graph=False, enable_image=False,
enable_memory_modules=False, enable_moe=True,
).to(device)
# Count TernaryScaleTensor modules before sparsification
from arbitor.kernel.ternary_scale import TernaryScaleTensor
ternary_before = 0
for mod in model.modules():
if isinstance(mod, TernaryScaleTensor):
ternary_before += 1
# Apply sparsity
try:
apply_torchao_sparsity(model, device)
except Exception as e:
print(f" apply_torchao_sparsity raised (non-fatal for this test): {e}")
# This test checks that ternary layers aren't modified, not that sparsity works
pass
# Verify TernaryScaleTensor modules still exist and are untouched
ternary_after = 0
for mod in model.modules():
if isinstance(mod, TernaryScaleTensor):
ternary_after += 1
assert ternary_after == ternary_before, \
f"TernaryScaleTensor count changed: {ternary_before} -> {ternary_after}"
print(f" PASS test_torchao_sparsity_no_ternary_layers "
f"({ternary_before} TernaryScaleTensor modules preserved)")
def test_regression_bar_check():
"""Regression bar correctly flags >bar BPB increase."""
try:
from train import check_regression_bar
except ImportError:
raise ImportError("check_regression_bar not found in train.py")
bar = 0.05 # 5%
# Below bar: 4.9% increase should pass
passed, delta, pct, msg = check_regression_bar(1.0, 1.049, bar)
assert passed, f"Expected PASS for 4.9% increase, got: {msg}"
# At bar exactly: 5.0% should pass (<=)
passed, delta, pct, msg = check_regression_bar(1.0, 1.05, bar)
assert passed, f"Expected PASS for 5.0% increase, got: {msg}"
# Above bar: 5.1% should fail
passed, delta, pct, msg = check_regression_bar(1.0, 1.051, bar)
assert not passed, f"Expected FAIL for 5.1% increase, got: {msg}"
# Zero baseline
passed, delta, pct, msg = check_regression_bar(0.0, 0.1, bar)
assert passed, f"Expected PASS for zero baseline, got: {msg}"
# Improvement (negative delta) always passes
passed, delta, pct, msg = check_regression_bar(1.0, 0.9, bar)
assert passed, f"Expected PASS for improvement, got: {msg}"
print(f" PASS test_regression_bar_check (all edge cases correct)")
# ── Runner ────────────────────────────────────────────────────────────
if __name__ == "__main__":
tests = [
test_bpb_from_loss,
test_perplexity_from_loss,
test_repetition_rate_with_repeated,
test_repetition_rate_empty,
test_distinct_n_all_unique,
test_distinct_n_all_same,
test_self_perplexity,
test_download_enwik8,
test_download_text8,
test_evaluate_returns_bpb_perplexity,
test_save_eval_checkpoint,
test_generate_with_top_k,
test_profiling_output_structure,
test_benchmark_output_structure,
test_compare_benchmarks,
test_torch_compile_no_regression,
test_torchao_sparsity_no_ternary_layers,
test_regression_bar_check,
]
print("Running eval_metrics tests...\n")
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
print(f" FAIL {t.__name__}: {e}")
import traceback
traceback.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed out of {len(tests)} tests")