""" Model Utilities - Helper functions for model operations """ import logging from typing import Dict, Any, List, Optional, Tuple import torch from transformers import ( AutoModel, AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoModelForTokenClassification, AutoModelForQuestionAnswering, AutoModelForSequenceClassification, AutoConfig, AutoTokenizer, PreTrainedModel, PreTrainedTokenizer, ) from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training import os import json import hashlib logger = logging.getLogger(__name__) # Model architectures and their supported tasks MODEL_TASK_MAPPING = { "gpt": ["causal-lm"], "llama": ["causal-lm"], "mistral": ["causal-lm"], "falcon": ["causal-lm"], "qwen": ["causal-lm"], "phi": ["causal-lm"], "opt": ["causal-lm"], "bloom": ["causal-lm"], "t5": ["seq2seq"], "bart": ["seq2seq"], "pegasus": ["seq2seq"], "mt5": ["seq2seq"], "bert": ["token-classification", "text-classification", "question-answering"], "roberta": ["token-classification", "text-classification", "question-answering"], "deberta": ["token-classification", "text-classification", "question-answering"], "xlnet": ["token-classification", "text-classification", "question-answering"], "albert": ["token-classification", "text-classification", "question-answering"], "electra": ["token-classification", "text-classification"], "distilbert": ["token-classification", "text-classification", "question-answering"], } # PEFT task type mapping PEFT_TASK_TYPES = { "causal-lm": TaskType.CAUSAL_LM, "seq2seq": TaskType.SEQ_2_SEQ_LM, "token-classification": TaskType.TOKEN_CLS, "text-classification": TaskType.SEQ_CLS, "question-answering": TaskType.QUESTION_ANS, } def get_model_for_task(model_name: str, task_type: str, **kwargs) -> Tuple[PreTrainedModel, Optional[str]]: """Load appropriate model for a task type.""" try: config = AutoConfig.from_pretrained(model_name) # Determine model class if task_type == "causal-lm": model_class = AutoModelForCausalLM elif task_type == "seq2seq": model_class = AutoModelForSeq2SeqLM elif task_type == "token-classification": model_class = AutoModelForTokenClassification elif task_type == "text-classification": model_class = AutoModelForSequenceClassification elif task_type == "question-answering": model_class = AutoModelForQuestionAnswering else: model_class = AutoModel # Load model model = model_class.from_pretrained( model_name, config=config, **kwargs ) return model, None except Exception as e: logger.error(f"Error loading model {model_name} for task {task_type}: {e}") return None, str(e) def load_tokenizer(model_name: str, **kwargs) -> Tuple[PreTrainedTokenizer, Optional[str]]: """Load tokenizer for a model.""" try: tokenizer = AutoTokenizer.from_pretrained(model_name, **kwargs) # Ensure pad token is set if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token or "" tokenizer.pad_token_id = tokenizer.eos_token_id or tokenizer.convert_tokens_to_ids("") return tokenizer, None except Exception as e: logger.error(f"Error loading tokenizer for {model_name}: {e}") return None, str(e) def get_model_info(model_name: str) -> Dict[str, Any]: """Get detailed model information.""" try: from huggingface_hub import HfApi, model_info api = HfApi() info = api.model_info(model_name) # Try to load config for more details try: config = AutoConfig.from_pretrained(model_name) config_dict = config.to_dict() except: config_dict = {} return { "model_id": info.id, "author": info.author, "sha": info.sha, "pipeline_tag": info.pipeline_tag, "library_name": 
def get_model_info(model_name: str) -> Dict[str, Any]:
    """Get detailed model information from the Hugging Face Hub."""
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        info = api.model_info(model_name)

        # Try to load the config for more details
        try:
            config = AutoConfig.from_pretrained(model_name)
            config_dict = config.to_dict()
        except Exception:
            config_dict = {}

        return {
            "model_id": info.id,
            "author": info.author,
            "sha": info.sha,
            "pipeline_tag": info.pipeline_tag,
            "library_name": info.library_name,
            "downloads": getattr(info, "downloads", 0),
            "likes": getattr(info, "likes", 0),
            "tags": info.tags or [],
            "siblings": [s.rfilename for s in info.siblings] if info.siblings else [],
            "config": config_dict,
            "hidden_size": config_dict.get("hidden_size"),
            "num_hidden_layers": config_dict.get("num_hidden_layers"),
            "num_attention_heads": config_dict.get("num_attention_heads"),
            "intermediate_size": config_dict.get("intermediate_size"),
            "vocab_size": config_dict.get("vocab_size"),
            "model_type": config_dict.get("model_type"),
            "architectures": config_dict.get("architectures", []),
        }
    except Exception as e:
        logger.error(f"Error getting model info for {model_name}: {e}")
        return {"error": str(e)}


def check_model_compatibility(model_name: str, task_type: str) -> Tuple[bool, List[str]]:
    """Check whether a model is compatible with a task type."""
    issues = []
    try:
        config = AutoConfig.from_pretrained(model_name)
        architectures = config.architectures or []
        model_type = config.model_type or ""

        # Match against class-name substrings as they appear in transformers
        # architecture names (e.g. "LlamaForCausalLM", "BartForConditionalGeneration").
        if task_type == "causal-lm":
            causal_archs = ["GPT", "Llama", "Mistral", "Falcon", "Qwen", "Phi", "OPT", "Bloom", "CausalLM"]
            if not any(key in arch for arch in architectures for key in causal_archs):
                if model_type not in ["gpt2", "llama", "mistral", "falcon", "qwen", "phi"]:
                    issues.append("Model may not support causal language modeling")
        elif task_type == "seq2seq":
            seq2seq_archs = ["T5", "Bart", "Pegasus", "MT5", "EncoderDecoder"]
            if not any(key in arch for arch in architectures for key in seq2seq_archs):
                issues.append("Model may not support seq2seq tasks")
        elif task_type == "token-classification":
            if not any("TokenClassification" in arch for arch in architectures):
                issues.append("Model may not support token classification")
        elif task_type == "text-classification":
            if not any("Classification" in arch for arch in architectures):
                issues.append("Model may not support text classification")
        elif task_type == "question-answering":
            # Covers names like "BertForQuestionAnswering".
            if not any("QuestionAnswering" in arch for arch in architectures):
                issues.append("Model may not support question answering")

        return len(issues) == 0, issues
    except Exception as e:
        return False, [f"Error checking compatibility: {str(e)}"]


def apply_peft(
    model: PreTrainedModel,
    task_type: str,
    lora_r: int = 8,
    lora_alpha: int = 32,
    lora_dropout: float = 0.1,
    target_modules: Optional[List[str]] = None,
) -> Tuple[PreTrainedModel, Dict[str, Any]]:
    """Apply PEFT/LoRA to a model."""
    try:
        # Prepare for k-bit training only when the model was actually loaded
        # quantized; for full-precision models the step is unnecessary.
        if getattr(model, "is_loaded_in_8bit", False) or getattr(model, "is_loaded_in_4bit", False):
            model = prepare_model_for_kbit_training(model)

        # Get PEFT task type
        peft_task_type = PEFT_TASK_TYPES.get(task_type, TaskType.CAUSAL_LM)

        # Auto-detect target modules if not specified
        if target_modules is None:
            model_type = getattr(model.config, "model_type", "").lower()
            if "llama" in model_type or "mistral" in model_type:
                target_modules = ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
            elif "gpt" in model_type:
                target_modules = ["c_attn", "c_proj"]
            elif "bert" in model_type or "roberta" in model_type:
                target_modules = ["query", "value", "key", "dense"]
            else:
                target_modules = ["q_proj", "v_proj"]

        # Create LoRA config
        lora_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type=peft_task_type,
            target_modules=target_modules,
        )

        # Apply LoRA
        model = get_peft_model(model, lora_config)

        # Summarize trainable parameters
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        all_params = sum(p.numel() for p in model.parameters())
        info = {
            "trainable_params": trainable_params,
            "all_params": all_params,
            "trainable_percentage": 100 * trainable_params / all_params,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha,
            "target_modules": target_modules,
        }
        return model, info
    except Exception as e:
        logger.error(f"Error applying PEFT: {e}")
        return model, {"error": str(e)}
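
# Illustrative LoRA workflow (a sketch; the checkpoint name is a placeholder
# and the rank/alpha values are common defaults, not tuned recommendations):
#
#     model, err = get_model_for_task("meta-llama/Llama-2-7b-hf", "causal-lm")
#     model, peft_info = apply_peft(model, "causal-lm", lora_r=16, lora_alpha=32)
#     logger.info("trainable: %.2f%%", peft_info.get("trainable_percentage", 0.0))
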
def estimate_parameters(model_name: str) -> Dict[str, Any]:
    """Estimate model parameters from the config, without loading weights."""
    try:
        config = AutoConfig.from_pretrained(model_name)

        hidden_size = getattr(config, "hidden_size", 768)
        num_layers = getattr(config, "num_hidden_layers", 12)
        num_heads = getattr(config, "num_attention_heads", 12)
        vocab_size = getattr(config, "vocab_size", 30522)
        intermediate_size = getattr(config, "intermediate_size", hidden_size * 4)

        # Rough estimation formulas (biases and task heads are ignored)
        # Embedding params
        embedding_params = vocab_size * hidden_size
        # Attention params per layer (Q, K, V, O projections)
        attention_params = 4 * hidden_size * hidden_size * num_layers
        # FFN params per layer (up and down projections)
        ffn_params = (hidden_size * intermediate_size + intermediate_size * hidden_size) * num_layers
        # Layer norm params
        layernorm_params = 2 * hidden_size * num_layers

        total_params = embedding_params + attention_params + ffn_params + layernorm_params

        return {
            "estimated_params": total_params,
            "estimated_params_billions": round(total_params / 1e9, 2),
            "hidden_size": hidden_size,
            "num_layers": num_layers,
            "num_heads": num_heads,
            "vocab_size": vocab_size,
            "model_size_mb": round(total_params * 4 / (1024 * 1024), 2),  # FP32
            "model_size_mb_fp16": round(total_params * 2 / (1024 * 1024), 2),  # FP16
        }
    except Exception as e:
        logger.warning(f"Could not estimate parameters: {e}")
        return {
            "estimated_params": 0,
            "estimated_params_billions": 0,
            "error": str(e),
        }
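
# Worked example for estimate_parameters with BERT-base-like dimensions
# (hidden_size=768, num_layers=12, vocab_size=30522, intermediate_size=3072):
#
#     embeddings: 30522 * 768          ~ 23.4M
#     attention:  4 * 768^2 * 12       ~ 28.3M
#     FFN:        2 * 768 * 3072 * 12  ~ 56.6M
#     layer norm: 2 * 768 * 12         ~ 0.02M
#     total                            ~ 108M
#
# Close to BERT-base's reported ~110M; the gap comes from the ignored biases,
# position/token-type embeddings, and task heads.
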
def get_recommended_settings(model_name: str, task_type: str) -> Dict[str, Any]:
    """Get recommended training settings for a model."""
    info = estimate_parameters(model_name)
    params_b = info.get("estimated_params_billions", 0.1)

    # Base recommendations
    settings = {
        "batch_size": 1,
        "gradient_accumulation_steps": 1,
        "learning_rate": "5e-5",
        "epochs": 3,
        "max_length": 512,
        "use_peft": False,
        "lora_r": 8,
        "warmup_ratio": 0.1,
    }

    # Adjust based on model size
    if params_b > 7:  # > 7B parameters
        settings["batch_size"] = 1
        settings["gradient_accumulation_steps"] = 8
        settings["learning_rate"] = "1e-5"
        settings["use_peft"] = True
        settings["lora_r"] = 8
        settings["max_length"] = 256
    elif params_b > 3:  # > 3B parameters
        settings["batch_size"] = 1
        settings["gradient_accumulation_steps"] = 4
        settings["learning_rate"] = "2e-5"
        settings["use_peft"] = True
        settings["max_length"] = 512
    elif params_b > 1:  # > 1B parameters
        settings["batch_size"] = 2
        settings["gradient_accumulation_steps"] = 2
        settings["use_peft"] = True
    else:  # <= 1B parameters
        settings["batch_size"] = 4
        settings["gradient_accumulation_steps"] = 1
        settings["use_peft"] = False

    # Task-specific adjustments
    if task_type == "seq2seq":
        settings["max_length"] = 1024
        settings["epochs"] = 5
    elif task_type == "token-classification":
        settings["max_length"] = 128
        settings["learning_rate"] = "2e-5"
    elif task_type == "text-classification":
        settings["epochs"] = 3
        settings["learning_rate"] = "3e-5"
    elif task_type == "question-answering":
        settings["max_length"] = 384
        settings["batch_size"] = 8

    return settings


def count_parameters(model: PreTrainedModel) -> Dict[str, int]:
    """Count model parameters."""
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    frozen = total - trainable
    return {
        "trainable": trainable,
        "frozen": frozen,
        "total": total,
        "trainable_percentage": 100 * trainable / total if total > 0 else 0,
    }


def get_model_memory_footprint(model: PreTrainedModel) -> Dict[str, float]:
    """Get model memory footprint in MB."""
    param_size = sum(p.numel() * p.element_size() for p in model.parameters())
    buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
    return {
        "parameters_mb": param_size / (1024 * 1024),
        "buffers_mb": buffer_size / (1024 * 1024),
        "total_mb": (param_size + buffer_size) / (1024 * 1024),
    }


def save_model_with_metadata(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    output_dir: str,
    training_config: Dict[str, Any],
    metrics: Dict[str, float],
) -> Dict[str, str]:
    """Save model and tokenizer together with comprehensive metadata."""
    from datetime import datetime, timezone

    os.makedirs(output_dir, exist_ok=True)

    # Save model and tokenizer
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    # Get model info
    param_info = count_parameters(model)
    memory_info = get_model_memory_footprint(model)

    # Create comprehensive metadata
    metadata = {
        "model_name": training_config.get("model_name", "unknown"),
        "task_type": training_config.get("task_type", "unknown"),
        "training_config": training_config,
        "metrics": metrics,
        "parameter_info": param_info,
        "memory_info": memory_info,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "transformers_version": transformers.__version__,
        "torch_version": torch.__version__,
        "python_version": sys.version,
    }

    # Save metadata
    metadata_path = os.path.join(output_dir, "training_metadata.json")
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)

    # Create model card
    model_card = create_model_card(training_config, metrics, param_info)
    model_card_path = os.path.join(output_dir, "README.md")
    with open(model_card_path, "w") as f:
        f.write(model_card)

    return {
        "output_dir": output_dir,
        "model_path": output_dir,
        "metadata_path": metadata_path,
        "model_card_path": model_card_path,
    }


def create_model_card(
    config: Dict[str, Any],
    metrics: Dict[str, float],
    param_info: Dict[str, int],
) -> str:
    """Create a model card README."""
    model_name = config.get("model_name", "unknown")
    task_type = config.get("task_type", "unknown")

    metrics_str = (
        "\n".join(
            f"- {k}: {v:.4f}" if isinstance(v, float) else f"- {k}: {v}"
            for k, v in metrics.items()
        )
        if metrics
        else "- No metrics available"
    )

    return f"""# {model_name} - Fine-tuned

## Model Details

- **Base Model:** {model_name}
- **Task:** {task_type}
- **Total Parameters:** {param_info.get('total', 0):,}
- **Trainable Parameters:** {param_info.get('trainable', 0):,}

## Training Configuration

```json
{json.dumps(config, indent=2)}
```

## Training Metrics

{metrics_str}

## Usage

```python
from transformers import AutoModel, AutoTokenizer

model = AutoModel.from_pretrained("path/to/model")
tokenizer = AutoTokenizer.from_pretrained("path/to/model")
```

## License

Please refer to the original model's license.

## Training Framework

This model was trained using the Universal Model Trainer.
"""
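
if __name__ == "__main__":
    # Minimal smoke test (a sketch: assumes Hub access, and the checkpoint
    # name below is only a small placeholder model).
    logging.basicConfig(level=logging.INFO)
    name = "distilbert-base-uncased"
    print(json.dumps(estimate_parameters(name), indent=2))
    print(json.dumps(get_recommended_settings(name, "text-classification"), indent=2))
    compatible, issues = check_model_compatibility(name, "text-classification")
    print(f"compatible={compatible}, issues={issues}")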