| """ |
| Model Utilities - Helper functions for model operations |
| """ |
|
|
| import logging |
| from typing import Dict, Any, List, Optional, Tuple |
| import torch |
| from transformers import ( |
| AutoModel, |
| AutoModelForCausalLM, |
| AutoModelForSeq2SeqLM, |
| AutoModelForTokenClassification, |
| AutoModelForQuestionAnswering, |
| AutoModelForSequenceClassification, |
| AutoConfig, |
| AutoTokenizer, |
| PreTrainedModel, |
| PreTrainedTokenizer, |
| ) |
| from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training |
| import os |
| import json |
| import hashlib |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
# Task types listed for each model family name.  Every family gets its own
# list object, so mutating one entry never leaks into another.
MODEL_TASK_MAPPING = {
    **{
        family: ["causal-lm"]
        for family in ("gpt", "llama", "mistral", "falcon", "qwen", "phi", "opt", "bloom")
    },
    **{family: ["seq2seq"] for family in ("t5", "bart", "pegasus", "mt5")},
    **{
        family: ["token-classification", "text-classification", "question-answering"]
        for family in ("bert", "roberta", "deberta", "xlnet", "albert")
    },
    # ELECTRA is the only encoder family listed without question answering.
    "electra": ["token-classification", "text-classification"],
    "distilbert": ["token-classification", "text-classification", "question-answering"],
}
|
|
|
|
| |
# Translates this module's task-type strings into the PEFT ``TaskType``
# members that ``apply_peft`` passes to ``LoraConfig``.
PEFT_TASK_TYPES = dict(
    (
        ("causal-lm", TaskType.CAUSAL_LM),
        ("seq2seq", TaskType.SEQ_2_SEQ_LM),
        ("token-classification", TaskType.TOKEN_CLS),
        ("text-classification", TaskType.SEQ_CLS),
        ("question-answering", TaskType.QUESTION_ANS),
    )
)
|
|
|
|
def get_model_for_task(model_name: str, task_type: str, **kwargs) -> Tuple[Optional[PreTrainedModel], Optional[str]]:
    """Load the auto-model class that matches a task type.

    Args:
        model_name: Model identifier or path handed to ``from_pretrained``.
        task_type: One of ``"causal-lm"``, ``"seq2seq"``,
            ``"token-classification"``, ``"text-classification"`` or
            ``"question-answering"``.  Any other value falls back to the
            plain ``AutoModel`` class.
        **kwargs: Forwarded unchanged to ``from_pretrained``.

    Returns:
        ``(model, None)`` on success, or ``(None, error_message)`` when
        loading fails.  The error is logged; no exception escapes.
    """
    task_to_class = {
        "causal-lm": AutoModelForCausalLM,
        "seq2seq": AutoModelForSeq2SeqLM,
        "token-classification": AutoModelForTokenClassification,
        "text-classification": AutoModelForSequenceClassification,
        "question-answering": AutoModelForQuestionAnswering,
    }

    try:
        model_class = task_to_class.get(task_type, AutoModel)

        # Let ``from_pretrained`` resolve the configuration itself.  Loading a
        # config up front without the caller's kwargs ignored options such as
        # ``token``, ``revision`` or ``trust_remote_code``, and passing that
        # pre-built config prevented config overrides (e.g. ``num_labels``)
        # given in kwargs from being applied.
        model = model_class.from_pretrained(model_name, **kwargs)

        return model, None

    except Exception as e:
        logger.error(f"Error loading model {model_name} for task {task_type}: {e}")
        return None, str(e)
|
|
|
|
def load_tokenizer(model_name: str, **kwargs) -> Tuple[Optional[PreTrainedTokenizer], Optional[str]]:
    """Load the tokenizer for a model and make sure it has a padding token.

    Args:
        model_name: Model identifier or path handed to
            ``AutoTokenizer.from_pretrained``.
        **kwargs: Forwarded unchanged to ``AutoTokenizer.from_pretrained``.

    Returns:
        ``(tokenizer, None)`` on success, or ``(None, error_message)`` when
        loading fails.  The error is logged; no exception escapes.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, **kwargs)

        if tokenizer.pad_token is None:
            # Reuse the end-of-sequence token for padding, or fall back to a
            # literal "<pad>" when the tokenizer has no EOS token either.
            # Only the token is assigned: the tokenizer resolves pad_token_id
            # from it.  The previous ``eos_token_id or ...`` expression treated
            # a legitimate id of 0 as missing and then re-pointed the pad id at
            # whatever "<pad>" happened to map to.
            tokenizer.pad_token = tokenizer.eos_token or "<pad>"

        return tokenizer, None

    except Exception as e:
        logger.error(f"Error loading tokenizer for {model_name}: {e}")
        return None, str(e)
|
|
|
|
def get_model_info(model_name: str) -> Dict[str, Any]:
    """Collect Hub metadata and configuration details for a model.

    Args:
        model_name: Model identifier looked up through ``HfApi.model_info``
            and ``AutoConfig.from_pretrained``.

    Returns:
        A dict with the Hub fields (id, author, sha, pipeline tag, library,
        downloads, likes, tags, sibling file names), the full config dict and
        a few size-related config values pulled out for convenience.  When the
        Hub lookup (or the ``huggingface_hub`` import) fails, the dict is
        ``{"error": message}`` instead.
    """
    try:
        # Imported lazily so a missing huggingface_hub yields an error dict
        # rather than breaking the import of this module.
        from huggingface_hub import HfApi

        info = HfApi().model_info(model_name)

        # The configuration is best-effort: the Hub metadata is still returned
        # when the config cannot be loaded.
        try:
            config_dict = AutoConfig.from_pretrained(model_name).to_dict()
        except Exception as config_error:
            # ``Exception`` (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            logger.debug(f"Could not load config for {model_name}: {config_error}")
            config_dict = {}

        return {
            "model_id": info.id,
            "author": info.author,
            "sha": info.sha,
            "pipeline_tag": info.pipeline_tag,
            "library_name": info.library_name,
            "downloads": getattr(info, "downloads", 0),
            "likes": getattr(info, "likes", 0),
            "tags": info.tags or [],
            "siblings": [s.rfilename for s in info.siblings] if info.siblings else [],
            "config": config_dict,
            "hidden_size": config_dict.get("hidden_size"),
            "num_hidden_layers": config_dict.get("num_hidden_layers"),
            "num_attention_heads": config_dict.get("num_attention_heads"),
            "intermediate_size": config_dict.get("intermediate_size"),
            "vocab_size": config_dict.get("vocab_size"),
            "model_type": config_dict.get("model_type"),
            "architectures": config_dict.get("architectures", []),
        }

    except Exception as e:
        logger.error(f"Error getting model info for {model_name}: {e}")
        return {"error": str(e)}
|
|
|
|
def check_model_compatibility(model_name: str, task_type: str) -> Tuple[bool, List[str]]:
    """Heuristically check whether a model's config suits a task type.

    The check looks only at ``config.architectures`` (class names) and
    ``config.model_type``; the model weights are never loaded.

    Args:
        model_name: Model identifier handed to ``AutoConfig.from_pretrained``.
        task_type: Task to check.  Task types without a rule below produce no
            issues.

    Returns:
        ``(compatible, issues)`` where ``compatible`` is ``True`` exactly when
        ``issues`` is empty.  If the config cannot be loaded the result is
        ``(False, [error_message])``.
    """
    issues = []

    try:
        config = AutoConfig.from_pretrained(model_name)
        architectures = config.architectures or []
        model_type = config.model_type or ""

        def mentions_any(markers):
            # Case-insensitive substring match of every marker against every
            # architecture class name.  The previous ``arch in arch`` test
            # shadowed its own loop variable and was always true, and markers
            # such as "LLaMA" or "BART" never matched the real class casing
            # ("LlamaForCausalLM", "BartForConditionalGeneration").
            return any(
                marker.lower() in arch_name.lower()
                for arch_name in architectures
                for marker in markers
            )

        if task_type == "causal-lm":
            causal_archs = ["GPT", "LLaMA", "Mistral", "Falcon", "Qwen", "Phi", "OPT", "Bloom", "CausalLM"]
            if not mentions_any(causal_archs):
                # Configs without a telling architecture name get a second
                # chance through their model_type.
                if model_type not in ["gpt2", "llama", "mistral", "falcon", "qwen", "phi"]:
                    issues.append("Model may not support causal language modeling")

        elif task_type == "seq2seq":
            seq2seq_archs = ["T5", "BART", "Pegasus", "MT5", "EncoderDecoderModel"]
            if not mentions_any(seq2seq_archs):
                issues.append("Model may not support seq2seq tasks")

        elif task_type == "token-classification":
            if not any("TokenClassification" in arch_name for arch_name in architectures):
                issues.append("Model may not support token classification")

        elif task_type == "text-classification":
            if not any("Classification" in arch_name for arch_name in architectures):
                issues.append("Model may not support text classification")

        elif task_type == "question-answering":
            qa_archs = ["QuestionAnswering", "BertForQA"]
            if not mentions_any(qa_archs):
                issues.append("Model may not support question answering")

        return len(issues) == 0, issues

    except Exception as e:
        return False, [f"Error checking compatibility: {str(e)}"]
|
|
|
|
def apply_peft(
    model: PreTrainedModel,
    task_type: str,
    lora_r: int = 8,
    lora_alpha: int = 32,
    lora_dropout: float = 0.1,
    target_modules: Optional[List[str]] = None,
) -> Tuple[PreTrainedModel, Dict[str, Any]]:
    """Wrap a model with a LoRA adapter.

    Args:
        model: Model to adapt.
        task_type: Key into ``PEFT_TASK_TYPES``; unknown values fall back to
            ``TaskType.CAUSAL_LM``.
        lora_r: LoRA rank.
        lora_alpha: LoRA scaling factor.
        lora_dropout: Dropout applied inside the LoRA layers.
        target_modules: Module names to adapt.  When ``None`` a default is
            chosen from ``model.config.model_type``.

    Returns:
        ``(peft_model, info)`` with parameter counts and the LoRA settings
        used.  On failure the error is logged and ``(model, {"error": msg})``
        is returned, where ``model`` is whatever the last successful step
        produced.
    """
    # Default LoRA targets per model-type fragment.  Order matters: the first
    # matching row wins, so specific names precede the generic "gpt" and
    # "bert" fragments that would otherwise capture architectures lacking
    # those modules (gpt_neox/gptj/gpt_neo have no c_attn; distilbert has no
    # query/value/dense).
    default_targets = (
        (("llama", "mistral"), ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]),
        (("gpt_neox", "bloom", "falcon"), ["query_key_value"]),
        (("gptj", "gpt_neo"), ["q_proj", "v_proj"]),
        (("gpt",), ["c_attn", "c_proj"]),
        (("t5",), ["q", "v"]),
        (("distilbert",), ["q_lin", "v_lin"]),
        (("bert", "roberta"), ["query", "value", "key", "dense"]),
    )

    try:
        # NOTE(review): this runs for every model, quantized or not — confirm
        # that is intended for full-precision models.
        model = prepare_model_for_kbit_training(model)

        peft_task_type = PEFT_TASK_TYPES.get(task_type, TaskType.CAUSAL_LM)

        if target_modules is None:
            model_type = (getattr(model.config, "model_type", "") or "").lower()
            target_modules = ["q_proj", "v_proj"]
            for fragments, modules in default_targets:
                if any(fragment in model_type for fragment in fragments):
                    target_modules = modules
                    break

        lora_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type=peft_task_type,
            target_modules=target_modules,
        )

        model = get_peft_model(model, lora_config)

        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        all_params = sum(p.numel() for p in model.parameters())

        info = {
            "trainable_params": trainable_params,
            "all_params": all_params,
            # Guarded like count_parameters so a parameterless model cannot
            # raise ZeroDivisionError after the adapter was already applied.
            "trainable_percentage": 100 * trainable_params / all_params if all_params > 0 else 0,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha,
            "target_modules": target_modules,
        }

        return model, info

    except Exception as e:
        logger.error(f"Error applying PEFT: {e}")
        return model, {"error": str(e)}
|
|
|
|
def estimate_parameters(model_name: str) -> Dict[str, Any]:
    """Estimate a model's parameter count from its config alone.

    The estimate adds up token embeddings (``vocab * hidden``), four
    ``hidden * hidden`` attention matrices per layer, two feed-forward
    matrices per layer and ``2 * hidden`` layer-norm values per layer.
    Nothing else is counted, so treat the result as a rough figure.

    Args:
        model_name: Model identifier handed to ``AutoConfig.from_pretrained``.

    Returns:
        A dict with the estimated count (raw and in billions), the config
        values used, and the size in MB at 4 and 2 bytes per parameter.  On
        failure a warning is logged and the dict holds zero estimates plus an
        ``"error"`` message.
    """
    try:
        config = AutoConfig.from_pretrained(model_name)

        def config_value(attribute, fallback):
            # A config may define the attribute but leave it as None (an
            # optional size, for instance).  Treat that like a missing
            # attribute instead of letting the arithmetic below fail and the
            # whole estimate collapse to zero.
            value = getattr(config, attribute, None)
            return fallback if value is None else value

        hidden_size = config_value("hidden_size", 768)
        num_layers = config_value("num_hidden_layers", 12)
        num_heads = config_value("num_attention_heads", 12)
        vocab_size = config_value("vocab_size", 30522)
        intermediate_size = config_value("intermediate_size", hidden_size * 4)

        embedding_params = vocab_size * hidden_size
        attention_params = 4 * hidden_size * hidden_size * num_layers
        ffn_params = (hidden_size * intermediate_size + intermediate_size * hidden_size) * num_layers
        layernorm_params = 2 * hidden_size * num_layers

        total_params = embedding_params + attention_params + ffn_params + layernorm_params

        return {
            "estimated_params": total_params,
            "estimated_params_billions": round(total_params / 1e9, 2),
            "hidden_size": hidden_size,
            "num_layers": num_layers,
            "num_heads": num_heads,
            "vocab_size": vocab_size,
            # 4 bytes per parameter, then 2 bytes per parameter.
            "model_size_mb": round(total_params * 4 / (1024 * 1024), 2),
            "model_size_mb_fp16": round(total_params * 2 / (1024 * 1024), 2),
        }

    except Exception as e:
        logger.warning(f"Could not estimate parameters: {e}")
        return {
            "estimated_params": 0,
            "estimated_params_billions": 0,
            "error": str(e),
        }
|
|
|
|
def get_recommended_settings(model_name: str, task_type: str) -> Dict[str, Any]:
    """Suggest training hyperparameters from model size and task type.

    Size-based values are applied first (using the estimate from
    ``estimate_parameters``); task-specific values are applied afterwards and
    win wherever both set the same key.
    """
    size_in_billions = estimate_parameters(model_name).get("estimated_params_billions", 0.1)

    # Baseline values; the overrides below replace individual entries.
    settings = {
        "batch_size": 1,
        "gradient_accumulation_steps": 1,
        "learning_rate": "5e-5",
        "epochs": 3,
        "max_length": 512,
        "use_peft": False,
        "lora_r": 8,
        "warmup_ratio": 0.1,
    }

    # (exclusive lower bound in billions, overrides), largest tier first.
    size_tiers = (
        (7, {
            "batch_size": 1,
            "gradient_accumulation_steps": 8,
            "learning_rate": "1e-5",
            "use_peft": True,
            "lora_r": 8,
            "max_length": 256,
        }),
        (3, {
            "batch_size": 1,
            "gradient_accumulation_steps": 4,
            "learning_rate": "2e-5",
            "use_peft": True,
            "max_length": 512,
        }),
        (1, {
            "batch_size": 2,
            "gradient_accumulation_steps": 2,
            "use_peft": True,
        }),
    )
    smallest_tier = {
        "batch_size": 4,
        "gradient_accumulation_steps": 1,
        "use_peft": False,
    }
    size_overrides = next(
        (overrides for lower_bound, overrides in size_tiers if size_in_billions > lower_bound),
        smallest_tier,
    )
    settings.update(size_overrides)

    task_tiers = (
        ("seq2seq", {"max_length": 1024, "epochs": 5}),
        ("token-classification", {"max_length": 128, "learning_rate": "2e-5"}),
        ("text-classification", {"epochs": 3, "learning_rate": "3e-5"}),
        ("question-answering", {"max_length": 384, "batch_size": 8}),
    )
    for task_name, overrides in task_tiers:
        if task_type == task_name:
            settings.update(overrides)
            break

    return settings
|
|
|
|
def count_parameters(model: PreTrainedModel) -> Dict[str, int]:
    """Tally trainable, frozen and total parameter element counts.

    ``trainable_percentage`` is 0 for a model without any parameters.
    """
    total = 0
    trainable = 0
    for param in model.parameters():
        element_count = param.numel()
        total += element_count
        if param.requires_grad:
            trainable += element_count

    if total > 0:
        percentage = 100 * trainable / total
    else:
        percentage = 0

    return {
        "trainable": trainable,
        "frozen": total - trainable,
        "total": total,
        "trainable_percentage": percentage,
    }
|
|
|
|
def get_model_memory_footprint(model: PreTrainedModel) -> Dict[str, float]:
    """Report the size of a model's parameters and buffers in MB.

    Sizes are element count times element size per tensor, divided by
    1024 * 1024.
    """
    bytes_per_mb = 1024 * 1024

    def tensor_bytes(tensors):
        byte_total = 0
        for tensor in tensors:
            byte_total += tensor.numel() * tensor.element_size()
        return byte_total

    param_bytes = tensor_bytes(model.parameters())
    buffer_bytes = tensor_bytes(model.buffers())

    return {
        "parameters_mb": param_bytes / bytes_per_mb,
        "buffers_mb": buffer_bytes / bytes_per_mb,
        "total_mb": (param_bytes + buffer_bytes) / bytes_per_mb,
    }
|
|
|
|
def save_model_with_metadata(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    output_dir: str,
    training_config: Dict[str, Any],
    metrics: Dict[str, float],
) -> Dict[str, str]:
    """Save a model, its tokenizer, a metadata JSON file and a model card.

    Args:
        model: Model saved with ``save_pretrained``.
        tokenizer: Tokenizer saved with ``save_pretrained``.
        output_dir: Target directory; created if it does not exist.
        training_config: Written to the metadata file and the model card, so
            it must be JSON-serializable.  ``model_name`` and ``task_type``
            are read from it when present.
        metrics: Metrics written to the metadata file and the model card.

    Returns:
        Paths of what was written: ``output_dir``, ``model_path`` (the same
        directory), ``metadata_path`` and ``model_card_path``.
    """
    import sys
    from datetime import datetime, timezone

    import transformers

    os.makedirs(output_dir, exist_ok=True)

    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    param_info = count_parameters(model)
    memory_info = get_model_memory_footprint(model)

    # UTC timestamp without an offset suffix: the same text the deprecated
    # ``datetime.utcnow().isoformat()`` produced, so existing readers of the
    # metadata file keep working.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    metadata = {
        "model_name": training_config.get("model_name", "unknown"),
        "task_type": training_config.get("task_type", "unknown"),
        "training_config": training_config,
        "metrics": metrics,
        "parameter_info": param_info,
        "memory_info": memory_info,
        "created_at": created_at,
        "transformers_version": transformers.__version__,
        "torch_version": torch.__version__,
        "python_version": sys.version,
    }

    # An explicit encoding keeps both files independent of the platform's
    # default locale.
    metadata_path = os.path.join(output_dir, "training_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

    model_card = create_model_card(training_config, metrics, param_info)
    model_card_path = os.path.join(output_dir, "README.md")
    with open(model_card_path, "w", encoding="utf-8") as f:
        f.write(model_card)

    return {
        "output_dir": output_dir,
        "model_path": output_dir,
        "metadata_path": metadata_path,
        "model_card_path": model_card_path,
    }
|
|
|
|
def create_model_card(
    config: Dict[str, Any],
    metrics: Dict[str, float],
    param_info: Dict[str, int],
) -> str:
    """Render the Markdown model card for a fine-tuned model.

    The card names the base model and task, shows parameter counts, embeds
    the training config as JSON, lists the metrics (floats with four
    decimals) and ends with fixed usage, license and framework sections.
    """
    base_model = config.get("model_name", "unknown")
    task = config.get("task_type", "unknown")

    if metrics:
        metric_bullets = []
        for key, value in metrics.items():
            if isinstance(value, float):
                metric_bullets.append(f"- {key}: {value:.4f}")
            else:
                metric_bullets.append(f"- {key}: {value}")
        metrics_section = "\n".join(metric_bullets)
    else:
        metrics_section = "- No metrics available"

    card_lines = [
        f"# {base_model} - Fine-tuned",
        "",
        "## Model Details",
        "",
        f"- **Base Model:** {base_model}",
        f"- **Task:** {task}",
        f"- **Total Parameters:** {param_info.get('total', 0):,}",
        f"- **Trainable Parameters:** {param_info.get('trainable', 0):,}",
        "",
        "## Training Configuration",
        "",
        "```json",
        json.dumps(config, indent=2),
        "```",
        "",
        "## Training Metrics",
        "",
        metrics_section,
        "",
        "## Usage",
        "",
        "```python",
        "from transformers import AutoModel, AutoTokenizer",
        "",
        'model = AutoModel.from_pretrained("path/to/model")',
        'tokenizer = AutoTokenizer.from_pretrained("path/to/model")',
        "```",
        "",
        "## License",
        "",
        "Please refer to the original model's license.",
        "",
        "## Training Framework",
        "",
        "This model was trained using the Universal Model Trainer.",
    ]
    # The card ends with a single trailing newline.
    return "\n".join(card_lines) + "\n"