Kimi-K2.5-MTP / convert_mtp_fp4_to_int4.py
k-l-lambda's picture
Upload convert_mtp_fp4_to_int4.py with huggingface_hub
dc2c00b verified
"""
Convert MTP expert weights from NVFP4 packed format to INT4 compressed-tensors.
FP4 E2M1 format: 2 values packed per U8 byte
weight: [out, in/2] U8 (2 FP4 per byte, block_size_fp4=16 fp4 values = 8 bytes)
weight_scale: [out, in/2/8] F8E4M3 (one scale per 8 bytes = per 16 fp4 values)
weight_scale_2: scalar F32 (global scale)
input_scale: scalar F32 (activation scale, ignored for weight loading)
"""
import torch
import numpy as np
from safetensors import safe_open
from safetensors.torch import save_file
from collections import OrderedDict
MTP_PATH = "/data/models/Kimi-K2.5-MTP/mtp_fp8_orig.safetensors"
OUTPUT_PATH = "/data/models/Kimi-K2.5-MTP/mtp.safetensors"
GROUP_SIZE = 32 # INT4 group size for compressed-tensors
PACK_FACTOR = 8 # 8 INT4 values per INT32 (GPTQ format, matches base model) # 4 INT4 values per INT32 (GPTQ format)
# NV FP4 E2M1 decode table (4-bit index → float value)
# E2M1: 1 sign bit, 2 exponent bits, 1 mantissa bit
FP4_TABLE = torch.tensor([
0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0,
-0.0, -0.5, -1.0, -1.5, -2.0, -3.0, -4.0, -6.0
], dtype=torch.float32)
def dequant_fp4_block(weight_u8, weight_scale_fp8e4m3, weight_scale_2_f32):
"""Dequantize FP4-packed weight to BF16.
weight_u8: [out, in/2] — 2 FP4 values per U8 byte
weight_scale: [out, in/2/8] F8E4M3 — scale per 8 bytes (16 fp4 values)
weight_scale_2: scalar F32 — global scale
Returns: [out, in] BF16
"""
out_f, in_packed = weight_u8.shape
in_fp4 = in_packed * 2 # actual number of FP4 values per row
# Unpack FP4: low nibble first, then high nibble
w_u8 = weight_u8.to(torch.int32)
low_nibble = w_u8 & 0x0F # [out, in/2]
high_nibble = (w_u8 >> 4) & 0x0F # [out, in/2]
# Interleave: low nibble at even positions, high nibble at odd
unpacked = torch.stack([low_nibble, high_nibble], dim=-1) # [out, in/2, 2]
unpacked = unpacked.reshape(out_f, in_fp4) # [out, in]
# Decode FP4 values
decoded = FP4_TABLE[unpacked.cpu()].to(torch.float32) # [out, in]
# Apply per-block scale: each block = 16 fp4 values = 8 bytes
# weight_scale shape: [out, in/16] (in F8E4M3)
scale = weight_scale_fp8e4m3.to(torch.float32) # [out, in/16]
# Repeat scale for each 16-element block
scale_expanded = scale.repeat_interleave(16, dim=-1) # [out, in]
# Apply global scale
global_scale = weight_scale_2_f32.item() if weight_scale_2_f32.numel() == 1 else 1.0
result = decoded * scale_expanded * global_scale
return result.to(torch.bfloat16)
def quantize_int4_gptq(weight_bf16, group_size=32):
"""Quantize BF16 to INT4 GPTQ format (packed 4 values per INT32)."""
out_f, in_f = weight_bf16.shape
w = weight_bf16.to(torch.float32)
pad = (group_size - in_f % group_size) % group_size
if pad > 0:
w = torch.nn.functional.pad(w, (0, pad))
in_padded = w.shape[1]
w_grouped = w.reshape(out_f, -1, group_size)
scales = w_grouped.abs().amax(dim=-1) / 7.0
scales = scales.clamp(min=1e-10)
w_int = torch.round(w_grouped / scales.unsqueeze(-1)).clamp(-8, 7).to(torch.int8)
w_int = w_int.reshape(out_f, in_padded)
# Pack 4 INT4 values per INT32
w_unsigned = (w_int + 8).to(torch.int32)
w_r = w_unsigned.reshape(out_f, -1, PACK_FACTOR)
packed = torch.zeros(out_f, w_r.shape[1], dtype=torch.int32)
for i in range(PACK_FACTOR):
packed |= (w_r[:, :, i] & 0xF) << (i * 4)
shape = torch.tensor([out_f, in_f], dtype=torch.int32)
return packed, scales.to(torch.bfloat16), shape
print("Loading original FP4-packed MTP weights...")
new_tensors = OrderedDict()
converted_expert = 0
converted_shared = 0
passed = 0
with safe_open(MTP_PATH, framework="pt", device="cpu") as f:
all_keys = sorted(f.keys())
# Identify FP4-packed projections (have weight + weight_scale with U8 dtype)
fp4_bases = set()
for k in all_keys:
if k.endswith(".weight") and not k.endswith("_scale") and not k.endswith("_scale_2"):
t = f.get_tensor(k)
if t.dtype == torch.uint8:
base = k[:-7]
if f"{base}.weight_scale" in all_keys:
fp4_bases.add(base)
print(f"FP4-packed projections: {len(fp4_bases)}")
processed = set()
for k in all_keys:
if k in processed:
continue
base = None
for fb in fp4_bases:
if k.startswith(fb + "."):
base = fb
break
if base is not None:
if k == f"{base}.weight":
w_u8 = f.get_tensor(k)
w_scale = f.get_tensor(f"{base}.weight_scale")
w_scale2 = f.get_tensor(f"{base}.weight_scale_2")
w_bf16 = dequant_fp4_block(w_u8, w_scale, w_scale2)
if ".mlp.experts." in base:
packed, scales, shape = quantize_int4_gptq(w_bf16, GROUP_SIZE)
new_tensors[f"{base}.weight_packed"] = packed
new_tensors[f"{base}.weight_scale"] = scales
new_tensors[f"{base}.weight_shape"] = shape
converted_expert += 1
if converted_expert == 1:
print(f" Sample: {base}.weight_packed: {list(packed.shape)}, scale: {list(scales.shape)}")
else:
new_tensors[f"{base}.weight"] = w_bf16
converted_shared += 1
processed.update([k, f"{base}.weight_scale", f"{base}.weight_scale_2", f"{base}.input_scale"])
continue
new_tensors[k] = f.get_tensor(k)
passed += 1
print(f"Expert→INT4: {converted_expert}, Shared→BF16: {converted_shared}, Passthrough: {passed}")
print(f"Total: {len(new_tensors)}")
print("Saving...")
save_file(new_tensors, OUTPUT_PATH)
import os
print(f"Saved: {os.path.getsize(OUTPUT_PATH)/1024/1024:.1f} MB")