| """Full derivation traces for each non-trivial number in the report. | |
| This module is only invoked when the user passes `--explain`. It doesn't | |
| recompute anything — it reads the values that the main evaluator already | |
| produced and wraps them in a formatted explanation with formula, inputs, | |
| step-by-step computation, and primary source citation. | |
| Design rationale: the tool's core promise is deterministic, auditable | |
| output. `--explain` makes that auditability human-readable. A user can: | |
| 1. Read the explanation themselves | |
| 2. Paste it into an LLM and ask "does this math check out?" | |
| 3. Cross-reference docs/methodology.md for the primary source | |
| All three preserve determinism — the LLM is the user's tool, not ours. | |
| """ | |

from __future__ import annotations

from dataclasses import dataclass, field

from llm_cal.core.evaluator import EvaluationReport


@dataclass
class ExplainInput:
    """One input variable to a formula."""

    name: str
    value: str  # pre-formatted for display
    label: str  # e.g. "[verified]", "[estimated]"
    note: str = ""  # optional disambiguation


@dataclass
class ExplainEntry:
    """A full derivation trace for one output number."""

    heading: str  # localized section title, e.g. "KV cache @ 128K"
    formula: str  # the formula, literally
    inputs: list[ExplainInput] = field(default_factory=list)
    steps: list[str] = field(default_factory=list)  # step-by-step computation
    result: str = ""  # final formatted answer with label
    source: str = ""  # primary source citation
    methodology_anchor: str = ""  # anchor in docs/methodology.md, e.g. "#prefill-latency"


def build(report: EvaluationReport) -> list[ExplainEntry]:
    """Produce explanation entries in the order they appear in the main report."""
    entries: list[ExplainEntry] = []
    _weight_bytes(report, entries)
    _quantization(report, entries)
    _kv_cache_contexts(report, entries)
    _fleet_tiers(report, entries)
    _prefill(report, entries)
    _decode(report, entries)
    _concurrency(report, entries)
    return entries
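

# Minimal usage sketch (hypothetical caller; the real renderer lives in the
# CLI layer, not in this module). build() returns plain dataclasses, so any
# formatter can walk them:
#
#     for entry in build(report):
#         print(f"## {entry.heading}")
#         print(entry.formula)
#         for inp in entry.inputs:
#             print(f"  {inp.name} = {inp.value} {inp.label}")
#         print("\n".join(entry.steps))
#         print(f"=> {entry.result}  (docs/methodology.md{entry.methodology_anchor})")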


# ======================================================================
# Weight
# ======================================================================
def _weight_bytes(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    w = report.weight.total_bytes
    entries.append(
        ExplainEntry(
            heading="Weight bytes (safetensors file sum)",
            formula=(
                "sum(sibling.size for sibling in "
                "HF model_info(files_metadata=True).siblings "
                "if sibling.rfilename.endswith('.safetensors'))"
            ),
            inputs=[
                ExplainInput(
                    name="HF model_info API",
                    value=f"source={report.source}, sha={report.commit_sha or 'HEAD'}",
                    label="[verified]",
                ),
            ],
            steps=[
                f"Raw value from API = {w.value:,} bytes",
                f"= {w.value / 1e9:.2f} GB",
            ],
            result=f"{w.value:,} bytes [verified]",
            source=w.source or "HF siblings API",
            methodology_anchor="#weight-bytes",
        )
    )
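

# For reference, the upstream lookup looks roughly like this (a sketch against
# the public huggingface_hub API, not a copy of the evaluator's code; the repo
# id is a placeholder):
#
#     from huggingface_hub import HfApi
#
#     info = HfApi().model_info("org/model", files_metadata=True)
#     total = sum(
#         s.size or 0
#         for s in info.siblings
#         if s.rfilename.endswith(".safetensors")
#     )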


def _quantization(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    r = report.reconciliation
    if not r.candidates:
        return
    best = r.candidates[0]
    cands_table = "\n".join(
        f"  {c.scheme:<16} predicted={c.predicted_bytes / 1e9:.2f} GB "
        f"error={c.relative_error * 100:.1f}%"
        for c in r.candidates[:6]
    )
    entries.append(
        ExplainEntry(
            heading="Quantization scheme (reconciliation)",
            formula="best_match = argmin_scheme |observed_bytes - scheme.bpp × total_params|",
            inputs=[
                ExplainInput(
                    name="observed_bytes",
                    value=f"{r.observed_bytes:,}",
                    label="[verified]",
                ),
                ExplainInput(
                    name="total_params",
                    value=f"{r.total_params:,}",
                    label="[estimated]",
                    note="from architecture formula — see '#params-estimate' entry below",
                ),
            ],
            steps=[
                "For each known quantization scheme, predict total bytes = bpp × params:",
                cands_table,
                f"Winner: {best.scheme} at {best.relative_error * 100:.1f}% error",
            ],
            result=f"{r.best.value} [{r.best.label.value}]",
            source="Nearest-anchor match against known bytes-per-param values",
            methodology_anchor="#quantization-scheme",
        )
    )
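

# Worked example (hypothetical numbers): observed_bytes = 140.0e9 against an
# estimated 70.0e9 params is 2.00 observed bytes-per-param, so a BF16/FP16
# anchor (bpp = 2.0) wins at ~0.0% error, while an 8-bit anchor (bpp ≈ 1.0)
# predicts 70 GB and loses at ~50% error. The candidates list is assumed
# sorted best-first, since the code takes candidates[0] as the winner.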


# ======================================================================
# KV cache
# ======================================================================
def _kv_cache_contexts(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    profile = report.profile
    attn = profile.attention
    if attn is None:
        return
    is_mla = attn.variant == "MLA"
    is_csa_hca = attn.variant == "CSA_HCA"
    for ctx, av in report.kv_cache_by_context.items():
        if av.value == 0:
            continue
        # Rebuild the computation for transparency
        if is_mla and attn.kv_lora_rank:
            per_tok_per_layer = attn.kv_lora_rank * 2  # kv_lora_rank × dtype_bytes(2)
            formula = "per_tok_per_layer = kv_lora_rank × dtype_bytes (MLA: compressed latent KV)"
            inputs = [
                ExplainInput("kv_lora_rank", str(attn.kv_lora_rank), "[verified]"),
                ExplainInput("dtype_bytes", "2", "[verified]", note="BF16/FP16"),
                ExplainInput("seq_len", f"{ctx:,}", "[verified]"),
                ExplainInput("num_layers", str(profile.num_hidden_layers), "[verified]"),
            ]
        else:
            per_tok_per_layer = 2 * attn.num_kv_heads * attn.head_dim * 2
            formula = "per_tok_per_layer = 2 × num_kv_heads × head_dim × dtype_bytes (standard attention)"
            inputs = [
                ExplainInput("num_kv_heads", str(attn.num_kv_heads), "[verified]"),
                ExplainInput("head_dim", str(attn.head_dim), "[verified]"),
                ExplainInput("dtype_bytes", "2", "[verified]", note="BF16/FP16"),
                ExplainInput("seq_len", f"{ctx:,}", "[verified]"),
                ExplainInput("num_layers", str(profile.num_hidden_layers), "[verified]"),
            ]
        baseline = per_tok_per_layer * ctx * profile.num_hidden_layers
        steps = [
            f"per_tok_per_layer = {per_tok_per_layer:,} bytes",
            f"baseline = per_tok_per_layer × seq_len × num_layers = {baseline:,} bytes",
        ]
        if is_csa_hca and attn.compress_ratios:
            ratios = attn.compress_ratios
            avg = sum(1.0 if r == 0 else 1.0 / r for r in ratios) / len(ratios)
            inputs.append(
                ExplainInput(
                    "compress_ratios",
                    f"len={len(ratios)} (avg keep-fraction={avg:.4f})",
                    "[verified]",
                )
            )
            formula += (
                "\napply_csa_hca: baseline × avg(1/r_i for r_i in compress_ratios; "
                "r_i = 0 means keep-all, i.e. 1.0)"
            )
            steps.extend(
                [
                    f"avg_keep_fraction = {avg:.4f}",
                    f"result = baseline × avg_keep_fraction = {av.value:,} bytes",
                ]
            )
        else:
            steps.append(f"result = baseline = {av.value:,} bytes")
        entries.append(
            ExplainEntry(
                heading=f"KV cache @ {_fmt_ctx(ctx)} context",
                formula=formula,
                inputs=inputs,
                steps=steps,
                result=f"{av.value:,} bytes = {av.value / 1e9:.2f} GB [{av.label.value}]",
                source=(
                    "DeepSeek-V2 paper (MLA); DeepSeek-V4 tech report (CSA+HCA); "
                    "standard attention formula per Attention Is All You Need (Vaswani 2017)"
                ),
                methodology_anchor="#kv-cache-per-request",
            )
        )
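

# Worked example (hypothetical, standard attention): with num_kv_heads=8,
# head_dim=128, 80 layers at 128K (131,072 tokens) context:
#   per_tok_per_layer = 2 × 8 × 128 × 2          = 4,096 bytes
#   baseline = 4,096 × 131,072 × 80              = 42,949,672,960 bytes ≈ 42.95 GB
# The leading 2 counts K and V; the trailing 2 is dtype_bytes for BF16/FP16.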


# ======================================================================
# Fleet tiers
# ======================================================================
def _fleet_tiers(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    if report.fleet is None or report.gpu_spec is None:
        return
    # One explain block per tier (min / dev / prod)
    for opt in report.fleet.options:
        tier_label = opt.tier
        headroom = opt.usable_bytes_per_gpu - opt.weight_bytes_per_gpu
        steps = [
            f"per-GPU HBM usable (@ 90% util) = {opt.usable_bytes_per_gpu:,} bytes",
            f"weight per GPU = total_weight / TP_size = "
            f"{report.weight.total_bytes.value:,} / {opt.gpu_count} = "
            f"{opt.weight_bytes_per_gpu:,} bytes",
            f"headroom per GPU = usable - weight = {headroom:,} bytes ({headroom / 1e9:.2f} GB)",
        ]
        fit_criterion = {"min": 1, "dev": 8, "prod": 16}.get(tier_label, 1)
        steps.append(
            f"tier criterion: headroom ≥ {fit_criterion} × kv_per_request_128K"
        )
        steps.append(
            f"smallest TP count in {list(report.fleet.valid_tp_sizes)} that "
            f"satisfies the criterion: {opt.gpu_count}"
        )
        if not opt.fits:
            steps.append(
                f"NOTE: does not fit the criterion — the chosen {opt.gpu_count} "
                "is the best available."
            )
        entries.append(
            ExplainEntry(
                heading=f"Fleet tier: {tier_label} ({opt.gpu_count} GPUs)",
                formula=(
                    "smallest TP in valid_set where "
                    "weight_per_gpu + concurrent × kv_per_request ≤ usable_per_gpu"
                ),
                inputs=[
                    ExplainInput(
                        "total_weight_bytes",
                        f"{report.weight.total_bytes.value:,}",
                        "[verified]",
                    ),
                    ExplainInput(
                        "valid_TP_sizes",
                        str(list(report.fleet.valid_tp_sizes)),
                        "[estimated]",
                        note="divisors of num_attention_heads capped at 8 (single node)",
                    ),
                    ExplainInput(
                        "GPU memory_gb",
                        f"{report.gpu_spec.memory_gb} GB",
                        "[verified]",
                    ),
                ],
                steps=steps,
                result=f"{opt.gpu_count} GPUs, fit={opt.fits}",
                source="vLLM --gpu-memory-utilization 0.9 convention; TP divisibility required by vLLM/SGLang",
                methodology_anchor="#tp-aware-kv-sharding",
            )
        )
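

# Worked example (hypothetical): 80 GB GPUs at 90% utilization give 72.0e9
# usable bytes each. A 140.0e9-byte model at TP=4 puts 35.0e9 bytes of
# weights on each GPU, leaving 37.0e9 bytes of headroom. With a per-GPU 128K
# KV slice of ~10.7e9 bytes, the "min" tier (1 request) fits; "dev"
# (8 requests, ~85.9e9 bytes) does not, so it would be pushed to a larger TP
# size if one is available.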


# ======================================================================
# Prefill
# ======================================================================
def _prefill(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    if (
        report.prefill is None
        or report.gpu_spec is None
        or report.fleet is None
        or report.perf_input_tokens is None
    ):
        return
    p = report.prefill
    # Figure out the chosen GPU count from the fleet
    chosen = next(
        (o.gpu_count for o in report.fleet.options if o.tier == report.fleet.best_tier),
        report.fleet.options[0].gpu_count,
    )
    entries.append(
        ExplainEntry(
            heading="Prefill latency (single request)",
            formula=(
                "FLOPs = 2 × params × input_tokens\n"
                "effective_TFLOPS = peak_fp16_TFLOPS × num_gpus × utilization\n"
                "latency_ms = (FLOPs / (effective_TFLOPS × 1e12)) × 1000"
            ),
            inputs=[
                ExplainInput(
                    "params",
                    f"{report.total_params_estimate.value:,}",
                    "[estimated]",
                    note="from architecture formula (see weight.py)",
                ),
                ExplainInput("input_tokens", f"{report.perf_input_tokens:,}", "[user-set]"),
                ExplainInput(
                    "peak_fp16_TFLOPS",
                    f"{report.gpu_spec.fp16_tflops}",
                    "[verified]",
                    note=f"from GPU database, {report.gpu_spec.id} spec",
                ),
                ExplainInput("num_gpus", f"{chosen}", "[estimated]"),
                ExplainInput(
                    "utilization",
                    f"{p.utilization:.2f}",
                    "[user-set]",
                    note="empirical MFU, default 0.40 — override with --prefill-util",
                ),
            ],
            steps=[
                f"FLOPs = 2 × {report.total_params_estimate.value:,} × "
                f"{report.perf_input_tokens:,} = {p.total_flops.value:.3e}",
                f"effective_TFLOPS = {report.gpu_spec.fp16_tflops} × {chosen} × "
                f"{p.utilization:.2f} = {p.peak_effective_tflops.value:.1f}",
                f"latency = {p.total_flops.value:.3e} / "
                f"({p.peak_effective_tflops.value:.1f} × 1e12) × 1000 = "
                f"{p.latency_ms.value:.1f} ms",
            ],
            result=f"{p.latency_ms.value:.1f} ms [{p.latency_ms.label.value}]",
            source="Kaplan et al. 2020 'Scaling Laws for Neural Language Models' (arxiv.org/abs/2001.08361)",
            methodology_anchor="#prefill-latency",
        )
    )
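

# Worked example (hypothetical): 70.0e9 params, 8,192 input tokens, 4 GPUs
# at a nominal 1,000 peak FP16 TFLOPS each and 0.40 MFU:
#   FLOPs            = 2 × 70.0e9 × 8,192        ≈ 1.147e15
#   effective_TFLOPS = 1,000 × 4 × 0.40          = 1,600
#   latency          = 1.147e15 / 1.6e15 × 1000  ≈ 717 ms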


# ======================================================================
# Decode
# ======================================================================
def _decode(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    if report.decode is None or report.gpu_spec is None or report.fleet is None:
        return
    d = report.decode
    bw = report.gpu_spec.memory_bandwidth_gbps or 0
    chosen = next(
        (o.gpu_count for o in report.fleet.options if o.tier == report.fleet.best_tier),
        report.fleet.options[0].gpu_count,
    )
    weight_per_gpu = d.active_weight_bytes_per_gpu.value
    effective_bw_gbs = bw * d.bw_utilization
    steps = [
        f"weight_per_gpu = {report.weight.total_bytes.value:,} / {chosen} = "
        f"{weight_per_gpu:,} bytes ({weight_per_gpu / 1e9:.2f} GB)",
        f"effective_bw = {bw} × {d.bw_utilization:.2f} = {effective_bw_gbs:.0f} GB/s",
        f"per_gpu_tok_per_sec = effective_bw / weight_per_gpu = "
        f"{effective_bw_gbs * 1e9 / weight_per_gpu:.1f} tok/s",
        f"cluster_tok_per_sec = per_gpu × {chosen} × "
        f"{d.cluster_comm_efficiency:.2f} = {d.cluster_tokens_per_sec.value:.1f} tok/s",
    ]
    entries.append(
        ExplainEntry(
            heading="Decode throughput (cluster)",
            formula=(
                "per_gpu_tok_per_sec = memory_bandwidth × bw_util / weight_bytes_per_gpu\n"
                "cluster_tok_per_sec = per_gpu × num_gpus × cluster_comm_efficiency"
            ),
            inputs=[
                ExplainInput(
                    "GPU memory_bandwidth_gbps",
                    f"{bw}",
                    "[verified]",
                    note=f"from GPU database, {report.gpu_spec.id}",
                ),
                ExplainInput(
                    "bw_util",
                    f"{d.bw_utilization:.2f}",
                    "[user-set]",
                    note="empirical, default 0.50 — override with --decode-bw-util",
                ),
                ExplainInput("weight_bytes_per_gpu", f"{weight_per_gpu:,}", "[estimated]"),
                ExplainInput("num_gpus", f"{chosen}", "[estimated]"),
                ExplainInput(
                    "cluster_comm_efficiency",
                    f"{d.cluster_comm_efficiency:.2f}",
                    "[user-set]",
                    note="NCCL AllReduce efficiency on NVLink, default 0.90",
                ),
            ],
            steps=steps,
            result=f"{d.cluster_tokens_per_sec.value:.1f} tok/s [estimated]",
            source="vLLM paper (Kwon et al. SOSP 2023, arxiv.org/abs/2309.06180)",
            methodology_anchor="#decode-tokens-per-second",
        )
    )
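

# Worked example (hypothetical): 3,000 GB/s HBM at 0.50 utilization, a
# 140.0e9-byte model on 4 GPUs (35.0e9 bytes each), 0.90 comm efficiency:
#   per_gpu = 3,000 × 0.50 × 1e9 / 35.0e9  ≈ 42.9 tok/s
#   cluster = 42.9 × 4 × 0.90              ≈ 154.3 tok/s
# Decode is memory-bound: each generated token streams the full per-GPU
# weight slice through HBM once, so bandwidth, not FLOPs, sets the ceiling.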


# ======================================================================
# Concurrency bounds
# ======================================================================
def _concurrency(report: EvaluationReport, entries: list[ExplainEntry]) -> None:
    if report.concurrency is None:
        return
    c = report.concurrency
    entries.append(
        ExplainEntry(
            heading="K bound (memory capacity)",
            formula="K = floor(per_GPU_headroom_bytes / per_GPU_kv_bytes_per_request)",
            inputs=[
                ExplainInput(
                    "per_GPU_headroom_bytes",
                    f"{c.k_source_headroom_bytes:,}",
                    "[estimated]",
                ),
                ExplainInput(
                    "per_GPU_kv_bytes_per_request",
                    f"{c.k_source_kv_per_req_bytes:,}",
                    "[estimated]",
                    note="post-TP-sharding via min(tp, num_kv_heads)",
                ),
            ],
            steps=[
                f"K = floor({c.k_source_headroom_bytes:,} / "
                f"{c.k_source_kv_per_req_bytes:,}) = {c.k_bound.value}",
            ],
            result=f"K = {c.k_bound.value} [{c.k_bound.label.value}]",
            source="TP sharding rule from vLLM source code (verified)",
            methodology_anchor="#k-bound-memory-capacity",
        )
    )
    l_tps = report.decode.cluster_tokens_per_sec.value if report.decode else 0
    entries.append(
        ExplainEntry(
            heading="L bound (compute/bandwidth at SLA)",
            formula=(
                "L = floor(cluster_tok_per_sec / target_per_user_tok_per_sec / degradation_factor)"
            ),
            inputs=[
                ExplainInput("cluster_tok_per_sec", f"{l_tps:.1f}", "[estimated]"),
                ExplainInput(
                    "target_per_user_tok_per_sec",
                    f"{c.target_tokens_per_sec:.1f}",
                    "[user-set]",
                    note="SLA, override with --target-tokens-per-sec",
                ),
                ExplainInput(
                    "degradation_factor",
                    f"{c.degradation_factor:.2f}",
                    "[user-set]",
                    note="default 1.0 = no degradation; override with --concurrency-degradation",
                ),
            ],
            steps=[
                f"L = floor({l_tps:.1f} / {c.target_tokens_per_sec:.1f} / "
                f"{c.degradation_factor:.2f}) = {c.l_bound.value}",
            ],
            result=f"L = {c.l_bound.value} [{c.l_bound.label.value}]",
            source="Standard SLA-based capacity planning",
            methodology_anchor="#l-bound-compute-bandwidth-at-sla",
        )
    )
    entries.append(
        ExplainEntry(
            heading="Max concurrent + bottleneck verdict",
            formula=(
                "max_concurrent = min(K, L); "
                "bottleneck = 'memory_capacity' if K ≤ L else 'memory_bandwidth / compute'"
            ),
            inputs=[
                ExplainInput("K", str(c.k_bound.value), f"[{c.k_bound.label.value}]"),
                ExplainInput("L", str(c.l_bound.value), f"[{c.l_bound.label.value}]"),
            ],
            steps=[
                f"max_concurrent = min(K={c.k_bound.value}, L={c.l_bound.value}) = "
                f"{c.max_concurrent.value}",
                f"bottleneck = {c.bottleneck}",
            ],
            result=f"{c.max_concurrent.value} concurrent, bottleneck = {c.bottleneck}",
            source=c.bottleneck_reason_en,
            methodology_anchor="#concurrency-bounds-k-l",
        )
    )
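

# Worked example (hypothetical): 37.0e9 bytes of per-GPU headroom and
# ~10.7e9 bytes of per-GPU KV per request give K = floor(37.0 / 10.7) = 3.
# A cluster at 154.3 tok/s against a 20 tok/s per-user SLA (degradation 1.0)
# gives L = floor(154.3 / 20 / 1.0) = 7. Then max_concurrent = min(3, 7) = 3
# and the verdict is memory_capacity: adding HBM (or a larger TP size) helps
# before adding compute does.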


# ======================================================================
# Helpers
# ======================================================================
def _fmt_ctx(ctx: int) -> str:
    """Format a context length compactly, e.g. 131072 -> '128K', 1048576 -> '1M'."""
    if ctx >= 1_000_000:
        return f"{ctx // 1_000_000}M"
    if ctx >= 1024:
        return f"{ctx // 1024}K"
    return str(ctx)