| """ |
| Training Utilities - Helper functions for model training |
| """ |
|
|
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple

import numpy as np
import torch
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForQuestionAnswering,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    DataCollatorForSeq2Seq,
    DataCollatorForTokenClassification,
    Seq2SeqTrainingArguments,
    Trainer,
    TrainingArguments,
)
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def get_model_class_for_task(task_type: str):
    """Resolve the transformers Auto* model class for a supported task.

    Args:
        task_type: One of "causal-lm", "seq2seq", "token-classification",
            "question-answering", or "text-classification".

    Returns:
        The matching Auto* model class.

    Raises:
        ValueError: If ``task_type`` is not a supported task.
    """
    task_to_class = {
        "causal-lm": AutoModelForCausalLM,
        "seq2seq": AutoModelForSeq2SeqLM,
        "token-classification": AutoModelForTokenClassification,
        "question-answering": AutoModelForQuestionAnswering,
        "text-classification": AutoModelForSequenceClassification,
    }

    model_cls = task_to_class.get(task_type)
    if model_cls is None:
        raise ValueError(f"Unknown task type: {task_type}")
    return model_cls
|
|
|
|
def compute_model_hash(model_path: str) -> str:
    """Return a short fingerprint of a model's config.json for tracking.

    Hashes the raw bytes of ``<model_path>/config.json`` with MD5 and
    returns the first 12 hex characters; returns "unknown" when no
    config.json exists at that path.
    """
    config_file = os.path.join(model_path, "config.json")
    if not os.path.exists(config_file):
        return "unknown"
    with open(config_file, "rb") as fh:
        digest = hashlib.md5(fh.read()).hexdigest()
    return digest[:12]
|
|
|
|
def estimate_memory_requirements(
    model_name: str,
    task_type: str,
    batch_size: int = 1,
    max_length: int = 512,
    use_peft: bool = False
) -> Dict[str, float]:
    """Roughly estimate the memory needed to fine-tune a model, in GB.

    Uses a crude transformer heuristic (12 * hidden^2 parameters per layer)
    plus rule-of-thumb multipliers for gradient, optimizer, and activation
    memory. When the model config cannot be loaded, falls back to small
    conservative defaults rather than failing.

    Args:
        model_name: HF hub id or local path passed to AutoConfig.
        task_type: Currently unused by the estimate; kept for API symmetry.
        batch_size: Per-device batch size used for the activation estimate.
        max_length: Sequence length used for the activation estimate.
        use_peft: If True, scale weight/optimizer memory down for LoRA.

    Returns:
        Dict of estimated sizes (GB), a CPU-feasibility flag, and a
        recommended hardware class ("cpu" or "gpu").
    """
    try:
        config = AutoConfig.from_pretrained(model_name)

        # Architectures name these fields differently (BERT-style vs GPT-style).
        hidden = getattr(config, "hidden_size", getattr(config, "n_embd", 768))
        layers = getattr(config, "num_hidden_layers", getattr(config, "n_layer", 12))

        # Heuristic: ~12 * hidden^2 parameters per transformer layer.
        params_billion = (12 * layers * hidden ** 2) / 1e9

        # fp16 weights: 2 bytes per parameter.
        weights_gb = params_billion * 2
        # Gradients mirror the weight footprint.
        grads_gb = weights_gb
        # Adam-style optimizers keep two extra states per parameter.
        optimizer_gb = weights_gb * 2
        # Rough single-pass activation footprint.
        activations_gb = (batch_size * max_length * hidden * 4) / 1e9

        if use_peft:
            # LoRA only trains a small fraction of weight/optimizer state.
            total_gb = weights_gb * 0.1 + grads_gb + optimizer_gb * 0.1 + activations_gb
        else:
            total_gb = weights_gb + grads_gb + optimizer_gb + activations_gb

        return {
            "estimated_params_billion": round(params_billion, 2),
            "model_memory_gb": round(weights_gb, 2),
            "optimizer_memory_gb": round(optimizer_gb, 2),
            "activation_memory_gb": round(activations_gb, 2),
            "total_memory_gb": round(total_gb, 2),
            "recommended_memory_gb": round(total_gb * 1.5, 2),
            "can_run_on_cpu": total_gb < 8,
            "recommended_hardware": "gpu" if total_gb > 4 else "cpu"
        }

    except Exception as e:
        # Best-effort: any failure (bad name, offline, etc.) yields defaults.
        logger.warning(f"Could not estimate memory: {e}")
        return {
            "estimated_params_billion": 0.1,
            "model_memory_gb": 0.5,
            "optimizer_memory_gb": 1.0,
            "activation_memory_gb": 0.5,
            "total_memory_gb": 2.0,
            "recommended_memory_gb": 4.0,
            "can_run_on_cpu": True,
            "recommended_hardware": "cpu"
        }
|
|
|
|
def get_available_hardware() -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return the selectable hardware catalog plus detected local GPU info.

    Returns:
        A 2-tuple ``(hardware, detected)``: ``hardware`` is the static list
        of hardware options (id, name, memory, cost tier), ``detected``
        describes the local CUDA environment (availability, device count,
        first-device name and memory in GB).

    Note:
        The return annotation previously claimed a bare list, but the
        function has always returned this tuple; the annotation is now fixed.
    """
    hardware = [
        {"id": "cpu-basic", "name": "CPU Basic", "memory_gb": 16, "gpu": False, "cost": "Free"},
        {"id": "cpu-upgrade", "name": "CPU Upgrade", "memory_gb": 32, "gpu": False, "cost": "Low"},
        {"id": "t4-small", "name": "T4 Small", "memory_gb": 16, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "t4-medium", "name": "T4 Medium", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "l4x1", "name": "L4 x1", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "l4x4", "name": "L4 x4", "memory_gb": 96, "gpu": True, "gpu_memory_gb": 96, "cost": "Very High"},
        {"id": "a10g-small", "name": "A10G Small", "memory_gb": 24, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "a10g-large", "name": "A10G Large", "memory_gb": 48, "gpu": True, "gpu_memory_gb": 48, "cost": "Very High"},
        {"id": "a100-large", "name": "A100 Large", "memory_gb": 80, "gpu": True, "gpu_memory_gb": 80, "cost": "Premium"},
    ]

    if torch.cuda.is_available():
        gpu_count = torch.cuda.device_count()
        # Report the first device only; multi-GPU hosts show device 0's specs.
        gpu_name = torch.cuda.get_device_name(0) if gpu_count > 0 else "Unknown"
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 if gpu_count > 0 else 0
        detected = {
            "cuda_available": True,
            "gpu_count": gpu_count,
            "gpu_name": gpu_name,
            "gpu_memory_gb": round(gpu_memory, 1)
        }
    else:
        detected = {
            "cuda_available": False,
            "gpu_count": 0,
            "gpu_name": None,
            "gpu_memory_gb": 0
        }

    return hardware, detected
|
|
|
|
def get_training_args(
    output_dir: str,
    config: Dict[str, Any],
    task_type: str
) -> TrainingArguments:
    """Create TrainingArguments (or Seq2SeqTrainingArguments) from a config.

    Args:
        output_dir: Directory for checkpoints, logs, and final artifacts.
        config: Flat dict of hyperparameters; missing keys use defaults.
        task_type: Task identifier; selects task-specific extra arguments.

    Returns:
        A TrainingArguments instance. For "seq2seq" a
        Seq2SeqTrainingArguments (a TrainingArguments subclass) is returned,
        because generation options are only legal on that subclass.

    Bug fixes vs. the previous version:
        - "max_length" was passed into TrainingArguments for causal-lm; it is
          not an accepted parameter and raised TypeError. Sequence length must
          be enforced at tokenization / by the data collator instead.
        - predict_with_generate / generation_* were passed to the plain
          TrainingArguments constructor, which also raised TypeError.
        - bf16 support is now only probed when CUDA is available, so a
          bf16-requesting config does not error on CPU-only hosts.
    """
    args: Dict[str, Any] = {
        "output_dir": output_dir,
        "overwrite_output_dir": True,

        # Schedule
        "num_train_epochs": config.get("epochs", 3),
        "per_device_train_batch_size": config.get("batch_size", 1),
        "per_device_eval_batch_size": config.get("batch_size", 1),
        "gradient_accumulation_steps": config.get("gradient_accumulation_steps", 1),

        # Optimization
        "learning_rate": config.get("learning_rate", 5e-5),
        "weight_decay": config.get("weight_decay", 0.01),
        "warmup_steps": config.get("warmup_steps", 100),
        "lr_scheduler_type": config.get("lr_scheduler_type", "cosine"),

        # Logging / checkpointing
        "logging_dir": os.path.join(output_dir, "logs"),
        "logging_steps": config.get("logging_steps", 10),
        "save_steps": config.get("save_steps", 500),
        "save_total_limit": config.get("save_total_limit", 3),

        # Evaluation only runs when the config opts in via eval_steps.
        # NOTE(review): recent transformers renamed this key to
        # "eval_strategy" — confirm against the pinned transformers version.
        "evaluation_strategy": "steps" if config.get("eval_steps") else "no",
        "eval_steps": config.get("eval_steps", 500),

        # Mixed precision: only meaningful (and safe to probe) with CUDA.
        "fp16": config.get("fp16", True) and torch.cuda.is_available(),
        "bf16": (
            config.get("bf16", False)
            and torch.cuda.is_available()
            and torch.cuda.is_bf16_supported()
        ),

        # Dataloader / memory
        "dataloader_num_workers": config.get("dataloader_num_workers", 0),
        "dataloader_pin_memory": config.get("pin_memory", True) and torch.cuda.is_available(),
        "gradient_checkpointing": config.get("gradient_checkpointing", False),

        "report_to": config.get("report_to", ["none"]),

        "seed": config.get("seed", 42),
    }

    args_cls = TrainingArguments

    if task_type == "causal-lm":
        args["max_steps"] = config.get("max_steps", -1)

    elif task_type == "seq2seq":
        # Generation options are only accepted by Seq2SeqTrainingArguments.
        args_cls = Seq2SeqTrainingArguments
        args["predict_with_generate"] = config.get("predict_with_generate", False)
        args["generation_max_length"] = config.get("generation_max_length", 128)
        args["generation_num_beams"] = config.get("generation_num_beams", 4)

    elif task_type == "token-classification":
        args["label_names"] = config.get("label_names", [])

    if config.get("deepspeed_config"):
        args["deepspeed"] = config["deepspeed_config"]

    return args_cls(**args)
|
|
|
|
def get_peft_config(config: Dict[str, Any]) -> Optional[LoraConfig]:
    """Build a LoRA configuration from the training config.

    Returns None when "use_peft" is absent or falsy; otherwise a LoraConfig
    populated from the lora_* keys, falling back to common defaults
    (r=8, alpha=32, dropout=0.1, no bias, CAUSAL_LM task).
    """
    if not config.get("use_peft", False):
        return None

    lora_kwargs = {
        "r": config.get("lora_r", 8),
        "lora_alpha": config.get("lora_alpha", 32),
        "lora_dropout": config.get("lora_dropout", 0.1),
        "bias": config.get("lora_bias", "none"),
        "task_type": config.get("peft_task_type", "CAUSAL_LM"),
        "target_modules": config.get("lora_target_modules", None),
    }
    return LoraConfig(**lora_kwargs)
|
|
|
|
def get_data_collator(
    tokenizer: Any,
    task_type: str,
    config: Dict[str, Any]
) -> Any:
    """Select and construct the data collator matching the task type.

    Args:
        tokenizer: Tokenizer handed to the collator for padding.
        task_type: Task identifier; unknown values fall back to a plain
            padding collator with a warning.
        config: Supplies padding / max_length / pad_to_multiple_of options.

    Returns:
        A transformers data-collator instance.
    """
    padding = config.get("padding", "max_length")
    pad_multiple = config.get("pad_to_multiple_of", 8)

    if task_type == "causal-lm":
        # mlm=False: labels are the inputs themselves (next-token objective).
        return DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,
            pad_to_multiple_of=pad_multiple,
        )

    if task_type == "seq2seq":
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=padding,
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=pad_multiple,
        )

    if task_type == "token-classification":
        return DataCollatorForTokenClassification(
            tokenizer=tokenizer,
            padding=padding,
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=pad_multiple,
        )

    if task_type == "question-answering":
        # Reuses the seq2seq collator with a shorter default length.
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=padding,
            max_length=config.get("max_length", 384),
        )

    if task_type == "text-classification":
        from transformers import DataCollatorWithPadding
        return DataCollatorWithPadding(
            tokenizer=tokenizer,
            padding=padding,
            max_length=config.get("max_length", 512),
        )

    logger.warning(f"Unknown task type {task_type}, using default collator")
    from transformers import DataCollatorWithPadding
    return DataCollatorWithPadding(tokenizer=tokenizer)
|
|
|
|
def compute_metrics_factory(task_type: str, tokenizer: Any = None):
    """Factory for creating compute_metrics function.

    Builds and returns a task-specific ``compute_metrics(eval_preds)``
    callable suitable for a HF ``Trainer``. ``eval_preds`` is a
    ``(predictions, labels)`` pair; each returned callable yields a dict of
    metric name -> float.

    Args:
        task_type: Task identifier; unknown values get a no-op metrics
            function returning an empty dict.
        tokenizer: Used only by the "seq2seq" branch to decode predictions
            and labels back to text; may be None for other tasks.
    """

    if task_type == "causal-lm":
        def compute_metrics(eval_preds):
            """Compute perplexity for language modeling."""
            logits, labels = eval_preds
            # Drop the last logit / first label so positions line up for
            # next-token prediction.
            # NOTE(review): .contiguous()/.view() below require torch
            # tensors, but HF Trainer typically passes numpy arrays to
            # compute_metrics — confirm callers convert to tensors first.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()

            loss_fct = torch.nn.CrossEntropyLoss(reduction='mean')
            loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1)
            )
            # Perplexity is exp of the mean cross-entropy.
            perplexity = torch.exp(loss)

            return {
                "perplexity": perplexity.item(),
                "loss": loss.item()
            }
        return compute_metrics

    elif task_type == "seq2seq":
        def compute_metrics(eval_preds):
            """Compute ROUGE scores for summarization."""
            # Imported lazily so `evaluate` is only required for this task.
            from evaluate import load
            rouge = load("rouge")

            predictions, labels = eval_preds
            decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
            # -100 marks ignored positions; swap in pad id so decoding works.
            labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

            result = rouge.compute(
                predictions=decoded_preds,
                references=decoded_labels,
                use_stemmer=True
            )

            # Report ROUGE scores as percentages.
            return {k: round(v * 100, 4) for k, v in result.items()}
        return compute_metrics

    elif task_type == "token-classification":
        def compute_metrics(eval_preds):
            """Compute precision, recall, F1 for NER."""
            from evaluate import load
            seqeval = load("seqeval")

            predictions, labels = eval_preds
            # Logits (batch, seq, num_labels) -> predicted label ids.
            predictions = np.argmax(predictions, axis=2)

            # Strip positions labeled -100 (special tokens / padding).
            # NOTE(review): integer label ids are passed straight to seqeval,
            # which expects string tags like "B-PER" — verify labels are
            # mapped to tag strings upstream, or these scores may be wrong.
            true_predictions = [
                [p for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]
            true_labels = [
                [l for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]

            results = seqeval.compute(predictions=true_predictions, references=true_labels)

            return {
                "precision": results["overall_precision"],
                "recall": results["overall_recall"],
                "f1": results["overall_f1"],
                "accuracy": results["overall_accuracy"]
            }
        return compute_metrics

    elif task_type == "text-classification":
        def compute_metrics(eval_preds):
            """Compute accuracy and F1 for classification."""
            from sklearn.metrics import accuracy_score, f1_score

            predictions, labels = eval_preds
            # Logits -> predicted class ids.
            predictions = np.argmax(predictions, axis=1)

            return {
                "accuracy": accuracy_score(labels, predictions),
                "f1": f1_score(labels, predictions, average="weighted")
            }
        return compute_metrics

    elif task_type == "question-answering":
        def compute_metrics(eval_preds):
            """Compute SQuAD metrics."""
            from evaluate import load
            squad_metric = load("squad_v2")

            predictions, labels = eval_preds

            # NOTE(review): squad_metric is loaded but never used, and these
            # values are hard-coded placeholders — proper SQuAD
            # post-processing (logits -> answer spans) is still TODO.
            return {
                "exact_match": 0.0,
                "f1": 0.0
            }
        return compute_metrics

    else:
        # Unknown task: metrics are a no-op.
        def compute_metrics(eval_preds):
            return {}
        return compute_metrics
|
|
|
|
def save_training_artifacts(
    output_dir: str,
    model: Any,
    tokenizer: Any,
    config: Dict[str, Any],
    metrics: Dict[str, float]
) -> Dict[str, Any]:
    """Persist model, tokenizer, config, metrics, and a README model card.

    Args:
        output_dir: Destination directory; created if missing.
        model: Object exposing ``save_pretrained(output_dir)``.
        tokenizer: Object exposing ``save_pretrained(output_dir)``.
        config: Training configuration; written to training_config.json and
            echoed into the README.
        metrics: Final metrics; written to metrics.json.

    Returns:
        Dict with the output directory, the list of saved artifact names,
        and the total size in bytes of top-level files. (The previous
        annotation ``Dict[str, str]`` was wrong — the value holds a list and
        an int.)

    Note:
        total_size only counts top-level files; subdirectories created by
        ``save_pretrained`` (if any) are not included.
    """
    os.makedirs(output_dir, exist_ok=True)

    saved_files: List[str] = []

    model.save_pretrained(output_dir)
    saved_files.append("model")

    tokenizer.save_pretrained(output_dir)
    saved_files.append("tokenizer")

    with open(os.path.join(output_dir, "training_config.json"), "w") as f:
        json.dump(config, f, indent=2)
    saved_files.append("training_config.json")

    with open(os.path.join(output_dir, "metrics.json"), "w") as f:
        json.dump(metrics, f, indent=2)
    saved_files.append("metrics.json")

    with open(os.path.join(output_dir, "README.md"), "w") as f:
        f.write(_render_model_card(config, metrics))
    saved_files.append("README.md")

    total_size = sum(
        os.path.getsize(os.path.join(output_dir, name))
        for name in os.listdir(output_dir)
        if os.path.isfile(os.path.join(output_dir, name))
    )

    return {
        "output_dir": output_dir,
        "saved_files": saved_files,
        "total_size": total_size
    }


def _render_model_card(config: Dict[str, Any], metrics: Dict[str, float]) -> str:
    """Render the README.md model card text for a finished training run."""
    # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow().
    return f"""# Model Fine-tuned with Universal Model Trainer

## Model Details
- Base Model: {config.get('model_name', 'Unknown')}
- Task: {config.get('task_type', 'Unknown')}
- Training Date: {datetime.now(timezone.utc).isoformat()}

## Training Configuration
- Epochs: {config.get('epochs', 'Unknown')}
- Batch Size: {config.get('batch_size', 'Unknown')}
- Learning Rate: {config.get('learning_rate', 'Unknown')}
- PEFT/LoRA: {'Yes' if config.get('use_peft') else 'No'}

## Metrics
```
{json.dumps(metrics, indent=2)}
```

## Usage
```python
from transformers import AutoModel, AutoTokenizer

model = AutoModel.from_pretrained("path/to/model")
tokenizer = AutoTokenizer.from_pretrained("path/to/model")
```
"""
|
|
|
|
def generate_job_id(config: Dict[str, Any]) -> str:
    """Create a unique job id of the form ``train_<task_type>_<8 hex chars>``.

    Raises KeyError if the config has no "task_type" entry.
    """
    import uuid

    suffix = uuid.uuid4().hex[:8]
    return "train_{}_{}".format(config["task_type"], suffix)