"""
Smart Importance-Based MoE Upcycler (v2.1 - Strict MoE Detection)

Updates:
- FIXED: Layer 0 (Dense) misidentification. Now distinguishes between SwiGLU gates and MoE Routers.
- ENFORCED: Model 2 (The Stack) strictly forbids Dense layers.

Usage:
    python smart_upcycle.py \
        --model_path inclusionAI/Ling-mini-2.0 \
        --output_path ./ling-mini-30L-upcycled \
        --target_layers 30 \
        --model1_ratio 0.55

Author: Claude (Anthropic)
"""
|
|
import argparse
import gc
import logging
import os
import shutil
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("SmartUpcycler")

try:
    import torch
    import torch.nn as nn
    from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
    from safetensors.torch import save_file
    from datasets import load_dataset
    from tqdm import tqdm
except ImportError as e:
    logger.error(f"Missing dependency: {e}")
    logger.error("pip install torch transformers safetensors datasets tqdm accelerate")
    # FIX: sys.exit instead of the site.py `exit` helper, which does not exist
    # under `python -S` or in some frozen/embedded interpreters.
    sys.exit(1)
|
|
class LayerAnalyzer:
    """Analyzes model layers with strict MoE vs Dense differentiation.

    Hooks each transformer block to measure how much it transforms its input
    (cosine-distance importance) and inspects each block's FFN sub-module to
    classify it as sparse-MoE or Dense.
    """

    def __init__(self, model, tokenizer, device='cuda'):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        # layer index -> list of per-sample importance scores
        self.layer_data = defaultdict(list)
        # live forward-hook handles (removed after calibration)
        self.hooks = []

    def get_layers(self):
        """Return the list of transformer blocks, handling common wrappers."""
        if hasattr(self.model, 'model') and hasattr(self.model.model, 'layers'):
            return self.model.model.layers
        elif hasattr(self.model, 'layers'):
            return self.model.layers
        else:
            raise ValueError("Unsupported model architecture: cannot find .layers")

    def identify_layer_types(self) -> Tuple[List[int], List[int]]:
        """
        Scans the architecture with a heuristic tuned to avoid SwiGLU false positives.

        A layer counts as MoE when its FFN module exposes a multi-entry
        `experts` list, a `num_experts` attribute > 1, or an MoE-like class
        name. Layer 0 is always forced to Dense: many MoE models keep a dense
        first layer, and SwiGLU gate projections there are easy to mistake
        for routers.

        Returns: (moe_indices, dense_indices)
        """
        moe_indices = []
        dense_indices = []
        layers = self.get_layers()

        for idx, layer in enumerate(layers):
            is_moe = False

            # Locate the FFN sub-module under any of the common attribute names.
            candidates = ['mlp', 'block_sparse_moe', 'feed_forward', 'ffn']
            module = None
            for name in candidates:
                if hasattr(layer, name):
                    module = getattr(layer, name)
                    break

            if module is not None:
                has_experts_list = hasattr(module, 'experts') and len(module.experts) > 1
                has_num_experts = hasattr(module, 'num_experts') and module.num_experts > 1

                class_name = type(module).__name__.lower()
                name_is_moe = 'moe' in class_name or 'sparse' in class_name

                if has_experts_list or has_num_experts or name_is_moe:
                    is_moe = True
                    if idx == 0:
                        # FIX: warn at override time. The old post-hoc check
                        # `if 0 in moe_indices` was unreachable dead code
                        # because layer 0 had already been forced to Dense.
                        logging.getLogger("SmartUpcycler").warning(
                            "Layer 0 looked like MoE but is forced to Dense. "
                            "Verify model architecture."
                        )
                        is_moe = False

            if is_moe:
                moe_indices.append(idx)
            else:
                dense_indices.append(idx)

        return moe_indices, dense_indices

    def compute_importance(self, calibration_data: List[str]) -> Dict[int, float]:
        """
        Calculates layer importance using Cosine Similarity.
        Score = 1.0 - CosSim(Input, Output), averaged over tokens and samples.

        Hooks are always removed (and score buffers cleared), even if a
        forward pass raises.
        """
        logger.info(f"Computing importance using {len(calibration_data)} samples...")
        layers = self.get_layers()

        def get_activation_hook(idx):
            def hook(module, input, output):
                inp = input[0] if isinstance(input, tuple) else input
                out = output[0] if isinstance(output, tuple) else output

                with torch.no_grad():
                    # Flatten (batch, seq, dim) -> (batch*seq, dim) in fp32.
                    inp_flat = inp.view(-1, inp.size(-1)).float()
                    out_flat = out.view(-1, out.size(-1)).float()

                    # FIX: compute on the tensors' own device instead of the
                    # hard-coded .to("cuda"), which crashed CPU-only runs.
                    cos = torch.nn.functional.cosine_similarity(inp_flat, out_flat, dim=-1)

                    score = 1.0 - cos.mean().item()
                    self.layer_data[idx].append(score)
            return hook

        for idx, layer in enumerate(layers):
            self.hooks.append(layer.register_forward_hook(get_activation_hook(idx)))

        try:
            self.model.eval()
            with torch.no_grad():
                for text in tqdm(calibration_data, desc="Calibrating"):
                    inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}
                    self.model(**inputs)

            final_scores = {idx: sum(scores) / len(scores)
                            for idx, scores in self.layer_data.items()}
        finally:
            # FIX: remove hooks in finally so a failed forward pass does not
            # leave stale hooks attached to the model.
            for h in self.hooks:
                h.remove()
            self.hooks.clear()
            self.layer_data.clear()

        return final_scores
|
|
class SmartUpcycler:
    """Builds a deeper model by restacking the most important source layers."""

    def __init__(self, model_path: str, device: str = 'auto'):
        self.model_path = model_path
        self.device = device
        self.config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def load_model(self):
        """Load the source model in bf16 with accelerate-style device mapping."""
        logger.info(f"Loading model from {self.model_path}...")
        return AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=self.device,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        )

    def create_layer_plan(self,
                          importance_scores: Dict[int, float],
                          moe_indices: List[int],
                          dense_indices: List[int],
                          total_original: int,
                          target_total: int,
                          m1_count: int,
                          m2_count: int) -> Tuple[List[int], List[int]]:
        """
        Select source-layer indices for the two stack segments.

        M1 ("base"): the structural anchors (first two / last two layers) plus
        the highest-importance remaining layers, exactly m1_count in total.
        M2 ("stack"): the m2_count highest-importance MoE layers; if there are
        fewer unique MoE layers than needed, the top-ranked ones are recycled.

        Returns: (m1_source_indices, m2_source_indices), each sorted ascending.
        Raises: ValueError if the model has no MoE layers at all.
        """
        log = logging.getLogger("SmartUpcycler")

        # --- M1: structural anchors + top-importance fillers -----------------
        structural_layers = {0, 1, total_original - 2, total_original - 1}
        m1_candidates = [i for i in range(total_original) if i not in structural_layers]
        m1_candidates.sort(key=lambda x: importance_scores.get(x, 0), reverse=True)

        needed_m1 = m1_count - len(structural_layers)
        selected_m1 = list(structural_layers) + m1_candidates[:max(0, needed_m1)]
        selected_m1.sort()
        if len(selected_m1) > m1_count:
            # FIX: with m1_count < len(structural_layers) the old code returned
            # too many layers and silently exceeded the target depth.
            log.warning(f"m1_count={m1_count} is smaller than the structural set; trimming.")
            selected_m1 = selected_m1[:m1_count]

        # --- M2: strictly MoE, no Dense layers allowed -----------------------
        if not moe_indices:
            raise ValueError("Model has no MoE layers! Cannot fulfill constraint.")

        m2_candidates = list(moe_indices)
        m2_candidates.sort(key=lambda x: importance_scores.get(x, 0), reverse=True)

        selected_m2 = m2_candidates[:m2_count]
        if len(selected_m2) < m2_count:
            log.warning(f"Not enough unique MoE layers (Found {len(selected_m2)}, Needed {m2_count}).")
            log.warning("Recycling top MoE layers to fill the gap.")
            # FIX: pad by cycling the ranked candidates, then sort once. The
            # old nested while/for padded after the sort, leaving the
            # duplicated tail unsorted.
            pad = 0
            while len(selected_m2) < m2_count:
                selected_m2.append(m2_candidates[pad % len(m2_candidates)])
                pad += 1
        selected_m2.sort()

        return selected_m1, selected_m2

    def build_and_save(self,
                       original_state_dict,
                       m1_layers: List[int],
                       m2_layers: List[int],
                       output_path: Path):
        """
        Write the upcycled checkpoint: copy non-layer weights verbatim, remap
        the selected source layers (M1 segment first, then M2) onto
        consecutive destination indices, then save config/tokenizer/weights
        plus a copy of this script for provenance.
        """
        logger.info("Constructing new state dictionary...")
        new_state_dict = {}

        def map_layer(src_idx, dst_idx):
            # Clone so recycled sources get independent storage (safetensors
            # rejects shared tensors).
            src_prefix = f"model.layers.{src_idx}."
            dst_prefix = f"model.layers.{dst_idx}."
            for key, tensor in original_state_dict.items():
                if key.startswith(src_prefix):
                    new_key = key.replace(src_prefix, dst_prefix)
                    new_state_dict[new_key] = tensor.clone()

        # Embeddings, final norm, lm_head, etc. (everything outside the stack).
        for key, tensor in original_state_dict.items():
            if "layers." not in key:
                new_state_dict[key] = tensor.clone()

        current_layer_idx = 0
        print(f"\n{'='*25} STACK PLAN {'='*25}")
        print(f"{'Order':<5} | {'Dest':<5} | {'Source':<6} | {'Type'}")
        print("-" * 50)

        for src in m1_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M1':<5} | {current_layer_idx:<5} <- {src:<6} | {'Base Mixed'}")
            current_layer_idx += 1

        for src in m2_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M2':<5} | {current_layer_idx:<5} <- {src:<6} | {'MoE ONLY'}")
            current_layer_idx += 1

        output_path.mkdir(parents=True, exist_ok=True)
        # The new depth is whatever we actually stacked.
        self.config.num_hidden_layers = current_layer_idx
        self.config.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)

        logger.info(f"Saving model.safetensors to {output_path}...")
        save_file(new_state_dict, os.path.join(output_path, "model.safetensors"))
        shutil.copy(__file__, output_path)
|
|
def load_calibration_samples(name='wikitext', split='train', n=128):
    """
    Fetch up to `n` calibration texts (longer than 200 chars) from a dataset.

    FIX: the 'wikitext-2-raw-v1' config is only passed when `name` is
    'wikitext'; previously it was forced for every dataset name, breaking any
    other choice. Falls back to dummy repeated strings on any load failure
    (best-effort by design — calibration quality degrades but the run survives).
    """
    try:
        config_name = 'wikitext-2-raw-v1' if name == 'wikitext' else None
        data = load_dataset(name, config_name, split=split, trust_remote_code=True)
        samples = []
        for row in data:
            if len(row['text']) > 200:
                samples.append(row['text'])
            if len(samples) >= n:
                break
        return samples
    except Exception:
        logger.warning(f"Could not load {name}. Using dummy data.")
        return ["Calibration string." * 50] * n
|
|
def main():
    """CLI entry point: analyze the source model, plan the stack, save it."""
    parser = argparse.ArgumentParser(description="Smart MoE Upcycler")
    parser.add_argument('--model_path', type=str, required=True)
    parser.add_argument('--output_path', type=str, required=True)
    parser.add_argument('--target_layers', type=int, default=30)
    parser.add_argument('--model1_ratio', type=float, default=0.55)
    parser.add_argument('--no_calibration', action='store_true')
    args = parser.parse_args()

    # Split the target depth between the base segment (M1) and the MoE-only
    # stack segment (M2).
    m1_count = int(args.target_layers * args.model1_ratio)
    m2_count = args.target_layers - m1_count

    logger.info(f"Target: {args.target_layers} Layers. Split: M1={m1_count}, M2={m2_count} (Strict MoE)")

    upcycler = SmartUpcycler(args.model_path)
    model = upcycler.load_model()

    analyzer = LayerAnalyzer(model, upcycler.tokenizer)
    moe_indices, dense_indices = analyzer.identify_layer_types()

    # FIX: use the architecture-agnostic accessor instead of reaching into
    # model.model.layers directly (get_layers() exists to abstract that away).
    total_orig = len(analyzer.get_layers())

    logger.info(f"Scan Results: {len(moe_indices)} MoE layers, {len(dense_indices)} Dense layers.")
    if dense_indices:
        logger.info(f"Verified Dense Layers: {dense_indices}")

    if args.no_calibration:
        logger.info("Skipping calibration. Using uniform importance.")
        scores = {i: 1.0 for i in range(total_orig)}
    else:
        samples = load_calibration_samples()
        scores = analyzer.compute_importance(samples)

    m1_layers, m2_layers = upcycler.create_layer_plan(
        scores,
        moe_indices,
        dense_indices,
        total_orig,
        args.target_layers,
        m1_count,
        m2_count
    )

    logger.info("Moving model to CPU...")
    model.cpu()
    state_dict = model.state_dict()
    # FIX: collect Python garbage before emptying the CUDA cache so freed
    # tensors are actually releasable by empty_cache().
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    upcycler.build_and_save(state_dict, m1_layers, m2_layers, Path(args.output_path))
    logger.info("Done.")
|
|
# Script entry point; the guard keeps import of this module side-effect free.
if __name__ == "__main__":
    main()
|
|