| |
| """ |
| Unary converter for Qwen3 models. |
| Converts safetensors to unary bitplane format. |
| (c) 2026 OpenTransformers Ltd / Scott Bisset |
| """ |
| import numpy as np |
| import os, sys, json, time |
|
|
def load_safetensors_torch(model_dir):
    """Read every ``*.safetensors`` shard in *model_dir* into numpy arrays.

    Shards are visited in sorted filename order and opened with the torch
    backend. Each tensor is converted with ``.float().numpy()`` before being
    stored, so the returned arrays are float32.

    Args:
        model_dir: Directory containing one or more ``.safetensors`` files.

    Returns:
        dict mapping tensor name to its numpy array. If the same name occurs
        in more than one shard, the value from the later shard wins.
    """
    import torch
    from safetensors import safe_open

    shard_files = sorted(
        name for name in os.listdir(model_dir) if name.endswith('.safetensors')
    )
    print(f"Loading {len(shard_files)} shard(s)...")

    weights = {}
    for shard_name in shard_files:
        shard_path = os.path.join(model_dir, shard_name)
        print(f" {shard_name}...")
        with safe_open(shard_path, framework="pt") as shard:
            # The generator is consumed while the shard is still open.
            weights.update(
                (tensor_name, shard.get_tensor(tensor_name).float().numpy())
                for tensor_name in shard.keys()
            )
    return weights
|
|
def quantize_unary_vectorized(w_fp32, n_planes):
    """Quantize a 2-D weight matrix to sign + unary magnitude bitplanes.

    Each row is scaled by its own absolute maximum so that its values map to
    integers in ``[-n_planes, n_planes]``. The sign and the magnitude are then
    stored as bitmasks packed 64 columns per ``uint64`` word; plane ``p`` has
    a bit set wherever the integer magnitude is greater than ``p``.

    Args:
        w_fp32: Array of shape ``(out_dim, in_dim)``.
        n_planes: Number of magnitude planes (also the largest integer level).

    Returns:
        Tuple ``(sign_u64, plane_bits, scales)``:
          * ``sign_u64``   - ``(out_dim, chunks)`` uint64, bit set where the
            quantized value is negative.
          * ``plane_bits`` - ``(n_planes, out_dim, chunks)`` uint64.
          * ``scales``     - ``(out_dim,)`` float32, row abs-max / n_planes.
        ``chunks`` is ``ceil(in_dim / 64)``; columns past ``in_dim`` are zero
        bits.
    """
    n_rows, n_cols = w_fp32.shape
    top_level = n_planes

    # Number of 64-bit words per row and how many zero columns fill the last.
    words_per_row = (n_cols + 63) // 64
    pad_cols = words_per_row * 64 - n_cols

    def pack_rows(bits):
        """Pack a (rows, n_cols) boolean mask into (rows, words_per_row) uint64."""
        if pad_cols > 0:
            bits = np.pad(bits, ((0, 0), (0, pad_cols)), constant_values=False)
        as_bytes = np.packbits(bits.astype(np.uint8), axis=1, bitorder='little')
        # Reinterpret each run of 8 packed bytes as one native uint64 word.
        return as_bytes.view(np.uint64)[:, :words_per_row]

    # Per-row abs-max; all-zero rows use 1.0 so the division stays finite.
    row_peak = np.abs(w_fp32).max(axis=1, keepdims=True)
    row_peak = np.where(row_peak == 0, 1.0, row_peak)

    levels = np.round(w_fp32 / row_peak * top_level)
    levels = np.clip(levels, -top_level, top_level).astype(np.int32)

    # Dequantization factor for each row.
    scales = (row_peak.flatten() / top_level).astype(np.float32)

    sign_u64 = pack_rows(levels < 0)

    level_abs = np.abs(levels)
    plane_bits = np.zeros((n_planes, n_rows, words_per_row), dtype=np.uint64)
    for threshold in range(n_planes):
        plane_bits[threshold] = pack_rows(level_abs > threshold)

    return sign_u64, plane_bits, scales
|
|
def convert_model(model_dir, output_dir, n_planes=7):
    """Convert a Qwen3 checkpoint directory to the unary bitplane format.

    Every 2-D tensor whose name ends in ``.weight`` and contains ``proj`` is
    quantized with ``quantize_unary_vectorized`` and written as three raw
    files (``<name>.sign``, ``<name>.planes``, ``<name>.scales``). Every other
    tensor is cast to float16 and written as a raw ``<name>.fp16`` file. In
    file names the dots of the tensor name are replaced by underscores.
    A ``manifest.json`` (tensor shapes, ``n_planes``, ``n_layers`` and the
    model config) is written and ``config.json`` is copied alongside.

    Args:
        model_dir: Directory holding ``config.json`` and the safetensors
            shards.
        output_dir: Destination directory; created if it does not exist.
        n_planes: Number of unary magnitude planes per quantized weight.

    Raises:
        ValueError: If ``n_planes`` is smaller than 1.
    """
    import shutil

    # Fail before loading any weights: zero planes would make every scale a
    # division by zero inside the quantizer.
    if n_planes < 1:
        raise ValueError(f"n_planes must be >= 1, got {n_planes}")

    os.makedirs(output_dir, exist_ok=True)

    # Model config (closed deterministically instead of leaking the handle).
    config_path = os.path.join(model_dir, "config.json")
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]
    print(f"Model: {n_layers} layers, hidden={hidden}, n_planes={n_planes}")

    weights = load_safetensors_torch(model_dir)
    print(f"Loaded {len(weights)} tensors")

    # Linear projection matrices are quantized; everything else stays FP16.
    linear_keys = sorted(
        k for k in weights
        if k.endswith(".weight") and weights[k].ndim == 2 and "proj" in k
    )
    linear_key_set = set(linear_keys)  # O(1) membership for the FP16 split

    manifest = {"unary": {}, "fp16": {}}

    # Byte totals are accumulated as files are written, so leftovers from an
    # earlier run in the same output directory do not skew the summary.
    total_unary = 0
    total_fp16 = 0

    total = len(linear_keys)
    for idx, key in enumerate(linear_keys, start=1):
        w = weights[key]
        t0 = time.perf_counter()
        sign, planes, scales = quantize_unary_vectorized(w, n_planes)
        dt = time.perf_counter() - t0

        fname = key.replace(".", "_")
        sign.tofile(os.path.join(output_dir, f"{fname}.sign"))
        planes.tofile(os.path.join(output_dir, f"{fname}.planes"))
        scales.tofile(os.path.join(output_dir, f"{fname}.scales"))

        manifest["unary"][key] = list(w.shape)
        comp_bytes = sign.nbytes + planes.nbytes + scales.nbytes
        total_unary += comp_bytes
        orig_mb = w.nbytes / 1e6
        comp_mb = comp_bytes / 1e6
        print(f" [{idx}/{total}] {key}: {list(w.shape)} -> {comp_mb:.1f}MB ({orig_mb/comp_mb:.1f}x) [{dt:.1f}s]")

    # Remaining tensors are stored as raw float16.
    fp16_keys = sorted(k for k in weights if k not in linear_key_set)
    for key in fp16_keys:
        w = weights[key]
        fname = key.replace(".", "_")
        w_fp16 = w.astype(np.float16)
        w_fp16.view(np.uint16).tofile(os.path.join(output_dir, f"{fname}.fp16"))
        manifest["fp16"][key] = list(w.shape)
        total_fp16 += w_fp16.nbytes
        print(f" [FP16] {key}: {list(w.shape)} ({w_fp16.nbytes/1e6:.1f}MB)")

    manifest["n_planes"] = n_planes
    manifest["n_layers"] = n_layers
    manifest["config"] = config
    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)

    shutil.copy(config_path, os.path.join(output_dir, "config.json"))

    # Summary. The loader returns float32 arrays, so orig_total is the FP32
    # footprint of the checkpoint.
    orig_total = sum(w.nbytes for w in weights.values())
    total_out = total_unary + total_fp16
    # Avoid a ZeroDivisionError at the very end when nothing was written.
    ratio = orig_total / total_out if total_out else float("nan")

    print("\n=== CONVERSION COMPLETE ===")
    print(f"Original FP32: {orig_total/1e9:.2f} GB")
    print(f"Unary linear: {total_unary/1e9:.2f} GB")
    print(f"FP16 other: {total_fp16/1e9:.2f} GB")
    print(f"Total: {total_out/1e9:.2f} GB")
    print(f"Compression: {ratio:.1f}x")
|
|
if __name__ == "__main__":
    # Positional CLI: [model_dir] [output_dir] [n_planes]; each argument is
    # optional and falls back to the default shown below.
    model_dir = "qwen3-4b-thinking-hf" if len(sys.argv) <= 1 else sys.argv[1]
    output_dir = "qwen3-4b-thinking-unary" if len(sys.argv) <= 2 else sys.argv[2]
    n_planes = 7 if len(sys.argv) <= 3 else int(sys.argv[3])
    convert_model(model_dir, output_dir, n_planes)
|
|