# universal-model-trainer/app/utils/training_utils.py
"""
Training Utilities - Helper functions for model training
"""
# Standard library
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import numpy as np
import torch
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForQuestionAnswering,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    DataCollatorForSeq2Seq,
    DataCollatorForTokenClassification,
    Trainer,
    TrainingArguments,
)
# Module-level logger named after this module so applications can filter
# these messages hierarchically via the standard logging config.
logger = logging.getLogger(__name__)
def get_model_class_for_task(task_type: str):
    """Get the appropriate model class for a task type."""
    task_to_class = {
        "causal-lm": AutoModelForCausalLM,
        "seq2seq": AutoModelForSeq2SeqLM,
        "token-classification": AutoModelForTokenClassification,
        "question-answering": AutoModelForQuestionAnswering,
        "text-classification": AutoModelForSequenceClassification,
    }
    try:
        return task_to_class[task_type]
    except KeyError:
        raise ValueError(f"Unknown task type: {task_type}") from None
def compute_model_hash(model_path: str) -> str:
    """Compute a hash of model configuration for tracking."""
    config_file = os.path.join(model_path, "config.json")
    # No config on disk means we cannot fingerprint the model.
    if not os.path.exists(config_file):
        return "unknown"
    with open(config_file, "rb") as fh:
        digest = hashlib.md5(fh.read()).hexdigest()
    # A 12-char prefix is plenty for a human-readable tracking ID.
    return digest[:12]
def estimate_memory_requirements(
    model_name: str,
    task_type: str,
    batch_size: int = 1,
    max_length: int = 512,
    use_peft: bool = False
) -> Dict[str, float]:
    """Estimate memory requirements for training."""
    try:
        model_config = AutoConfig.from_pretrained(model_name)
        # Architectures expose width/depth under different attribute names
        # (e.g. BERT-style vs GPT-2-style); fall back to BERT-base sizes.
        hidden = getattr(model_config, "hidden_size",
                         getattr(model_config, "n_embd", 768))
        layers = getattr(model_config, "num_hidden_layers",
                         getattr(model_config, "n_layer", 12))
        # Very rough transformer parameter count: ~12 * hidden^2 per layer.
        params_billion = (hidden ** 2 * layers * 12) / 1e9
        # FP16 stores 2 bytes per parameter.
        fp16_model_gb = params_billion * 2
        grads_gb = fp16_model_gb          # gradients mirror the weights
        adam_gb = fp16_model_gb * 2       # Adam keeps two moment buffers
        # Activation footprint grows with batch size and sequence length.
        acts_gb = (batch_size * max_length * hidden * 4) / 1e9
        if use_peft:
            # LoRA trains only a small adapter, so trainable-weight and
            # optimizer costs shrink dramatically.
            total_gb = (fp16_model_gb * 0.1) + grads_gb + adam_gb * 0.1 + acts_gb
        else:
            total_gb = fp16_model_gb + grads_gb + adam_gb + acts_gb
        return {
            "estimated_params_billion": round(params_billion, 2),
            "model_memory_gb": round(fp16_model_gb, 2),
            "optimizer_memory_gb": round(adam_gb, 2),
            "activation_memory_gb": round(acts_gb, 2),
            "total_memory_gb": round(total_gb, 2),
            "recommended_memory_gb": round(total_gb * 1.5, 2),
            "can_run_on_cpu": total_gb < 8,
            "recommended_hardware": "gpu" if total_gb > 4 else "cpu"
        }
    except Exception as e:
        logger.warning(f"Could not estimate memory: {e}")
        # Conservative small-model fallback when the config can't be fetched.
        return {
            "estimated_params_billion": 0.1,
            "model_memory_gb": 0.5,
            "optimizer_memory_gb": 1.0,
            "activation_memory_gb": 0.5,
            "total_memory_gb": 2.0,
            "recommended_memory_gb": 4.0,
            "can_run_on_cpu": True,
            "recommended_hardware": "cpu"
        }
def get_available_hardware() -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Get available hardware options.

    Returns:
        A 2-tuple ``(hardware, runtime_info)`` where ``hardware`` is the
        static catalog of hardware tiers and ``runtime_info`` describes the
        CUDA devices actually visible to this process.

    Note: the previous annotation claimed only a list was returned, but the
    function has always returned a tuple; the annotation is corrected here
    to match what callers must unpack.
    """
    hardware = [
        {"id": "cpu-basic", "name": "CPU Basic", "memory_gb": 16, "gpu": False, "cost": "Free"},
        {"id": "cpu-upgrade", "name": "CPU Upgrade", "memory_gb": 32, "gpu": False, "cost": "Low"},
        {"id": "t4-small", "name": "T4 Small", "memory_gb": 16, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "t4-medium", "name": "T4 Medium", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "l4x1", "name": "L4 x1", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "l4x4", "name": "L4 x4", "memory_gb": 96, "gpu": True, "gpu_memory_gb": 96, "cost": "Very High"},
        {"id": "a10g-small", "name": "A10G Small", "memory_gb": 24, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "a10g-large", "name": "A10G Large", "memory_gb": 48, "gpu": True, "gpu_memory_gb": 48, "cost": "Very High"},
        {"id": "a100-large", "name": "A100 Large", "memory_gb": 80, "gpu": True, "gpu_memory_gb": 80, "cost": "Premium"},
    ]
    # Check what's actually available in this process.
    if torch.cuda.is_available():
        gpu_count = torch.cuda.device_count()
        # Only the first device is reported; multi-GPU detail is out of scope.
        gpu_name = torch.cuda.get_device_name(0) if gpu_count > 0 else "Unknown"
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 if gpu_count > 0 else 0
        return hardware, {
            "cuda_available": True,
            "gpu_count": gpu_count,
            "gpu_name": gpu_name,
            "gpu_memory_gb": round(gpu_memory, 1)
        }
    return hardware, {
        "cuda_available": False,
        "gpu_count": 0,
        "gpu_name": None,
        "gpu_memory_gb": 0
    }
def get_training_args(
    output_dir: str,
    config: Dict[str, Any],
    task_type: str
) -> TrainingArguments:
    """Create TrainingArguments from config.

    Args:
        output_dir: Directory for checkpoints, logs, and final outputs.
        config: Flat dict of user-supplied hyperparameters; missing keys
            fall back to the defaults used below.
        task_type: Task name (e.g. "causal-lm", "seq2seq"); selects
            task-specific arguments.

    Returns:
        A populated ``TrainingArguments`` — or ``Seq2SeqTrainingArguments``
        for the "seq2seq" task, the only subclass that accepts the
        generation-related fields.
    """
    # Base arguments shared by every task.
    args = {
        "output_dir": output_dir,
        "overwrite_output_dir": True,
        # Training
        "num_train_epochs": config.get("epochs", 3),
        "per_device_train_batch_size": config.get("batch_size", 1),
        "per_device_eval_batch_size": config.get("batch_size", 1),
        "gradient_accumulation_steps": config.get("gradient_accumulation_steps", 1),
        # Learning rate
        "learning_rate": config.get("learning_rate", 5e-5),
        "weight_decay": config.get("weight_decay", 0.01),
        "warmup_steps": config.get("warmup_steps", 100),
        "lr_scheduler_type": config.get("lr_scheduler_type", "cosine"),
        # Logging
        "logging_dir": os.path.join(output_dir, "logs"),
        "logging_steps": config.get("logging_steps", 10),
        "save_steps": config.get("save_steps", 500),
        "save_total_limit": config.get("save_total_limit", 3),
        # Evaluation — only evaluate when the caller asked for eval_steps.
        # NOTE(review): "evaluation_strategy" was renamed to "eval_strategy"
        # in recent transformers releases; confirm against the pinned version.
        "evaluation_strategy": "steps" if config.get("eval_steps") else "no",
        "eval_steps": config.get("eval_steps", 500),
        # Optimization — mixed precision only makes sense on CUDA hardware.
        "fp16": config.get("fp16", True) and torch.cuda.is_available(),
        "bf16": config.get("bf16", False) and torch.cuda.is_bf16_supported(),
        # Misc
        "dataloader_num_workers": config.get("dataloader_num_workers", 0),
        "dataloader_pin_memory": config.get("pin_memory", True) and torch.cuda.is_available(),
        "gradient_checkpointing": config.get("gradient_checkpointing", False),
        # Reporting
        "report_to": config.get("report_to", ["none"]),
        # Seed
        "seed": config.get("seed", 42),
    }
    arg_cls = TrainingArguments
    # Task-specific adjustments
    if task_type == "causal-lm":
        args["max_steps"] = config.get("max_steps", -1)
        # BUGFIX: the old code forwarded config["max_length"] here, but
        # "max_length" is not a TrainingArguments field, so construction
        # raised TypeError. Sequence length must be enforced at tokenization
        # time instead, so it is intentionally not forwarded.
    elif task_type == "seq2seq":
        # BUGFIX: predict_with_generate / generation_* only exist on
        # Seq2SeqTrainingArguments; plain TrainingArguments rejects them.
        from transformers import Seq2SeqTrainingArguments
        arg_cls = Seq2SeqTrainingArguments
        args["predict_with_generate"] = config.get("predict_with_generate", False)
        args["generation_max_length"] = config.get("generation_max_length", 128)
        args["generation_num_beams"] = config.get("generation_num_beams", 4)
    elif task_type == "token-classification":
        args["label_names"] = config.get("label_names", [])
    # DeepSpeed config if enabled
    if config.get("deepspeed_config"):
        args["deepspeed"] = config["deepspeed_config"]
    return arg_cls(**args)
def get_peft_config(config: Dict[str, Any]) -> Optional[LoraConfig]:
    """Create PEFT/LoRA config if enabled."""
    # PEFT is strictly opt-in; return None so callers skip adapter setup.
    if not config.get("use_peft", False):
        return None
    lora_kwargs = {
        "r": config.get("lora_r", 8),
        "lora_alpha": config.get("lora_alpha", 32),
        "lora_dropout": config.get("lora_dropout", 0.1),
        "bias": config.get("lora_bias", "none"),
        "task_type": config.get("peft_task_type", "CAUSAL_LM"),
        "target_modules": config.get("lora_target_modules", None),
    }
    return LoraConfig(**lora_kwargs)
def get_data_collator(
    tokenizer: Any,
    task_type: str,
    config: Dict[str, Any]
) -> Any:
    """Get appropriate data collator for task type."""
    # These two settings are shared by every padded collator below.
    padding = config.get("padding", "max_length")
    pad_multiple = config.get("pad_to_multiple_of", 8)

    if task_type == "causal-lm":
        # mlm=False -> plain next-token objective; labels mirror inputs.
        return DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,
            pad_to_multiple_of=pad_multiple,
        )
    if task_type == "seq2seq":
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=padding,
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=pad_multiple,
        )
    if task_type == "token-classification":
        return DataCollatorForTokenClassification(
            tokenizer=tokenizer,
            padding=padding,
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=pad_multiple,
        )
    if task_type == "question-answering":
        # Reuses the seq2seq collator, with the shorter QA default length.
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=padding,
            max_length=config.get("max_length", 384),
        )
    if task_type == "text-classification":
        from transformers import DataCollatorWithPadding
        return DataCollatorWithPadding(
            tokenizer=tokenizer,
            padding=padding,
            max_length=config.get("max_length", 512),
        )
    logger.warning(f"Unknown task type {task_type}, using default collator")
    from transformers import DataCollatorWithPadding
    return DataCollatorWithPadding(tokenizer=tokenizer)
def compute_metrics_factory(task_type: str, tokenizer: Any = None):
    """Factory for creating compute_metrics function.

    Args:
        task_type: Task name; selects which metric function is built.
        tokenizer: Required for "seq2seq" (decodes predictions/labels);
            unused by the other tasks.

    Returns:
        A callable suitable for ``Trainer(compute_metrics=...)`` that takes
        a ``(predictions, labels)`` pair and returns a metrics dict. Unknown
        task types get a no-op function returning ``{}``.
    """
    if task_type == "causal-lm":
        def compute_metrics(eval_preds):
            """Compute perplexity for language modeling."""
            logits, labels = eval_preds
            # BUGFIX: Trainer hands numpy arrays to compute_metrics, and the
            # torch ops below (.contiguous()/.view()) crashed on them.
            # Convert to tensors first.
            logits = torch.as_tensor(logits)
            labels = torch.as_tensor(labels)
            # Shift so each position predicts the *next* token (causal LM).
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # CrossEntropyLoss ignores -100-labeled positions by default.
            loss_fct = torch.nn.CrossEntropyLoss(reduction='mean')
            loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1)
            )
            perplexity = torch.exp(loss)
            return {
                "perplexity": perplexity.item(),
                "loss": loss.item()
            }
        return compute_metrics
    elif task_type == "seq2seq":
        def compute_metrics(eval_preds):
            """Compute ROUGE scores for summarization."""
            from evaluate import load
            rouge = load("rouge")
            predictions, labels = eval_preds
            decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
            # Replace the -100 ignore index with pad tokens so decoding works.
            labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
            result = rouge.compute(
                predictions=decoded_preds,
                references=decoded_labels,
                use_stemmer=True
            )
            return {k: round(v * 100, 4) for k, v in result.items()}
        return compute_metrics
    elif task_type == "token-classification":
        def compute_metrics(eval_preds):
            """Compute precision, recall, F1 for NER."""
            from evaluate import load
            seqeval = load("seqeval")
            predictions, labels = eval_preds
            predictions = np.argmax(predictions, axis=2)
            # Drop positions labeled -100 (special tokens / padding).
            true_predictions = [
                [p for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]
            true_labels = [
                [l for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]
            results = seqeval.compute(predictions=true_predictions, references=true_labels)
            return {
                "precision": results["overall_precision"],
                "recall": results["overall_recall"],
                "f1": results["overall_f1"],
                "accuracy": results["overall_accuracy"]
            }
        return compute_metrics
    elif task_type == "text-classification":
        def compute_metrics(eval_preds):
            """Compute accuracy and F1 for classification."""
            from sklearn.metrics import accuracy_score, f1_score
            predictions, labels = eval_preds
            predictions = np.argmax(predictions, axis=1)
            return {
                "accuracy": accuracy_score(labels, predictions),
                "f1": f1_score(labels, predictions, average="weighted")
            }
        return compute_metrics
    elif task_type == "question-answering":
        def compute_metrics(eval_preds):
            """Compute SQuAD metrics (placeholder)."""
            # TODO: real QA evaluation requires start/end-logit post-processing
            # against the original contexts. The old code also loaded the
            # "squad_v2" metric and never used it (an unnecessary download at
            # eval time); that dead load has been removed.
            return {
                "exact_match": 0.0,
                "f1": 0.0
            }
        return compute_metrics
    else:
        def compute_metrics(eval_preds):
            """No-op metrics for unknown task types."""
            return {}
        return compute_metrics
def save_training_artifacts(
    output_dir: str,
    model: Any,
    tokenizer: Any,
    config: Dict[str, Any],
    metrics: Dict[str, float]
) -> Dict[str, Any]:
    """Save training artifacts.

    Writes the model, tokenizer, training config, metrics, and a generated
    README into ``output_dir`` (created if missing).

    Args:
        output_dir: Destination directory for all artifacts.
        model: Object exposing ``save_pretrained(dir)``.
        tokenizer: Object exposing ``save_pretrained(dir)``.
        config: Training configuration, serialized to training_config.json
            and summarized in the README.
        metrics: Final metrics, serialized to metrics.json.

    Returns:
        Dict with ``output_dir`` (str), ``saved_files`` (list of labels),
        and ``total_size`` (int, bytes of top-level files). The previous
        return annotation of ``Dict[str, str]`` was wrong and has been
        corrected to ``Dict[str, Any]``.
    """
    os.makedirs(output_dir, exist_ok=True)
    saved_files: List[str] = []
    # Save model
    model.save_pretrained(output_dir)
    saved_files.append("model")
    # Save tokenizer
    tokenizer.save_pretrained(output_dir)
    saved_files.append("tokenizer")
    # Save config
    with open(os.path.join(output_dir, "training_config.json"), "w") as f:
        json.dump(config, f, indent=2)
    saved_files.append("training_config.json")
    # Save metrics
    with open(os.path.join(output_dir, "metrics.json"), "w") as f:
        json.dump(metrics, f, indent=2)
    saved_files.append("metrics.json")
    # Create README. Uses a timezone-aware UTC timestamp; datetime.utcnow()
    # is deprecated and produced a naive datetime.
    readme_content = f"""# Model Fine-tuned with Universal Model Trainer
## Model Details
- Base Model: {config.get('model_name', 'Unknown')}
- Task: {config.get('task_type', 'Unknown')}
- Training Date: {datetime.now(timezone.utc).isoformat()}
## Training Configuration
- Epochs: {config.get('epochs', 'Unknown')}
- Batch Size: {config.get('batch_size', 'Unknown')}
- Learning Rate: {config.get('learning_rate', 'Unknown')}
- PEFT/LoRA: {'Yes' if config.get('use_peft') else 'No'}
## Metrics
```
{json.dumps(metrics, indent=2)}
```
## Usage
```python
from transformers import AutoModel, AutoTokenizer
model = AutoModel.from_pretrained("path/to/model")
tokenizer = AutoTokenizer.from_pretrained("path/to/model")
```
"""
    with open(os.path.join(output_dir, "README.md"), "w") as f:
        f.write(readme_content)
    saved_files.append("README.md")
    # total_size only counts top-level regular files (not subdirectories).
    return {
        "output_dir": output_dir,
        "saved_files": saved_files,
        "total_size": sum(
            os.path.getsize(os.path.join(output_dir, f))
            for f in os.listdir(output_dir)
            if os.path.isfile(os.path.join(output_dir, f))
        )
    }
def generate_job_id(config: Dict[str, Any]) -> str:
    """Generate unique job ID.

    The ID embeds the task type for readability plus 8 hex characters of a
    random UUID for uniqueness.

    Args:
        config: Training configuration; ``task_type`` is read if present.

    Returns:
        A string like ``train_causal-lm_1a2b3c4d``.
    """
    import uuid
    # Robustness fix: config["task_type"] raised KeyError when the caller
    # omitted the key; fall back to "unknown" instead.
    task = config.get("task_type", "unknown")
    return f"train_{task}_{uuid.uuid4().hex[:8]}"