# universal-model-trainer / app/utils/model_utils.py
# Commit 810b6bc (verified), by vectorplasticity: "Add model utilities"
"""
Model Utilities - Helper functions for model operations
"""
import logging
from typing import Dict, Any, List, Optional, Tuple
import torch
from transformers import (
AutoModel,
AutoModelForCausalLM,
AutoModelForSeq2SeqLM,
AutoModelForTokenClassification,
AutoModelForQuestionAnswering,
AutoModelForSequenceClassification,
AutoConfig,
AutoTokenizer,
PreTrainedModel,
PreTrainedTokenizer,
)
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training
import os
import json
import hashlib
logger = logging.getLogger(__name__)

# Supported task types per model family name. Every family gets its own list
# object, so mutating one entry never affects another.
MODEL_TASK_MAPPING = {
    # Decoder-only families.
    **{
        family: ["causal-lm"]
        for family in ("gpt", "llama", "mistral", "falcon", "qwen", "phi", "opt", "bloom")
    },
    # Encoder-decoder families.
    **{family: ["seq2seq"] for family in ("t5", "bart", "pegasus", "mt5")},
    # Encoder-only families with all three understanding tasks.
    **{
        family: ["token-classification", "text-classification", "question-answering"]
        for family in ("bert", "roberta", "deberta", "xlnet", "albert")
    },
    # electra is listed without question answering.
    "electra": ["token-classification", "text-classification"],
    "distilbert": ["token-classification", "text-classification", "question-answering"],
}
# Maps this module's task-type strings to the corresponding PEFT ``TaskType`` value.
PEFT_TASK_TYPES = dict(
    (
        ("causal-lm", TaskType.CAUSAL_LM),
        ("seq2seq", TaskType.SEQ_2_SEQ_LM),
        ("token-classification", TaskType.TOKEN_CLS),
        ("text-classification", TaskType.SEQ_CLS),
        ("question-answering", TaskType.QUESTION_ANS),
    )
)
def get_model_for_task(
    model_name: str, task_type: str, **kwargs
) -> Tuple[Optional[PreTrainedModel], Optional[str]]:
    """Load the auto-model class that matches ``task_type``.

    Args:
        model_name: Model id or path passed to ``from_pretrained``.
        task_type: ``"causal-lm"``, ``"seq2seq"``, ``"token-classification"``,
            ``"text-classification"`` or ``"question-answering"``. Any other
            value falls back to the plain ``AutoModel``.
        **kwargs: Forwarded to ``model_class.from_pretrained``. The options
            that control where and how files are fetched are also forwarded
            to the config load.

    Returns:
        ``(model, None)`` on success, ``(None, error_message)`` on failure.
        Failures are logged and never raised.
    """
    # Options that decide which repository files are read and how they are
    # accessed. The config must be loaded with the same ones as the weights;
    # otherwise a pinned revision, a private repo token or trust_remote_code
    # applies to the weights but not to the config they are paired with.
    hub_option_names = (
        "cache_dir",
        "force_download",
        "local_files_only",
        "proxies",
        "revision",
        "subfolder",
        "token",
        "trust_remote_code",
    )
    try:
        model_classes = {
            "causal-lm": AutoModelForCausalLM,
            "seq2seq": AutoModelForSeq2SeqLM,
            "token-classification": AutoModelForTokenClassification,
            "text-classification": AutoModelForSequenceClassification,
            "question-answering": AutoModelForQuestionAnswering,
        }
        config_kwargs = {name: kwargs[name] for name in hub_option_names if name in kwargs}
        config = AutoConfig.from_pretrained(model_name, **config_kwargs)

        # Unknown task types get the headless base model.
        model_class = model_classes.get(task_type, AutoModel)
        model = model_class.from_pretrained(model_name, config=config, **kwargs)
        return model, None
    except Exception as e:
        logger.error(f"Error loading model {model_name} for task {task_type}: {e}")
        return None, str(e)
def load_tokenizer(
    model_name: str, **kwargs
) -> Tuple[Optional[PreTrainedTokenizer], Optional[str]]:
    """Load the tokenizer for a model and make sure it has a pad token.

    Args:
        model_name: Model id or path passed to ``AutoTokenizer.from_pretrained``.
        **kwargs: Forwarded to ``AutoTokenizer.from_pretrained``.

    Returns:
        ``(tokenizer, None)`` on success, ``(None, error_message)`` on failure.
        Failures are logged and never raised.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, **kwargs)

        # Tokenizers without a pad token reuse EOS, or "<pad>" when there is
        # no EOS token either.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token or "<pad>"
            eos_id = tokenizer.eos_token_id
            # Compare against None, not truthiness: 0 is a valid token id and
            # must not be mistaken for "no EOS id".
            tokenizer.pad_token_id = (
                eos_id if eos_id is not None else tokenizer.convert_tokens_to_ids("<pad>")
            )
        return tokenizer, None
    except Exception as e:
        logger.error(f"Error loading tokenizer for {model_name}: {e}")
        return None, str(e)
def get_model_info(model_name: str) -> Dict[str, Any]:
    """Collect Hub metadata and config details for a model.

    Args:
        model_name: Model id passed to ``HfApi.model_info`` and
            ``AutoConfig.from_pretrained``.

    Returns:
        A dict with the Hub fields (id, author, sha, tags, file names, ...),
        the full config dict and a few values lifted out of it. When the
        config cannot be loaded the Hub fields are still returned and the
        config-derived values are ``None`` (``[]`` for ``architectures``).
        If the Hub lookup itself fails the result is ``{"error": message}``.
    """
    try:
        # Local import: only this function needs the Hub client.
        from huggingface_hub import HfApi

        info = HfApi().model_info(model_name)

        # The config is a best-effort extra; its absence must not hide the
        # Hub metadata. Catch Exception rather than everything so that
        # KeyboardInterrupt / SystemExit still propagate.
        try:
            config_dict = AutoConfig.from_pretrained(model_name).to_dict()
        except Exception as config_error:
            logger.debug("Could not load config for %s: %s", model_name, config_error)
            config_dict = {}

        return {
            "model_id": info.id,
            "author": info.author,
            "sha": info.sha,
            "pipeline_tag": info.pipeline_tag,
            "library_name": info.library_name,
            "downloads": getattr(info, "downloads", 0),
            "likes": getattr(info, "likes", 0),
            "tags": info.tags or [],
            "siblings": [s.rfilename for s in info.siblings] if info.siblings else [],
            "config": config_dict,
            "hidden_size": config_dict.get("hidden_size"),
            "num_hidden_layers": config_dict.get("num_hidden_layers"),
            "num_attention_heads": config_dict.get("num_attention_heads"),
            "intermediate_size": config_dict.get("intermediate_size"),
            "vocab_size": config_dict.get("vocab_size"),
            "model_type": config_dict.get("model_type"),
            "architectures": config_dict.get("architectures", []),
        }
    except Exception as e:
        logger.error(f"Error getting model info for {model_name}: {e}")
        return {"error": str(e)}
def _matches_any_architecture(architectures: List[str], patterns: List[str]) -> bool:
    """Return True if any pattern occurs, ignoring case, in any architecture name."""
    needles = [pattern.lower() for pattern in patterns]
    return any(needle in arch.lower() for arch in architectures for needle in needles)


def check_model_compatibility(model_name: str, task_type: str) -> Tuple[bool, List[str]]:
    """Check whether a model's config looks compatible with a task type.

    The check is a heuristic over ``config.architectures`` (and, for causal
    LM, ``config.model_type``); the messages it produces are worded as
    warnings.

    Args:
        model_name: Model id or path passed to ``AutoConfig.from_pretrained``.
        task_type: Task to check. Task types without a rule below produce
            no issues.

    Returns:
        ``(True, [])`` when nothing was flagged, otherwise ``(False, issues)``.
        If the config cannot be loaded or inspected the result is
        ``(False, ["Error checking compatibility: ..."])``.
    """
    issues = []
    try:
        config = AutoConfig.from_pretrained(model_name)
        architectures = config.architectures or []
        model_type = config.model_type or ""

        # Each pattern is compared against each architecture name with its own
        # loop variable and without regard to case, so that e.g. "BART"
        # matches "BartForConditionalGeneration".
        if task_type == "causal-lm":
            causal_archs = ["GPT", "LLaMA", "Mistral", "Falcon", "Qwen", "Phi", "OPT", "Bloom", "CausalLM"]
            if not _matches_any_architecture(architectures, causal_archs):
                # Fall back to the model type when the architecture list is
                # empty or unrecognised.
                if model_type not in ["gpt2", "llama", "mistral", "falcon", "qwen", "phi"]:
                    issues.append("Model may not support causal language modeling")
        elif task_type == "seq2seq":
            seq2seq_archs = ["T5", "BART", "Pegasus", "MT5", "EncoderDecoderModel"]
            if not _matches_any_architecture(architectures, seq2seq_archs):
                issues.append("Model may not support seq2seq tasks")
        elif task_type == "token-classification":
            if not any("TokenClassification" in arch for arch in architectures):
                issues.append("Model may not support token classification")
        elif task_type == "text-classification":
            if not any("Classification" in arch for arch in architectures):
                issues.append("Model may not support text classification")
        elif task_type == "question-answering":
            qa_archs = ["QuestionAnswering", "BertForQA"]
            if not _matches_any_architecture(architectures, qa_archs):
                issues.append("Model may not support question answering")

        return len(issues) == 0, issues
    except Exception as e:
        return False, [f"Error checking compatibility: {str(e)}"]
def apply_peft(
    model: PreTrainedModel,
    task_type: str,
    lora_r: int = 8,
    lora_alpha: int = 32,
    lora_dropout: float = 0.1,
    target_modules: Optional[List[str]] = None,
) -> Tuple[PreTrainedModel, Dict[str, Any]]:
    """Wrap a model with a LoRA adapter.

    Args:
        model: Model to adapt.
        task_type: Task-type string looked up in ``PEFT_TASK_TYPES``; unknown
            values fall back to ``TaskType.CAUSAL_LM``.
        lora_r: LoRA rank.
        lora_alpha: LoRA alpha.
        lora_dropout: LoRA dropout.
        target_modules: Module names to adapt. When ``None`` they are chosen
            from ``model.config.model_type``.

    Returns:
        ``(peft_model, info)`` where ``info`` holds parameter counts, the
        trainable percentage and the LoRA settings used. On any error the
        failure is logged and ``(model, {"error": message})`` is returned,
        with ``model`` in whatever state it had reached.
    """
    try:
        model = prepare_model_for_kbit_training(model)
        peft_task_type = PEFT_TASK_TYPES.get(task_type, TaskType.CAUSAL_LM)

        # Pick default target modules from the model family.
        if target_modules is None:
            family = getattr(model.config, "model_type", "").lower()
            if any(name in family for name in ("llama", "mistral")):
                target_modules = ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
            elif "gpt" in family:
                target_modules = ["c_attn", "c_proj"]
            elif any(name in family for name in ("bert", "roberta")):
                target_modules = ["query", "value", "key", "dense"]
            else:
                target_modules = ["q_proj", "v_proj"]

        adapter_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type=peft_task_type,
            target_modules=target_modules,
        )
        model = get_peft_model(model, adapter_config)

        # Tally trainable and total parameter counts in one pass.
        trainable_count = 0
        total_count = 0
        for param in model.parameters():
            size = param.numel()
            total_count += size
            if param.requires_grad:
                trainable_count += size

        summary = {
            "trainable_params": trainable_count,
            "all_params": total_count,
            "trainable_percentage": 100 * trainable_count / total_count,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha,
            "target_modules": target_modules,
        }
        return model, summary
    except Exception as e:
        logger.error(f"Error applying PEFT: {e}")
        return model, {"error": str(e)}
def estimate_parameters(model_name: str) -> Dict[str, Any]:
    """Estimate a model's parameter count from its config, without loading weights.

    The estimate sums the token embedding, four ``hidden x hidden`` attention
    projections per layer, two feed-forward matrices per layer and
    ``2 * hidden`` layer-norm values per layer. Config fields that are absent
    fall back to 768 hidden / 12 layers / 12 heads / 30522 vocab, and a
    feed-forward width of four times the hidden size.

    Args:
        model_name: Model id or path passed to ``AutoConfig.from_pretrained``.

    Returns:
        A dict with the estimated count (raw and in billions), the config
        values used, and FP32/FP16 sizes in MB. On any error a warning is
        logged and the counts are returned as 0 together with an ``"error"``
        message.
    """
    try:
        cfg = AutoConfig.from_pretrained(model_name)
        width = getattr(cfg, "hidden_size", 768)
        depth = getattr(cfg, "num_hidden_layers", 12)
        heads = getattr(cfg, "num_attention_heads", 12)
        vocab = getattr(cfg, "vocab_size", 30522)
        ffn_width = getattr(cfg, "intermediate_size", width * 4)

        # Rough per-component counts.
        embedding = vocab * width
        attention = 4 * width * width * depth  # Q, K, V and O projections
        feed_forward = (width * ffn_width + ffn_width * width) * depth
        layer_norm = 2 * width * depth
        total = embedding + attention + feed_forward + layer_norm

        bytes_per_mb = 1024 * 1024
        return {
            "estimated_params": total,
            "estimated_params_billions": round(total / 1e9, 2),
            "hidden_size": width,
            "num_layers": depth,
            "num_heads": heads,
            "vocab_size": vocab,
            "model_size_mb": round(total * 4 / bytes_per_mb, 2),  # 4 bytes per value (FP32)
            "model_size_mb_fp16": round(total * 2 / bytes_per_mb, 2),  # 2 bytes per value (FP16)
        }
    except Exception as e:
        logger.warning(f"Could not estimate parameters: {e}")
        return {
            "estimated_params": 0,
            "estimated_params_billions": 0,
            "error": str(e),
        }
def get_recommended_settings(model_name: str, task_type: str) -> Dict[str, Any]:
    """Suggest training settings from the estimated model size and the task.

    Size-based values are applied first; task-specific values are applied
    afterwards and overwrite them where both set the same key.

    Args:
        model_name: Model whose size is estimated with ``estimate_parameters``.
        task_type: Task-type string; unrecognised values add no overrides.

    Returns:
        A dict of settings (batch size, gradient accumulation steps, learning
        rate as a string, epochs, max length, PEFT flag, LoRA rank and
        warm-up ratio).
    """
    size_b = estimate_parameters(model_name).get("estimated_params_billions", 0.1)

    # Baseline values, refined below.
    settings = {
        "batch_size": 1,
        "gradient_accumulation_steps": 1,
        "learning_rate": "5e-5",
        "epochs": 3,
        "max_length": 512,
        "use_peft": False,
        "lora_r": 8,
        "warmup_ratio": 0.1,
    }

    # Size tiers, largest first.
    if size_b > 7:
        settings.update(
            batch_size=1,
            gradient_accumulation_steps=8,
            learning_rate="1e-5",
            use_peft=True,
            lora_r=8,
            max_length=256,
        )
    elif size_b > 3:
        settings.update(
            batch_size=1,
            gradient_accumulation_steps=4,
            learning_rate="2e-5",
            use_peft=True,
            max_length=512,
        )
    elif size_b > 1:
        settings.update(batch_size=2, gradient_accumulation_steps=2, use_peft=True)
    else:
        settings.update(batch_size=4, gradient_accumulation_steps=1, use_peft=False)

    # Task-specific overrides.
    task_overrides = {
        "seq2seq": {"max_length": 1024, "epochs": 5},
        "token-classification": {"max_length": 128, "learning_rate": "2e-5"},
        "text-classification": {"epochs": 3, "learning_rate": "3e-5"},
        "question-answering": {"max_length": 384, "batch_size": 8},
    }
    settings.update(task_overrides.get(task_type, {}))
    return settings
def count_parameters(model: PreTrainedModel) -> Dict[str, int]:
    """Count a model's trainable, frozen and total parameters.

    Args:
        model: Object whose ``parameters()`` yields tensors exposing
            ``numel()`` and ``requires_grad``.

    Returns:
        A dict with ``trainable``, ``frozen``, ``total`` and
        ``trainable_percentage`` (0 when the model has no parameters).
    """
    trainable_count = 0
    total_count = 0
    for param in model.parameters():
        size = param.numel()
        total_count += size
        if param.requires_grad:
            trainable_count += size

    if total_count > 0:
        share = 100 * trainable_count / total_count
    else:
        share = 0

    return {
        "trainable": trainable_count,
        "frozen": total_count - trainable_count,
        "total": total_count,
        "trainable_percentage": share,
    }
def get_model_memory_footprint(model: PreTrainedModel) -> Dict[str, float]:
    """Report the size of a model's parameters and buffers in MB.

    Args:
        model: Object whose ``parameters()`` and ``buffers()`` yield tensors
            exposing ``numel()`` and ``element_size()``.

    Returns:
        A dict with ``parameters_mb``, ``buffers_mb`` and ``total_mb``,
        where one MB is 1024 * 1024 bytes.
    """
    bytes_per_mb = 1024 * 1024

    param_bytes = 0
    for param in model.parameters():
        param_bytes += param.numel() * param.element_size()

    buffer_bytes = 0
    for buf in model.buffers():
        buffer_bytes += buf.numel() * buf.element_size()

    return {
        "parameters_mb": param_bytes / bytes_per_mb,
        "buffers_mb": buffer_bytes / bytes_per_mb,
        "total_mb": (param_bytes + buffer_bytes) / bytes_per_mb,
    }
def save_model_with_metadata(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    output_dir: str,
    training_config: Dict[str, Any],
    metrics: Dict[str, float],
) -> Dict[str, str]:
    """Save a model and tokenizer together with metadata and a model card.

    Writes into ``output_dir`` (created if missing): the model and tokenizer
    files, ``training_metadata.json`` and ``README.md``.

    Args:
        model: Model to save with ``save_pretrained``.
        tokenizer: Tokenizer to save with ``save_pretrained``.
        output_dir: Destination directory.
        training_config: Training settings; stored verbatim in the metadata
            and rendered in the model card, so it must be JSON-serialisable.
        metrics: Final metrics; stored in the metadata and the model card.

    Returns:
        A dict with the paths written: ``output_dir``, ``model_path`` (same
        directory), ``metadata_path`` and ``model_card_path``.
    """
    import sys
    from datetime import datetime, timezone

    import torch
    import transformers

    os.makedirs(output_dir, exist_ok=True)

    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    param_info = count_parameters(model)
    memory_info = get_model_memory_footprint(model)

    # UTC timestamp in the same offset-less ISO format this file has always
    # written; built from an aware "now" because datetime.utcnow() is
    # deprecated.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    metadata = {
        "model_name": training_config.get("model_name", "unknown"),
        "task_type": training_config.get("task_type", "unknown"),
        "training_config": training_config,
        "metrics": metrics,
        "parameter_info": param_info,
        "memory_info": memory_info,
        "created_at": created_at,
        "transformers_version": transformers.__version__,
        "torch_version": torch.__version__,
        "python_version": sys.version,
    }

    # Explicit UTF-8 so the files do not depend on the platform's default
    # encoding (the model card can contain non-ASCII model names or metrics).
    metadata_path = os.path.join(output_dir, "training_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

    model_card = create_model_card(training_config, metrics, param_info)
    model_card_path = os.path.join(output_dir, "README.md")
    with open(model_card_path, "w", encoding="utf-8") as f:
        f.write(model_card)

    return {
        "output_dir": output_dir,
        "model_path": output_dir,
        "metadata_path": metadata_path,
        "model_card_path": model_card_path,
    }
def create_model_card(
    config: Dict[str, Any],
    metrics: Dict[str, float],
    param_info: Dict[str, int],
) -> str:
    """Render the README text for a fine-tuned model.

    Args:
        config: Training configuration; ``model_name`` and ``task_type`` are
            shown in the header (``"unknown"`` when missing) and the whole
            dict is embedded as JSON.
        metrics: Metric name to value. Floats are shown with four decimals,
            other values as-is; an empty mapping yields a placeholder line.
        param_info: Parameter counts; ``total`` and ``trainable`` are shown
            with thousands separators (0 when missing).

    Returns:
        The model card as a single newline-terminated string.
    """
    base_model = config.get("model_name", "unknown")
    task = config.get("task_type", "unknown")

    if metrics:
        metric_lines = []
        for key, value in metrics.items():
            if isinstance(value, float):
                metric_lines.append(f"- {key}: {value:.4f}")
            else:
                metric_lines.append(f"- {key}: {value}")
    else:
        metric_lines = ["- No metrics available"]

    card_lines = [
        f"# {base_model} - Fine-tuned",
        "## Model Details",
        f"- **Base Model:** {base_model}",
        f"- **Task:** {task}",
        f"- **Total Parameters:** {param_info.get('total', 0):,}",
        f"- **Trainable Parameters:** {param_info.get('trainable', 0):,}",
        "## Training Configuration",
        "```json",
        json.dumps(config, indent=2),
        "```",
        "## Training Metrics",
        "\n".join(metric_lines),
        "## Usage",
        "```python",
        "from transformers import AutoModel, AutoTokenizer",
        'model = AutoModel.from_pretrained("path/to/model")',
        'tokenizer = AutoTokenizer.from_pretrained("path/to/model")',
        "```",
        "## License",
        "Please refer to the original model's license.",
        "## Training Framework",
        "This model was trained using the Universal Model Trainer.",
    ]
    return "\n".join(card_lines) + "\n"