"""Full derivation traces for each non-trivial number in the report.
This module is only invoked when the user passes `--explain`. It doesn't
recompute anything — it reads the values that the main evaluator already
produced and wraps them in a formatted explanation with formula, inputs,
step-by-step computation, and primary source citation.
Design rationale: the tool's core promise is deterministic, auditable
output. `--explain` makes that auditability human-readable. A user can:
1. Read the explanation themselves
2. Paste it into an LLM and ask "does this math check out?"
3. Cross-reference docs/methodology.md for the primary source
All three preserve determinism — the LLM is the user's tool, not ours.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from llm_cal.core.evaluator import EvaluationReport
@dataclass(frozen=True)
class ExplainInput:
"""One input variable to a formula."""
name: str
value: str # pre-formatted for display
label: str # e.g. "[verified]", "[estimated]"
note: str = "" # optional disambiguation
@dataclass(frozen=True)
class ExplainEntry:
"""A full derivation trace for one output number."""
heading: str # localized section title, e.g. "KV cache @ 128K"
formula: str # the formula, literally
inputs: list[ExplainInput] = field(default_factory=list)
steps: list[str] = field(default_factory=list) # step-by-step computation
result: str = "" # final formatted answer with label
source: str = "" # primary source citation
methodology_anchor: str = "" # anchor in docs/methodology.md, e.g. "#prefill-latency"
def build(report: EvaluationReport) -> list[ExplainEntry]:
"""Produce explanation entries in the order they appear in the main report."""
entries: list[ExplainEntry] = []
_weight_bytes(report, entries)
_quantization(report, entries)
_kv_cache_contexts(report, entries)
_fleet_tiers(report, entries)
_prefill(report, entries)
_decode(report, entries)
_concurrency(report, entries)
return entries
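
# Illustrative use of build() (a sketch only; the real CLI wiring lives
# elsewhere in the package, and `evaluate_model` below is a hypothetical
# stand-in for whatever produces the EvaluationReport):
#
#     report = evaluate_model("some-org/some-model")   # -> EvaluationReport
#     for entry in build(report):
#         print(entry.heading)
#         print("  formula:", entry.formula)
#         print("  result: ", entry.result)
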
# ======================================================================
# Weight
# ======================================================================
def _weight_bytes(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
w = report.weight.total_bytes
entries.append(
ExplainEntry(
heading="Weight bytes (safetensors file sum)",
            formula="sum(s.size for s in HF model_info(files_metadata=True).siblings if s.rfilename.endswith('.safetensors'))",
inputs=[
ExplainInput(
name="HF model_info API",
value=f"source={report.source}, sha={report.commit_sha or 'HEAD'}",
label="[verified]",
),
],
steps=[
f"Raw value from API = {w.value:,} bytes",
f"= {w.value / 1e9:.2f} GB",
],
result=f"{w.value:,} bytes [verified]",
source=w.source or "HF siblings API",
methodology_anchor="#weight-bytes",
)
)
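
# Worked example for the sum above (hypothetical shard sizes, not from any
# real repo): three safetensors shards of 9.5 GB, 9.5 GB and 2.1 GB would
# report 21_100_000_000 bytes (21.10 GB) as the verified weight total.
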
def _quantization(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
r = report.reconciliation
if not r.candidates:
return
best = r.candidates[0]
cands_table = "\n".join(
f" {c.scheme:<16} predicted={c.predicted_bytes / 1e9:.2f} GB "
f"error={c.relative_error * 100:.1f}%"
for c in r.candidates[:6]
)
entries.append(
ExplainEntry(
heading="Quantization scheme (reconciliation)",
formula="best_match = argmin_scheme |observed_bytes - scheme.bpp × total_params|",
inputs=[
ExplainInput(
name="observed_bytes",
value=f"{r.observed_bytes:,}",
label="[verified]",
),
ExplainInput(
name="total_params",
value=f"{r.total_params:,}",
label="[estimated]",
note="from architecture formula — see '#params-estimate' entry below",
),
],
steps=[
"For each known quantization scheme, predict total bytes = bpp × params:",
cands_table,
f"Winner: {best.scheme} at {best.relative_error * 100:.1f}% error",
],
result=f"{r.best.value} [{r.best.label.value}]",
source="Nearest-anchor match against known bytes-per-param values",
methodology_anchor="#quantization-scheme",
)
)
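
# Worked example for the reconciliation above (hypothetical numbers): with
# total_params ≈ 685e9 and observed_bytes ≈ 689e9, an FP8 scheme at
# 1.0 bytes-per-param predicts 685 GB (≈ 0.6% relative error), while a BF16
# scheme at 2.0 bytes-per-param predicts 1370 GB (≈ 99% error), so FP8 wins
# the argmin.
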
# ======================================================================
# KV cache
# ======================================================================
def _kv_cache_contexts(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
profile = report.profile
attn = profile.attention
if attn is None:
return
is_mla = attn.variant == "MLA"
is_csa_hca = attn.variant == "CSA_HCA"
for ctx, av in report.kv_cache_by_context.items():
if av.value == 0:
continue
# Rebuild the computation for transparency
if is_mla and attn.kv_lora_rank:
per_tok_per_layer = attn.kv_lora_rank * 2 # kv_lora_rank × dtype(2)
formula = "per_tok_per_layer = kv_lora_rank × dtype_bytes (MLA: compressed latent KV)"
inputs = [
ExplainInput("kv_lora_rank", str(attn.kv_lora_rank), "[verified]"),
ExplainInput("dtype_bytes", "2", "[verified]", note="BF16/FP16"),
ExplainInput("seq_len", f"{ctx:,}", "[verified]"),
ExplainInput("num_layers", str(profile.num_hidden_layers), "[verified]"),
]
else:
            per_tok_per_layer = 2 * attn.num_kv_heads * attn.head_dim * 2  # (K+V) × heads × head_dim × dtype(2)
formula = "per_tok_per_layer = 2 × num_kv_heads × head_dim × dtype_bytes (standard attention)"
inputs = [
ExplainInput("num_kv_heads", str(attn.num_kv_heads), "[verified]"),
ExplainInput("head_dim", str(attn.head_dim), "[verified]"),
ExplainInput("dtype_bytes", "2", "[verified]", note="BF16/FP16"),
ExplainInput("seq_len", f"{ctx:,}", "[verified]"),
ExplainInput("num_layers", str(profile.num_hidden_layers), "[verified]"),
]
baseline = per_tok_per_layer * ctx * profile.num_hidden_layers
steps = [
f"per_tok_per_layer = {per_tok_per_layer:,} bytes",
f"baseline = per_tok_per_layer × seq_len × num_layers = {baseline:,} bytes",
]
if is_csa_hca and attn.compress_ratios:
ratios = attn.compress_ratios
avg = sum(1.0 if r == 0 else 1.0 / r for r in ratios) / len(ratios)
inputs.append(
ExplainInput(
"compress_ratios",
f"len={len(ratios)} (avg keep-fraction={avg:.4f})",
"[verified]",
)
)
            formula += (
                "\napply_csa_hca: baseline × avg(1/r_i for r_i in compress_ratios; "
                "r_i = 0 means keep-all, i.e. keep-fraction 1.0)"
            )
steps.extend(
[
f"avg_keep_fraction = {avg:.4f}",
f"result = baseline × avg_keep_fraction = {av.value:,} bytes",
]
)
else:
steps.append(f"result = baseline = {av.value:,} bytes")
entries.append(
ExplainEntry(
heading=f"KV cache @ {_fmt_ctx(ctx)} context",
formula=formula,
inputs=inputs,
steps=steps,
result=f"{av.value:,} bytes = {av.value / 1e9:.2f} GB [{av.label.value}]",
source=(
"DeepSeek-V2 paper (MLA); DeepSeek-V4 tech report (CSA+HCA); "
"standard attention formula per Attention Is All You Need (Vaswani 2017)"
),
methodology_anchor="#kv-cache-per-request",
)
)
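
# Worked examples for the two formulas above (hypothetical configs, BF16 KV,
# 61 layers, 131072-token context):
#   standard attention, num_kv_heads=8, head_dim=128:
#     per_tok_per_layer = 2 × 8 × 128 × 2 = 4096 bytes
#     per request       = 4096 × 131072 × 61 ≈ 32.7 GB
#   MLA (as modelled here, compressed latent only), kv_lora_rank=512:
#     per_tok_per_layer = 512 × 2 = 1024 bytes
#     per request       = 1024 × 131072 × 61 ≈ 8.2 GB
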
# ======================================================================
# Fleet tiers
# ======================================================================
def _fleet_tiers(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
if report.fleet is None or report.gpu_spec is None:
return
# One explain block per tier (min / dev / prod)
for opt in report.fleet.options:
tier_label = opt.tier
headroom = opt.usable_bytes_per_gpu - opt.weight_bytes_per_gpu
steps = [
f"per-GPU HBM usable (@ 90% util) = {opt.usable_bytes_per_gpu:,} bytes",
f"weight per GPU = total_weight / TP_size = "
f"{report.weight.total_bytes.value:,} / {opt.gpu_count} = "
f"{opt.weight_bytes_per_gpu:,} bytes",
f"headroom per GPU = usable - weight = {headroom:,} bytes ({headroom / 1e9:.2f} GB)",
]
fit_criterion = {"min": 1, "dev": 8, "prod": 16}.get(tier_label, 1)
        steps.append(
            f"tier criterion: headroom ≥ {fit_criterion} × kv_per_request_128K "
            f"(i.e. weight_per_gpu + {fit_criterion} × kv_per_request_128K ≤ usable_per_gpu)"
        )
steps.append(
f"smallest TP count in {list(report.fleet.valid_tp_sizes)} that "
f"satisfies the criterion: {opt.gpu_count}"
)
if not opt.fits:
            steps.append(
                f"NOTE: does not satisfy the criterion; {opt.gpu_count} GPUs "
                "is the closest available option."
            )
entries.append(
ExplainEntry(
heading=f"Fleet tier: {tier_label} ({opt.gpu_count} GPUs)",
formula=(
"smallest TP in valid_set where "
"weight_per_gpu + concurrent × kv_per_request ≤ usable_per_gpu"
),
inputs=[
ExplainInput(
"total_weight_bytes",
f"{report.weight.total_bytes.value:,}",
"[verified]",
),
ExplainInput(
"valid_TP_sizes",
str(list(report.fleet.valid_tp_sizes)),
"[estimated]",
note="divisors of num_attention_heads capped at 8 (single node)",
),
ExplainInput(
"GPU memory_gb",
f"{report.gpu_spec.memory_gb} GB",
"[verified]",
),
],
steps=steps,
result=f"{opt.gpu_count} GPUs, fit={opt.fits}",
source="vLLM --gpu-memory-utilization 0.9 convention; TP divisibility required by vLLM/SGLang",
methodology_anchor="#tp-aware-kv-sharding",
)
)
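
# Worked example for the tier criterion above (hypothetical GPU and model):
# with 100 GB usable per GPU, 60 GB of weights per GPU (headroom 40 GB) and a
# 4 GB KV cache per 128K-token request:
#   min  tier needs  1 × 4 GB =  4 GB ≤ 40 GB  -> fits
#   dev  tier needs  8 × 4 GB = 32 GB ≤ 40 GB  -> fits
#   prod tier needs 16 × 4 GB = 64 GB > 40 GB  -> move to the next valid TP size
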
# ======================================================================
# Prefill
# ======================================================================
def _prefill(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
if (
report.prefill is None
or report.gpu_spec is None
or report.fleet is None
or report.perf_input_tokens is None
):
return
p = report.prefill
# Figure out chosen GPU count from the fleet
chosen = next(
(o.gpu_count for o in report.fleet.options if o.tier == report.fleet.best_tier),
report.fleet.options[0].gpu_count,
)
entries.append(
ExplainEntry(
heading="Prefill latency (single request)",
formula=(
"FLOPs = 2 × params × input_tokens\n"
"effective_TFLOPS = peak_fp16_TFLOPS × num_gpus × utilization\n"
"latency_ms = (FLOPs / (effective_TFLOPS × 1e12)) × 1000"
),
inputs=[
ExplainInput(
"params",
f"{report.total_params_estimate.value:,}",
"[estimated]",
note="from architecture formula (see weight.py)",
),
ExplainInput("input_tokens", f"{report.perf_input_tokens:,}", "[user-set]"),
ExplainInput(
"peak_fp16_TFLOPS",
f"{report.gpu_spec.fp16_tflops}",
"[verified]",
note=f"from GPU database, {report.gpu_spec.id} spec",
),
ExplainInput("num_gpus", f"{chosen}", "[estimated]"),
ExplainInput(
"utilization",
f"{p.utilization:.2f}",
"[user-set]",
note="empirical MFU, default 0.40 — override with --prefill-util",
),
],
steps=[
f"FLOPs = 2 × {report.total_params_estimate.value:,} × "
f"{report.perf_input_tokens:,} = {p.total_flops.value:.3e}",
f"effective_TFLOPS = {report.gpu_spec.fp16_tflops} × {chosen} × "
f"{p.utilization:.2f} = {p.peak_effective_tflops.value:.1f}",
f"latency = {p.total_flops.value:.3e} / "
f"({p.peak_effective_tflops.value:.1f} × 1e12) × 1000 = "
f"{p.latency_ms.value:.1f} ms",
],
result=f"{p.latency_ms.value:.1f} ms [{p.latency_ms.label.value}]",
source="Kaplan et al. 2020 'Scaling Laws for Neural Language Models' (arxiv.org/abs/2001.08361)",
methodology_anchor="#prefill-latency",
)
)
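
# Worked example for the prefill estimate above (hypothetical numbers): a
# 70e9-parameter model with an 8192-token prompt needs
# 2 × 70e9 × 8192 ≈ 1.15e15 FLOPs; on 4 GPUs at 1000 peak TFLOPS each and
# 0.40 utilization the effective rate is 1600 TFLOPS, giving roughly
# 1.15e15 / 1.6e15 ≈ 0.72 s ≈ 720 ms.
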
# ======================================================================
# Decode
# ======================================================================
def _decode(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
if report.decode is None or report.gpu_spec is None or report.fleet is None:
return
d = report.decode
bw = report.gpu_spec.memory_bandwidth_gbps or 0
chosen = next(
(o.gpu_count for o in report.fleet.options if o.tier == report.fleet.best_tier),
report.fleet.options[0].gpu_count,
)
weight_per_gpu = d.active_weight_bytes_per_gpu.value
effective_bw_gbs = bw * d.bw_utilization
steps = [
f"weight_per_gpu = {report.weight.total_bytes.value:,} / {chosen} = "
f"{weight_per_gpu:,} bytes ({weight_per_gpu / 1e9:.2f} GB)",
f"effective_bw = {bw} × {d.bw_utilization:.2f} = {effective_bw_gbs:.0f} GB/s",
f"per_gpu_tok_per_sec = effective_bw / weight_per_gpu = "
f"{effective_bw_gbs * 1e9 / weight_per_gpu:.1f} tok/s",
f"cluster_tok_per_sec = per_gpu × {chosen} × "
f"{d.cluster_comm_efficiency:.2f} = {d.cluster_tokens_per_sec.value:.1f} tok/s",
]
entries.append(
ExplainEntry(
heading="Decode throughput (cluster)",
formula=(
"per_gpu_tok_per_sec = memory_bandwidth × bw_util / weight_bytes_per_gpu\n"
"cluster_tok_per_sec = per_gpu × num_gpus × cluster_comm_efficiency"
),
inputs=[
ExplainInput(
"GPU memory_bandwidth_gbps",
f"{bw}",
"[verified]",
note=f"from GPU database, {report.gpu_spec.id}",
),
ExplainInput(
"bw_util",
f"{d.bw_utilization:.2f}",
"[user-set]",
note="empirical, default 0.50 — override with --decode-bw-util",
),
ExplainInput("weight_bytes_per_gpu", f"{weight_per_gpu:,}", "[estimated]"),
ExplainInput("num_gpus", f"{chosen}", "[estimated]"),
ExplainInput(
"cluster_comm_efficiency",
f"{d.cluster_comm_efficiency:.2f}",
"[user-set]",
note="NCCL AllReduce efficiency on NVLink, default 0.90",
),
],
steps=steps,
result=f"{d.cluster_tokens_per_sec.value:.1f} tok/s [estimated]",
source="vLLM paper (Kwon et al. SOSP 2023, arxiv.org/abs/2309.06180)",
methodology_anchor="#decode-tokens-per-second",
)
)
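
# Worked example for the decode estimate above (hypothetical numbers): with
# 3000 GB/s of HBM bandwidth at 0.50 utilization (1500 GB/s effective) and
# 35 GB of weights per GPU, each GPU streams the weights ≈ 42.9 times per
# second; 4 GPUs at 0.90 communication efficiency give ≈ 154 tok/s.
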
# ======================================================================
# Concurrency bounds
# ======================================================================
def _concurrency(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
if report.concurrency is None:
return
c = report.concurrency
entries.append(
ExplainEntry(
heading="K bound (memory capacity)",
formula="K = floor(per_GPU_headroom_bytes / per_GPU_kv_bytes_per_request)",
inputs=[
ExplainInput(
"per_GPU_headroom_bytes",
f"{c.k_source_headroom_bytes:,}",
"[estimated]",
),
ExplainInput(
"per_GPU_kv_bytes_per_request",
f"{c.k_source_kv_per_req_bytes:,}",
"[estimated]",
note="post-TP-sharding via min(tp, num_kv_heads)",
),
],
steps=[
f"K = floor({c.k_source_headroom_bytes:,} / "
f"{c.k_source_kv_per_req_bytes:,}) = {c.k_bound.value}",
],
result=f"K = {c.k_bound.value} [{c.k_bound.label.value}]",
source="TP sharding rule from vLLM source code (verified)",
methodology_anchor="#k-bound-memory-capacity",
)
)
l_tps = report.decode.cluster_tokens_per_sec.value if report.decode else 0
entries.append(
ExplainEntry(
heading="L bound (compute/bandwidth at SLA)",
formula=(
"L = floor(cluster_tok_per_sec / target_per_user_tok_per_sec / degradation_factor)"
),
inputs=[
ExplainInput("cluster_tok_per_sec", f"{l_tps:.1f}", "[estimated]"),
ExplainInput(
"target_per_user_tok_per_sec",
f"{c.target_tokens_per_sec:.1f}",
"[user-set]",
note="SLA, override with --target-tokens-per-sec",
),
ExplainInput(
"degradation_factor",
f"{c.degradation_factor:.2f}",
"[user-set]",
note="default 1.0 = no degradation; override with --concurrency-degradation",
),
],
steps=[
f"L = floor({l_tps:.1f} / {c.target_tokens_per_sec:.1f} / "
f"{c.degradation_factor:.2f}) = {c.l_bound.value}",
],
result=f"L = {c.l_bound.value} [{c.l_bound.label.value}]",
source="Standard SLA-based capacity planning",
methodology_anchor="#l-bound-compute-bandwidth-at-sla",
)
)
entries.append(
ExplainEntry(
heading="Max concurrent + bottleneck verdict",
formula="max_concurrent = min(K, L); bottleneck = 'memory_capacity' if K ≤ L else 'memory_bandwidth / compute'",
inputs=[
ExplainInput("K", str(c.k_bound.value), f"[{c.k_bound.label.value}]"),
ExplainInput("L", str(c.l_bound.value), f"[{c.l_bound.label.value}]"),
],
steps=[
f"max_concurrent = min(K={c.k_bound.value}, L={c.l_bound.value}) = "
f"{c.max_concurrent.value}",
f"bottleneck = {c.bottleneck}",
],
result=(f"{c.max_concurrent.value} concurrent, bottleneck = {c.bottleneck}"),
source=c.bottleneck_reason_en,
methodology_anchor="#concurrency-bounds-k-l",
)
)
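
# Worked example tying the three entries above together (hypothetical numbers,
# consistent with the decode example): 40 GB of per-GPU headroom and 4 GB of
# per-GPU KV per request give K = floor(40 / 4) = 10; 154 cluster tok/s at a
# 20 tok/s per-user SLA and degradation factor 1.0 give L = floor(154 / 20) = 7;
# max_concurrent = min(10, 7) = 7, so the bottleneck is bandwidth/compute.
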
# ======================================================================
# Helpers
# ======================================================================
def _fmt_ctx(ctx: int) -> str:
    """Format a context length for headings, e.g. 131072 -> "128K", 1048576 -> "1M"."""
    if ctx >= 1024 * 1024:
        return f"{ctx // (1024 * 1024)}M"
    if ctx >= 1024:
        return f"{ctx // 1024}K"
    return str(ctx)