| |
| |
| """ |
| Chimera GGUF Import Optimized |
| βββββββββββββββββββββββββββββ |
| |
| Convert GGUF tensors into a Chimera-compatible checkpoint. |
| |
| AmΓ©liorations vs version originale : |
| - Ne garde pas tous les tensors GGUF FP32 en mΓ©moire. |
| - Corrige le bug embeddings/lm_head traitΓ©s comme BitLinear. |
| - Quantization ternary offline sans autograd. |
| - Clipping outlier par ligne pour les matrices. |
| - Auto-transpose si shape inversΓ©e. |
| - Modes de stockage : |
| fp32 : compatible Chimera classique, sauvegarde weight latent. |
| packed : sauvegarde packed_weight + alpha uniquement pour couches linΓ©aires. |
| both : sauvegarde weight + packed_weight + alpha. |
| - Init des poids manquants pour checkpoint complet. |
| - Resize configurable : strict, crop_pad, interpolate. |
| - Mapping GGUF plus robuste pour LLaMA/Qwen/Mistral-like. |
| |
| Usage : |
| python gguf_import_optimized.py \ |
| --gguf model.gguf \ |
| --config config.json \ |
| --scale tiny \ |
| --output imported_chimera.pt \ |
| --storage fp32 |
| |
| Pour checkpoint compact expΓ©rimental : |
| python gguf_import_optimized.py \ |
| --gguf model.gguf \ |
| --config config.json \ |
| --output imported_chimera_packed.pt \ |
| --storage packed |
| |
| Attention : |
| - storage=packed nΓ©cessite que ton loader Chimera sache lire |
| *.packed_weight et *.alpha. |
| - Importer un gros modèle vers tiny/small via resize détruit beaucoup |
| d'information. C'est utile pour bootstrap, pas Γ©quivalent Γ distillation. |
| """ |
|
|
| import os |
| import re |
| import gc |
| import json |
| import math |
| import argparse |
| from copy import deepcopy |
| from pathlib import Path |
| from typing import Dict, Tuple, Optional, Iterable, Any |
|
|
| import numpy as np |
| import torch |
| import torch.nn.functional as F |
|
|
| from chimera.paths import DEFAULT_CONFIG_PATH |
|
|
|
|
# Optional dependency: gguf-py supplies the reader and dequantization helper.
# Failure is tolerated at import time so this module can still be loaded;
# import_model() raises a clear ImportError later when HAS_GGUF is False.
try:
    from gguf import GGUFReader, dequantize
    HAS_GGUF = True
except Exception:
    GGUFReader = None
    dequantize = None
    HAS_GGUF = False
|
|
|
|
| |
| |
| |
|
|
# Per-scale dimension overrides applied on top of the base Chimera config
# (see OptimizedGGUFImporter.__init__). Values are merged with dict.update,
# so keys absent here fall through to the provided config file.
SCALE_OVERRIDES = {
    "tiny": {
        "hidden_size": 256,
        "intermediate_size": 512,
        "num_hidden_layers": 28,
        "num_heads": 4,
        "head_dim": 48,
    },
    "small": {
        "hidden_size": 512,
        "intermediate_size": 1024,
        "num_hidden_layers": 28,
        "num_heads": 8,
        "head_dim": 48,
    },
    "medium": {
        "hidden_size": 1024,
        "intermediate_size": 2048,
        "num_hidden_layers": 28,
        "num_heads": 8,
        "head_dim": 96,
    },

    # No overrides: keep every dimension from the provided config file.
    "full": {},
}
|
|
|
|
| |
| |
| |
|
|
# GGUF tensor names that map one-to-one onto top-level Chimera keys.
# Both the bare name and the ".weight"-suffixed form are accepted because
# different exporters emit either variant.
DIRECT_NAME_MAP = {
    "token_embd": "embed.weight",
    "token_embd.weight": "embed.weight",

    "output": "lm_head.weight",
    "output.weight": "lm_head.weight",

    "output_norm": "norm.weight",
    "output_norm.weight": "norm.weight",

    "norm": "norm.weight",
    "norm.weight": "norm.weight",
}


# Per-block GGUF suffixes ("blk.N.<suffix>") mapped onto Chimera layer sub-keys.
BLOCK_SUFFIX_MAP = {
    "attn_norm": "attn_norm.weight",
    "attn_norm.weight": "attn_norm.weight",

    "ffn_norm": "mlp_norm.weight",
    "ffn_norm.weight": "mlp_norm.weight",

    "attn_q": "attn.q_proj.weight",
    "attn_q.weight": "attn.q_proj.weight",
    "attn_k": "attn.k_proj.weight",
    "attn_k.weight": "attn.k_proj.weight",
    "attn_v": "attn.v_proj.weight",
    "attn_v.weight": "attn.v_proj.weight",
    "attn_output": "attn.o_proj.weight",
    "attn_output.weight": "attn.o_proj.weight",

    "ffn_gate": "mlp.gate_proj.weight",
    "ffn_gate.weight": "mlp.gate_proj.weight",
    "ffn_up": "mlp.up_proj.weight",
    "ffn_up.weight": "mlp.up_proj.weight",
    "ffn_down": "mlp.down_proj.weight",
    "ffn_down.weight": "mlp.down_proj.weight",
}


# Matches "blk.<layer index>.<suffix>"; compiled once at module load.
_BLOCK_RE = re.compile(r"^blk\.(\d+)\.(.+)$")


def map_gguf_name(name: str, n_layers: int) -> Optional[str]:
    """Translate a GGUF tensor name into a Chimera state-dict key.

    Returns None when the name is unknown, its suffix is not recognized,
    or it refers to a layer index beyond `n_layers`.
    """
    direct = DIRECT_NAME_MAP.get(name)
    if direct is not None:
        return direct

    match = _BLOCK_RE.match(name)
    if match is None:
        return None

    layer_idx = int(match.group(1))
    if layer_idx >= n_layers:
        # Source model has more layers than the target config; drop extras.
        return None

    sub_key = BLOCK_SUFFIX_MAP.get(match.group(2))
    return None if sub_key is None else f"layers.{layer_idx}.{sub_key}"
|
|
|
|
| |
| |
| |
|
|
@torch.no_grad()
def ternary_quantize_absmean(
    w: torch.Tensor,
    threshold: float = 0.5,
    eps: float = 1e-5,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Quantize an FP32 matrix [M, K] to ternary codes plus per-row scales.

    The scale is alpha = mean(|w|, dim=1), clamped to `eps`. After dividing
    each row by its alpha, entries at or beyond +/-`threshold` become +/-1,
    everything in between becomes 0.

    Returns:
        (q, alpha): q is int8 in {-1, 0, +1} with the shape of w,
        alpha is float32 of shape [M].
    """
    if w.ndim != 2:
        raise ValueError("ternary_quantize_absmean attend un tensor 2D")

    w32 = w.to(torch.float32)
    alpha = w32.abs().mean(dim=1).clamp_min(eps)

    normalized = w32 / alpha.unsqueeze(1)
    # Build {-1, 0, +1} directly from the two threshold masks.
    above = (normalized >= threshold).to(torch.int8)
    below = (normalized <= -threshold).to(torch.int8)
    q = above - below

    return q, alpha.to(torch.float32)
|
|
|
|
@torch.no_grad()
def pack_ternary_2bit(w_q: torch.Tensor) -> torch.Tensor:
    """Pack ternary int8 codes {-1, 0, +1} into uint8, four weights per byte.

    Per-weight 2-bit encoding: 0 -> 00, +1 -> 01, -1 -> 10. Within a byte,
    weight 0 occupies bits 7..6, weight 1 bits 5..4, weight 2 bits 3..2 and
    weight 3 bits 1..0. Rows are zero-padded to a multiple of 4 columns.
    """
    if w_q.ndim != 2:
        raise ValueError("pack_ternary_2bit attend un tensor 2D")

    rows, cols = w_q.shape
    n_groups = (cols + 3) // 4
    extra = n_groups * 4 - cols

    # Map {-1, 0, +1} -> 2-bit codes {2, 0, 1} via boolean masks.
    codes = (w_q == 1).to(torch.uint8) + (w_q == -1).to(torch.uint8) * 2
    if extra:
        codes = F.pad(codes, (0, extra), value=0)

    grouped = codes.view(rows, n_groups, 4)
    # Assemble each byte high-bits-first.
    packed = grouped[..., 3].clone()
    packed |= grouped[..., 2] << 2
    packed |= grouped[..., 1] << 4
    packed |= grouped[..., 0] << 6
    return packed.contiguous()
|
|
|
|
| |
| |
| |
|
|
@torch.no_grad()
def reduce_noise(
    w: torch.Tensor,
    method: str = "row_outlier_clip",
    sigma: float = 3.0,
    eps: float = 1e-5,
) -> torch.Tensor:
    """Pre-process weights before ternarization.

    Methods:
        none             : return w untouched.
        global_clip      : clip to global mean +/- sigma * std.
        row_outlier_clip : per-row clipping (2D only; non-2D input falls
                           back to global_clip) — best for linear matrices.
        median_center    : robust global recentering by median / MAD.

    Any unrecognized method returns the float32-cast input unchanged.
    """
    if method == "none":
        return w

    w = w.to(torch.float32)

    if method == "global_clip":
        center = w.mean()
        spread = w.std(unbiased=False).clamp_min(eps)
        return w.clamp(center - sigma * spread, center + sigma * spread)

    if method == "row_outlier_clip":
        if w.ndim != 2:
            # Row statistics only make sense for matrices; reuse global clip.
            return reduce_noise(w, method="global_clip", sigma=sigma, eps=eps)

        center = w.mean(dim=1, keepdim=True)
        spread = w.std(dim=1, keepdim=True, unbiased=False).clamp_min(eps)
        return w.clamp(center - sigma * spread, center + sigma * spread)

    if method == "median_center":
        med = w.median()
        mad = (w - med).abs().median().clamp_min(eps)
        return (w - med) / mad

    return w
|
|
|
|
| |
| |
| |
|
|
@torch.no_grad()
def resize_1d(w: torch.Tensor, target: int) -> torch.Tensor:
    """Crop or pad a 1D tensor to exactly `target` elements.

    Padded positions are filled with 1.0 — the natural default for norm
    weights, which are the only 1D parameters this importer handles.
    """
    length = w.numel()
    if length == target:
        return w.contiguous()

    resized = torch.ones(target, dtype=w.dtype)
    keep = min(length, target)
    resized[:keep] = w[:keep]
    return resized.contiguous()
|
|
|
|
@torch.no_grad()
def resize_2d_crop_pad(
    w: torch.Tensor,
    target_shape: Tuple[int, int],
    fill_std: float = 0.02,
) -> torch.Tensor:
    """Resize a 2D tensor by cropping and/or padding.

    More predictable than interpolation for Transformer weights: the
    overlapping top-left region is copied verbatim, while any new area is
    filled with Gaussian noise whose std matches the source tensor
    (clamped to [1e-4, 0.2]; `fill_std` is only used for degenerate inputs).
    """
    out_rows, out_cols = target_shape
    if tuple(w.shape) == (out_rows, out_cols):
        return w.contiguous()

    # Estimate a sane noise scale from the source weights themselves.
    noise_std = float(w.std(unbiased=False).item()) if w.numel() > 1 else fill_std
    noise_std = max(min(noise_std, 0.2), 1e-4)

    resized = torch.empty((out_rows, out_cols), dtype=w.dtype)
    resized.normal_(mean=0.0, std=noise_std)

    copy_rows = min(w.shape[0], out_rows)
    copy_cols = min(w.shape[1], out_cols)
    resized[:copy_rows, :copy_cols] = w[:copy_rows, :copy_cols]

    return resized.contiguous()
|
|
|
|
@torch.no_grad()
def resize_2d_interpolate(
    w: torch.Tensor,
    target_shape: Tuple[int, int],
) -> torch.Tensor:
    """Bilinearly interpolate a 2D tensor to `target_shape`."""
    if tuple(w.shape) == tuple(target_shape):
        return w.contiguous()

    # F.interpolate expects an (N, C, H, W) batch; wrap, resize, unwrap.
    batched = w.unsqueeze(0).unsqueeze(0)
    resized = F.interpolate(
        batched,
        size=tuple(target_shape),
        mode="bilinear",
        align_corners=False,
    )
    return resized.squeeze(0).squeeze(0).contiguous()
|
|
|
|
@torch.no_grad()
def resize_2d(
    w: torch.Tensor,
    target_shape: Tuple[int, int],
    strategy: str = "crop_pad",
) -> torch.Tensor:
    """Dispatch 2D resizing to the configured strategy.

    strict      : raise ValueError on any shape mismatch.
    crop_pad    : crop / noise-pad via resize_2d_crop_pad.
    interpolate : bilinear resize via resize_2d_interpolate.

    A matching shape short-circuits every strategy.
    """
    if tuple(w.shape) == tuple(target_shape):
        return w.contiguous()

    if strategy == "strict":
        raise ValueError(f"Shape mismatch: got {tuple(w.shape)}, expected {target_shape}")

    if strategy == "crop_pad":
        return resize_2d_crop_pad(w, target_shape)

    if strategy == "interpolate":
        return resize_2d_interpolate(w, target_shape)

    raise ValueError(f"resize strategy inconnue: {strategy}")
|
|
|
|
| |
| |
| |
|
|
class OptimizedGGUFImporter:
    """Stream tensors out of a GGUF file into a Chimera checkpoint.

    One GGUF tensor is dequantized at a time (the whole FP32 model is never
    held in memory). Each tensor is mapped onto the Chimera state-dict
    layout, optionally resized/denoised, then stored as a dense latent
    weight ("fp32"), as packed 2-bit ternary codes plus per-row scales
    ("packed"), or both ("both"). Embeddings and the LM head are always
    kept dense — they are not BitLinear layers.
    """

    def __init__(
        self,
        config: Dict[str, Any],
        scale: str = "tiny",
        storage: str = "fp32",
        param_dtype: str = "fp32",
        noise_method: str = "row_outlier_clip",
        noise_sigma: float = 3.0,
        ternary_threshold: float = 0.5,
        resize_strategy: str = "crop_pad",
        auto_transpose: bool = True,
        init_missing: bool = True,
        verbose: bool = True,
    ):
        # Deep copy so the scale overrides below never mutate the caller's dict.
        self.config = deepcopy(config)
        self.scale = scale
        self.storage = storage
        self.param_dtype = param_dtype
        self.noise_method = noise_method
        self.noise_sigma = noise_sigma
        self.ternary_threshold = ternary_threshold
        self.resize_strategy = resize_strategy
        self.auto_transpose = auto_transpose
        self.init_missing = init_missing
        self.verbose = verbose

        if scale not in SCALE_OVERRIDES:
            raise ValueError(f"scale invalide: {scale}")

        # Apply preset dimensions for the requested scale ("full" is a no-op).
        self.config.update(SCALE_OVERRIDES[scale])

        self.n_layers = int(self.config["num_hidden_layers"])
        self.hidden_size = int(self.config["hidden_size"])
        self.vocab_size = int(self.config["vocab_size"])
        self.num_heads = int(self.config.get("num_heads", 4))
        self.head_dim = int(self.config.get("head_dim", self.hidden_size // self.num_heads))

        # Round the FFN width up to the next multiple of 256 and write it
        # back so the saved config matches the actual tensor shapes.
        inter = int(self.config["intermediate_size"])
        self.intermediate_size = 256 * ((inter + 255) // 256)
        self.config["intermediate_size"] = self.intermediate_size

        if storage not in {"fp32", "packed", "both"}:
            raise ValueError("storage doit Γͺtre: fp32, packed ou both")

        if param_dtype not in {"fp32", "fp16", "bf16"}:
            raise ValueError("param_dtype doit Γͺtre: fp32, fp16 ou bf16")

        # NOTE(review): self.log() already checks verbose; this outer guard
        # is redundant but harmless.
        if self.verbose:
            self.log(
                f"[CONFIG] scale={scale} h={self.hidden_size} "
                f"layers={self.n_layers} heads={self.num_heads} "
                f"head_dim={self.head_dim} inter={self.intermediate_size} "
                f"vocab={self.vocab_size}"
            )
            self.log(
                f"[CONFIG] storage={storage} param_dtype={param_dtype} "
                f"resize={resize_strategy} noise={noise_method}"
            )

    def log(self, msg: str) -> None:
        """Print `msg` immediately (flushed) unless verbose is off."""
        if self.verbose:
            print(msg, flush=True)

    def target_dtype(self) -> torch.dtype:
        """Torch dtype used for dense (non-packed) tensors in the checkpoint."""
        if self.param_dtype == "fp16":
            return torch.float16
        if self.param_dtype == "bf16":
            return torch.bfloat16
        return torch.float32

    def infer_shape(self, key: str) -> Tuple[int, ...]:
        """Return the expected tensor shape for a Chimera state-dict key.

        Raises:
            KeyError: if `key` does not belong to the known layout.
        """
        h = self.hidden_size
        attn_dim = self.num_heads * self.head_dim

        if key == "embed.weight":
            return (self.vocab_size, h)

        if key == "lm_head.weight":
            return (self.vocab_size, h)

        if key == "norm.weight":
            return (h,)

        if key.endswith("attn_norm.weight") or key.endswith("mlp_norm.weight"):
            return (h,)

        # Attention projections: q/k/v project h -> attn_dim, o projects back.
        if key.endswith("attn.q_proj.weight"):
            return (attn_dim, h)
        if key.endswith("attn.k_proj.weight"):
            return (attn_dim, h)
        if key.endswith("attn.v_proj.weight"):
            return (attn_dim, h)
        if key.endswith("attn.o_proj.weight"):
            return (h, attn_dim)

        # Gated MLP: gate/up expand to intermediate_size, down contracts.
        if key.endswith("mlp.gate_proj.weight"):
            return (self.intermediate_size, h)
        if key.endswith("mlp.up_proj.weight"):
            return (self.intermediate_size, h)
        if key.endswith("mlp.down_proj.weight"):
            return (h, self.intermediate_size)

        raise KeyError(f"Impossible d'infΓ©rer la shape pour {key}")

    def all_expected_keys(self) -> Iterable[str]:
        """Yield every state-dict key a complete Chimera checkpoint requires."""
        yield "embed.weight"
        yield "norm.weight"
        yield "lm_head.weight"

        for i in range(self.n_layers):
            prefix = f"layers.{i}"
            yield f"{prefix}.attn_norm.weight"
            yield f"{prefix}.mlp_norm.weight"
            yield f"{prefix}.attn.q_proj.weight"
            yield f"{prefix}.attn.k_proj.weight"
            yield f"{prefix}.attn.v_proj.weight"
            yield f"{prefix}.attn.o_proj.weight"
            yield f"{prefix}.mlp.gate_proj.weight"
            yield f"{prefix}.mlp.up_proj.weight"
            yield f"{prefix}.mlp.down_proj.weight"

    def is_linear_key(self, key: str) -> bool:
        """True for BitLinear-style projection weights (eligible for packing)."""
        return any(
            key.endswith(s)
            for s in (
                "attn.q_proj.weight",
                "attn.k_proj.weight",
                "attn.v_proj.weight",
                "attn.o_proj.weight",
                "mlp.gate_proj.weight",
                "mlp.up_proj.weight",
                "mlp.down_proj.weight",
            )
        )

    def is_embedding_or_head(self, key: str) -> bool:
        """True for the two dense tensors that must never be ternarized."""
        return key in {"embed.weight", "lm_head.weight"}

    def maybe_transpose(self, w: torch.Tensor, expected: Tuple[int, ...], key: str) -> torch.Tensor:
        """Transpose `w` when its reversed shape matches `expected`.

        Some GGUF exporters store 2D weights transposed. This is a no-op
        when auto_transpose is disabled, shapes already agree, or the
        transpose would not fix the mismatch.
        """
        if not self.auto_transpose:
            return w

        if w.ndim == 2 and len(expected) == 2:
            if tuple(w.shape) != tuple(expected) and tuple(w.t().shape) == tuple(expected):
                self.log(f" [TRANSPOSE] {key}: {tuple(w.shape)} -> {tuple(w.t().shape)}")
                return w.t().contiguous()

        return w

    def convert_tensor(
        self,
        gguf_name: str,
        key: str,
        arr: np.ndarray,
    ) -> Optional[Dict[str, torch.Tensor]]:
        """Convert one dequantized GGUF array into checkpoint entries.

        Returns a dict of state-dict entries for `key` — possibly several
        when packed storage adds `.packed_weight` / `.alpha` / `.shape`
        companions — or None when the tensor is rejected (wrong rank or
        unrecognized key).
        """
        expected = self.infer_shape(key)

        w = torch.from_numpy(np.asarray(arr)).to(torch.float32)
        w = self.maybe_transpose(w, expected, key)

        result: Dict[str, torch.Tensor] = {}

        # --- 1D tensors (norm weights): crop/one-pad resize only. ---
        if len(expected) == 1:
            if w.ndim != 1:
                self.log(f" [SKIP] {gguf_name}: expected 1D {expected}, got {tuple(w.shape)}")
                return None

            if tuple(w.shape) != tuple(expected):
                self.log(f" [RESIZE-1D] {gguf_name}: {tuple(w.shape)} -> {expected}")
                w = resize_1d(w, expected[0])

            result[key] = w.to(self.target_dtype()).contiguous()
            return result

        # --- Embeddings / LM head: always stored dense, never ternarized. ---
        if self.is_embedding_or_head(key):
            if w.ndim != 2:
                self.log(f" [SKIP] {gguf_name}: expected 2D embedding/head, got {tuple(w.shape)}")
                return None

            if tuple(w.shape) != tuple(expected):
                self.log(f" [RESIZE-EMB] {gguf_name}: {tuple(w.shape)} -> {expected}")
                w = resize_2d(w, expected, self.resize_strategy)

            result[key] = w.to(self.target_dtype()).contiguous()
            return result

        # --- Linear layers: resize, denoise, then store per storage mode. ---
        if self.is_linear_key(key):
            if w.ndim != 2:
                self.log(f" [SKIP] {gguf_name}: expected 2D linear, got {tuple(w.shape)}")
                return None

            if tuple(w.shape) != tuple(expected):
                self.log(f" [RESIZE-2D] {gguf_name}: {tuple(w.shape)} -> {expected}")
                w = resize_2d(w, expected, self.resize_strategy)

            # Outlier clipping before ternarization.
            w = reduce_noise(w, method=self.noise_method, sigma=self.noise_sigma)

            if self.storage in {"fp32", "both"}:
                result[key] = w.to(self.target_dtype()).contiguous()

            if self.storage in {"packed", "both"}:
                q, alpha = ternary_quantize_absmean(
                    w,
                    threshold=self.ternary_threshold,
                )
                packed = pack_ternary_2bit(q)
                result[f"{key}.packed_weight"] = packed.cpu().contiguous()
                result[f"{key}.alpha"] = alpha.cpu().contiguous()
                # Logical (unpadded) shape, required to unpack later.
                result[f"{key}.shape"] = torch.tensor(list(expected), dtype=torch.int32)

            return result

        self.log(f" [SKIP] {gguf_name}: key non reconnue {key}")
        return None

    def init_missing_tensor(self, key: str) -> Dict[str, torch.Tensor]:
        """Fabricate a fresh initialization for a key absent from the GGUF.

        Norm weights get ones, embeddings/head get N(0, 0.02), and linear
        layers get Kaiming-style N(0, sqrt(2/fan_in)) stored according to
        the configured storage mode. Unknown key kinds yield an empty dict.
        """
        expected = self.infer_shape(key)
        out: Dict[str, torch.Tensor] = {}

        if len(expected) == 1:
            # Norm weights default to 1.0 (identity scaling).
            w = torch.ones(expected, dtype=self.target_dtype())
            out[key] = w
            return out

        if key in {"embed.weight", "lm_head.weight"}:
            w = torch.empty(expected, dtype=torch.float32)
            w.normal_(0.0, 0.02)
            out[key] = w.to(self.target_dtype())
            return out

        if self.is_linear_key(key):
            w = torch.empty(expected, dtype=torch.float32)
            fan_in = max(1, expected[1])
            std = math.sqrt(2.0 / fan_in)
            w.normal_(0.0, std)

            if self.storage in {"fp32", "both"}:
                out[key] = w.to(self.target_dtype()).contiguous()

            if self.storage in {"packed", "both"}:
                q, alpha = ternary_quantize_absmean(w, threshold=self.ternary_threshold)
                out[f"{key}.packed_weight"] = pack_ternary_2bit(q)
                out[f"{key}.alpha"] = alpha
                out[f"{key}.shape"] = torch.tensor(list(expected), dtype=torch.int32)

            return out

        return out

    def dequantize_tensor(self, tensor) -> np.ndarray:
        """Dequantize a GGUF tensor to contiguous float32 numpy.

        Compatible with the most common gguf-py API. When `dequantize`
        rejects the (data, qtype) pair, the raw buffer is used as-is —
        presumably already-float tensors; verify against the gguf version
        in use.
        """
        qtype = getattr(tensor, "tensor_type", None)
        data = getattr(tensor, "data", None)

        if data is None:
            raise RuntimeError(f"Tensor GGUF sans data: {getattr(tensor, 'name', '?')}")

        try:
            arr = dequantize(data, qtype)
        except Exception:
            # Best-effort fallback: treat the buffer as directly usable.
            arr = np.asarray(data)

        arr = np.asarray(arr)

        if arr.dtype != np.float32:
            arr = arr.astype(np.float32, copy=False)

        return np.ascontiguousarray(arr)

    def read_arch(self, reader) -> str:
        """Best-effort read of `general.architecture` from GGUF metadata."""
        try:
            field = reader.fields.get("general.architecture")
            if field is None:
                return "unknown"
            # gguf-py exposes string fields through `parts`; the last part
            # is assumed to hold the value — TODO confirm across versions.
            if hasattr(field, "parts") and field.parts:
                return str(field.parts[-1])
            return str(field)
        except Exception:
            return "unknown"

    def import_model(self, gguf_path: str, output_path: str) -> Dict[str, Any]:
        """Run the full import and save a .pt checkpoint to `output_path`.

        Returns the checkpoint dict that was saved (keys: model, config,
        source, stats, missing_keys, import_version).

        Raises:
            ImportError: if the gguf package is not installed.
        """
        if not HAS_GGUF:
            raise ImportError("Package gguf manquant. Installe avec: pip install gguf")

        gguf_path = str(gguf_path)
        output_path = str(output_path)

        self.log("=" * 70)
        self.log("CHIMERA GGUF IMPORT OPTIMIZED")
        self.log("=" * 70)

        reader = GGUFReader(gguf_path)
        arch = self.read_arch(reader)

        self.log(f"[GGUF] file={gguf_path}")
        self.log(f"[GGUF] arch={arch}")
        self.log(f"[GGUF] tensors={len(reader.tensors)}")

        state_dict: Dict[str, torch.Tensor] = {}

        stats = {
            "mapped": 0,
            "unmapped": 0,
            "skipped": 0,
            "linear": 0,
            "dense": 0,
            "norm": 0,
            # NOTE(review): this counter is never incremented below.
            "resized_or_transposed_possible": 0,
        }

        imported_keys = set()

        for idx, tensor in enumerate(reader.tensors):
            name = str(tensor.name)
            key = map_gguf_name(name, self.n_layers)

            if key is None:
                stats["unmapped"] += 1
                if self.verbose:
                    self.log(f" [UNMAPPED] {name}")
                continue

            try:
                # One tensor dequantized at a time to bound peak memory.
                arr = self.dequantize_tensor(tensor)
                converted = self.convert_tensor(name, key, arr)

                if not converted:
                    stats["skipped"] += 1
                    continue

                state_dict.update(converted)
                imported_keys.add(key)
                stats["mapped"] += 1

                if self.is_linear_key(key):
                    stats["linear"] += 1
                elif key in {"embed.weight", "lm_head.weight"}:
                    stats["dense"] += 1
                else:
                    stats["norm"] += 1

                if self.verbose:
                    qtype = getattr(tensor, "tensor_type", "?")
                    shape = tuple(arr.shape)
                    self.log(f" [OK] {idx+1:04d} {name} -> {key} shape={shape} qtype={qtype}")

            except Exception as e:
                # Per-tensor failures are logged and skipped, never fatal.
                stats["skipped"] += 1
                self.log(f" [ERROR] {name}: {type(e).__name__}: {e}")

            finally:
                # Release the FP32 buffer before the next tensor; `arr` may
                # be unbound when dequantization itself raised.
                try:
                    del arr
                except Exception:
                    pass
                gc.collect()

        # Fill in whatever the GGUF did not provide so the checkpoint loads
        # as a complete model.
        missing = []
        if self.init_missing:
            for key in self.all_expected_keys():
                if key not in imported_keys:
                    missing.append(key)
                    init_tensors = self.init_missing_tensor(key)
                    state_dict.update(init_tensors)

        if missing:
            self.log(f"[MISSING] {len(missing)} tensors initialisΓ©s automatiquement")

        ckpt = {
            "model": state_dict,
            "config": self.config,
            # Provenance: everything needed to reproduce this import run.
            "source": {
                "gguf_path": gguf_path,
                "gguf_arch": arch,
                "scale": self.scale,
                "storage": self.storage,
                "param_dtype": self.param_dtype,
                "noise_method": self.noise_method,
                "noise_sigma": self.noise_sigma,
                "ternary_threshold": self.ternary_threshold,
                "resize_strategy": self.resize_strategy,
                "auto_transpose": self.auto_transpose,
            },
            "stats": stats,
            "missing_keys": missing,
            "import_version": "2.0-optimized",
        }

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        torch.save(ckpt, output_path)

        gguf_mb = os.path.getsize(gguf_path) / 1024 / 1024
        out_mb = os.path.getsize(output_path) / 1024 / 1024

        self.log("")
        self.log("=" * 70)
        self.log("[DONE]")
        self.log(f"[STATS] {stats}")
        self.log(f"[SIZE] GGUF={gguf_mb:.2f} MB -> checkpoint={out_mb:.2f} MB")
        self.log(f"[SAVE] {output_path}")
        self.log("=" * 70)

        return ckpt
|
|
|
|
| |
| |
| |
|
|
def main():
    """CLI entry point: parse arguments, load the config, run the import."""
    ap = argparse.ArgumentParser(
        description="Optimized GGUF -> Chimera checkpoint importer"
    )

    # Required I/O paths.
    ap.add_argument("--gguf", required=True, help="Path to input .gguf")
    ap.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help="Chimera config.json")
    ap.add_argument("--output", required=True, help="Output .pt checkpoint")

    # Model sizing and storage mode.
    ap.add_argument(
        "--scale",
        default="tiny",
        choices=["tiny", "small", "medium", "full"],
        help="Chimera scale override",
    )
    ap.add_argument(
        "--storage",
        default="fp32",
        choices=["fp32", "packed", "both"],
        help=(
            "fp32=compatible Chimera classique, "
            "packed=2-bit seulement, both=les deux"
        ),
    )
    ap.add_argument(
        "--param-dtype",
        default="fp32",
        choices=["fp32", "fp16", "bf16"],
        help="dtype pour les tensors denses/latents sauvegardΓ©s",
    )

    # Quantization and resizing knobs.
    ap.add_argument(
        "--noise-method",
        default="row_outlier_clip",
        choices=["none", "global_clip", "row_outlier_clip", "median_center"],
        help="Noise reduction before ternary conversion",
    )
    ap.add_argument(
        "--noise-sigma",
        type=float,
        default=3.0,
        help="Sigma for clipping",
    )
    ap.add_argument(
        "--ternary-threshold",
        type=float,
        default=0.5,
        help="Threshold on normalized weights for ternary quantization",
    )
    ap.add_argument(
        "--resize-strategy",
        default="crop_pad",
        choices=["strict", "crop_pad", "interpolate"],
        help="Resize strategy when GGUF shape != Chimera shape",
    )

    # Behaviour toggles (all default-on features, disabled via flags).
    ap.add_argument(
        "--no-auto-transpose",
        action="store_true",
        help="Disable automatic transpose when reversed shape matches",
    )
    ap.add_argument(
        "--no-init-missing",
        action="store_true",
        help="Do not initialize missing Chimera weights",
    )
    ap.add_argument(
        "--quiet",
        action="store_true",
        help="Less logs",
    )

    opts = ap.parse_args()

    with open(opts.config, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    importer = OptimizedGGUFImporter(
        config=cfg,
        scale=opts.scale,
        storage=opts.storage,
        param_dtype=opts.param_dtype,
        noise_method=opts.noise_method,
        noise_sigma=opts.noise_sigma,
        ternary_threshold=opts.ternary_threshold,
        resize_strategy=opts.resize_strategy,
        auto_transpose=not opts.no_auto_transpose,
        init_missing=not opts.no_init_missing,
        verbose=not opts.quiet,
    )
    importer.import_model(opts.gguf, opts.output)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|