"""
Training Utilities - Helper functions for model training.

Builds the pieces a Hugging Face ``Trainer`` run needs for each supported task
type (model class, training arguments, LoRA config, data collator, metric
function) and saves the resulting training artifacts.

Supported task types: ``"causal-lm"``, ``"seq2seq"``,
``"token-classification"``, ``"question-answering"``,
``"text-classification"``.
"""

import dataclasses
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import torch
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForQuestionAnswering,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    DataCollatorForSeq2Seq,
    DataCollatorForTokenClassification,
    DataCollatorWithPadding,
    Seq2SeqTrainingArguments,
    Trainer,
    TrainingArguments,
)

logger = logging.getLogger(__name__)

# Task type -> AutoModel class used to load a model for that task.
_TASK_MODEL_CLASSES = {
    "causal-lm": AutoModelForCausalLM,
    "seq2seq": AutoModelForSeq2SeqLM,
    "token-classification": AutoModelForTokenClassification,
    "question-answering": AutoModelForQuestionAnswering,
    "text-classification": AutoModelForSequenceClassification,
}


def get_model_class_for_task(task_type: str):
    """Return the ``AutoModelFor*`` class used to load a model for ``task_type``.

    Raises:
        ValueError: if ``task_type`` is not one of the supported task types.
    """
    try:
        return _TASK_MODEL_CLASSES[task_type]
    except KeyError:
        raise ValueError(f"Unknown task type: {task_type}") from None


def compute_model_hash(model_path: str) -> str:
    """Return a short fingerprint of ``<model_path>/config.json`` for tracking.

    The fingerprint is the first 12 hex characters of the MD5 digest of the raw
    config bytes. It is meant for change detection, not security. Returns
    ``"unknown"`` when the config file does not exist.
    """
    config_path = os.path.join(model_path, "config.json")
    if not os.path.exists(config_path):
        return "unknown"
    with open(config_path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()[:12]


def _first_config_attr(config: Any, names: Tuple[str, ...], default: int) -> Any:
    """Return the first attribute in ``names`` present on ``config``, else ``default``."""
    for name in names:
        if hasattr(config, name):
            return getattr(config, name)
    return default


def estimate_memory_requirements(
    model_name: str,
    task_type: str,
    batch_size: int = 1,
    max_length: int = 512,
    use_peft: bool = False,
) -> Dict[str, Any]:
    """Roughly estimate the memory needed to fine-tune ``model_name``.

    The parameter count is derived from the model config's hidden size and
    layer count, so the numbers are coarse, order-of-magnitude guidance only.
    ``task_type`` is accepted for interface compatibility but is not used.

    Returns:
        A dict with GB estimates (``model_memory_gb``, ``optimizer_memory_gb``,
        ``activation_memory_gb``, ``total_memory_gb``, ``recommended_memory_gb``),
        ``estimated_params_billion``, a ``can_run_on_cpu`` flag and a
        ``recommended_hardware`` string (``"gpu"`` or ``"cpu"``). If the config
        cannot be loaded, fixed small-model defaults are returned instead.
    """
    try:
        config = AutoConfig.from_pretrained(model_name)

        # GPT-2 style configs name these n_embd / n_layer.
        hidden = _first_config_attr(config, ("hidden_size", "n_embd"), 768)
        layers = _first_config_attr(config, ("num_hidden_layers", "n_layer"), 12)

        # Very rough transformer parameter count: ~12 * hidden^2 per layer.
        params_billion = hidden ** 2 * layers * 12 / 1e9

        # FP16 weights take 2 bytes per parameter (params are in billions -> GB).
        model_memory = params_billion * 2
        gradients_memory = model_memory  # one fp16 gradient per parameter
        optimizer_memory = model_memory * 2  # Adam keeps two moment estimates
        activation_memory = (batch_size * max_length * hidden * 4) / 1e9  # rough

        if use_peft:
            # The frozen base weights stay resident. Only the adapter parameters
            # (assumed ~10% here) need gradients and optimizer state.
            total = (
                model_memory
                + gradients_memory * 0.1
                + optimizer_memory * 0.1
                + activation_memory
            )
        else:
            total = model_memory + gradients_memory + optimizer_memory + activation_memory

        return {
            "estimated_params_billion": round(params_billion, 2),
            "model_memory_gb": round(model_memory, 2),
            "optimizer_memory_gb": round(optimizer_memory, 2),
            "activation_memory_gb": round(activation_memory, 2),
            "total_memory_gb": round(total, 2),
            "recommended_memory_gb": round(total * 1.5, 2),
            "can_run_on_cpu": total < 8,
            "recommended_hardware": "gpu" if total > 4 else "cpu",
        }
    except Exception as e:  # deliberate best-effort: an estimate must never block training
        logger.warning("Could not estimate memory: %s", e)
        return {
            "estimated_params_billion": 0.1,
            "model_memory_gb": 0.5,
            "optimizer_memory_gb": 1.0,
            "activation_memory_gb": 0.5,
            "total_memory_gb": 2.0,
            "recommended_memory_gb": 4.0,
            "can_run_on_cpu": True,
            "recommended_hardware": "cpu",
        }


def get_available_hardware() -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return the hardware catalog together with what this machine actually has.

    Returns:
        ``(hardware, local)``:

        - ``hardware`` is a fresh list of selectable hardware option dicts.
        - ``local`` describes the local CUDA setup: ``cuda_available``,
          ``gpu_count``, ``gpu_name`` and ``gpu_memory_gb`` (device 0 only).
    """
    hardware = [
        {"id": "cpu-basic", "name": "CPU Basic", "memory_gb": 16, "gpu": False, "cost": "Free"},
        {"id": "cpu-upgrade", "name": "CPU Upgrade", "memory_gb": 32, "gpu": False, "cost": "Low"},
        {"id": "t4-small", "name": "T4 Small", "memory_gb": 16, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "t4-medium", "name": "T4 Medium", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 16, "cost": "Medium"},
        {"id": "l4x1", "name": "L4 x1", "memory_gb": 32, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "l4x4", "name": "L4 x4", "memory_gb": 96, "gpu": True, "gpu_memory_gb": 96, "cost": "Very High"},
        {"id": "a10g-small", "name": "A10G Small", "memory_gb": 24, "gpu": True, "gpu_memory_gb": 24, "cost": "High"},
        {"id": "a10g-large", "name": "A10G Large", "memory_gb": 48, "gpu": True, "gpu_memory_gb": 48, "cost": "Very High"},
        {"id": "a100-large", "name": "A100 Large", "memory_gb": 80, "gpu": True, "gpu_memory_gb": 80, "cost": "Premium"},
    ]

    if not torch.cuda.is_available():
        return hardware, {
            "cuda_available": False,
            "gpu_count": 0,
            "gpu_name": None,
            "gpu_memory_gb": 0,
        }

    gpu_count = torch.cuda.device_count()
    gpu_name = torch.cuda.get_device_name(0) if gpu_count > 0 else "Unknown"
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 if gpu_count > 0 else 0
    return hardware, {
        "cuda_available": True,
        "gpu_count": gpu_count,
        "gpu_name": gpu_name,
        "gpu_memory_gb": round(gpu_memory, 1),
    }


def get_training_args(
    output_dir: str,
    config: Dict[str, Any],
    task_type: str,
) -> TrainingArguments:
    """Build ``TrainingArguments`` (``Seq2SeqTrainingArguments`` for seq2seq) from ``config``.

    Args:
        output_dir: Directory for checkpoints. Logs go to ``<output_dir>/logs``.
        config: Flat training config. Missing keys fall back to the defaults below.
        task_type: One of the supported task types. It selects task-specific options.

    Returns:
        A ``TrainingArguments`` instance. For ``"seq2seq"`` the instance is the
        ``Seq2SeqTrainingArguments`` subclass, so generation options are accepted.
    """
    # Generation options only exist on the Seq2Seq subclass.
    args_cls = Seq2SeqTrainingArguments if task_type == "seq2seq" else TrainingArguments
    valid_fields = {f.name for f in dataclasses.fields(args_cls)}

    use_cuda = torch.cuda.is_available()
    # Check CUDA first: is_bf16_supported() must not be probed on CPU-only setups.
    bf16 = bool(config.get("bf16", False)) and use_cuda and torch.cuda.is_bf16_supported()
    # fp16 defaults to on. TrainingArguments rejects fp16 and bf16 together, so bf16 wins.
    fp16 = bool(config.get("fp16", True)) and use_cuda and not bf16

    args: Dict[str, Any] = {
        "output_dir": output_dir,
        "overwrite_output_dir": True,
        # Training
        "num_train_epochs": config.get("epochs", 3),
        "per_device_train_batch_size": config.get("batch_size", 1),
        "per_device_eval_batch_size": config.get("batch_size", 1),
        "gradient_accumulation_steps": config.get("gradient_accumulation_steps", 1),
        # Learning rate
        "learning_rate": config.get("learning_rate", 5e-5),
        "weight_decay": config.get("weight_decay", 0.01),
        "warmup_steps": config.get("warmup_steps", 100),
        "lr_scheduler_type": config.get("lr_scheduler_type", "cosine"),
        # Logging / checkpointing
        "logging_dir": os.path.join(output_dir, "logs"),
        "logging_steps": config.get("logging_steps", 10),
        "save_steps": config.get("save_steps", 500),
        "save_total_limit": config.get("save_total_limit", 3),
        # Evaluation (the strategy key is added below)
        "eval_steps": config.get("eval_steps", 500),
        # Mixed precision
        "fp16": fp16,
        "bf16": bf16,
        # Misc
        "dataloader_num_workers": config.get("dataloader_num_workers", 0),
        "dataloader_pin_memory": bool(config.get("pin_memory", True)) and use_cuda,
        "gradient_checkpointing": config.get("gradient_checkpointing", False),
        "report_to": config.get("report_to", ["none"]),
        "seed": config.get("seed", 42),
    }

    # Newer transformers renamed evaluation_strategy to eval_strategy. Use
    # whichever name the installed version declares.
    strategy_key = "eval_strategy" if "eval_strategy" in valid_fields else "evaluation_strategy"
    args[strategy_key] = "steps" if config.get("eval_steps") else "no"

    if task_type == "causal-lm":
        args["max_steps"] = config.get("max_steps", -1)
        # config["max_length"] is a tokenization/collation setting. TrainingArguments
        # has no such field, so passing it would raise TypeError.
    elif task_type == "seq2seq":
        args["predict_with_generate"] = config.get("predict_with_generate", False)
        args["generation_max_length"] = config.get("generation_max_length", 128)
        args["generation_num_beams"] = config.get("generation_num_beams", 4)
    elif task_type == "token-classification":
        # An empty list would tell Trainer there are no label inputs. Only override
        # when the config names the label keys explicitly.
        # NOTE(review): TrainingArguments.label_names means input-dict key names
        # (e.g. ["labels"]), not class names -- confirm callers use it that way.
        if config.get("label_names"):
            args["label_names"] = config["label_names"]

    if config.get("deepspeed_config"):
        args["deepspeed"] = config["deepspeed_config"]

    # Drop anything the installed transformers version does not accept instead
    # of failing with TypeError.
    unsupported = sorted(set(args) - valid_fields)
    if unsupported:
        logger.warning(
            "Ignoring arguments not supported by %s: %s", args_cls.__name__, unsupported
        )
        for key in unsupported:
            del args[key]

    return args_cls(**args)


def get_peft_config(config: Dict[str, Any]) -> Optional[LoraConfig]:
    """Return a LoRA ``LoraConfig`` built from ``config``, or ``None`` if PEFT is off.

    PEFT is enabled by ``config["use_peft"]``. The LoRA hyper-parameters are
    read from the ``lora_*`` keys and ``peft_task_type`` (default ``"CAUSAL_LM"``).
    """
    if not config.get("use_peft", False):
        return None

    return LoraConfig(
        r=config.get("lora_r", 8),
        lora_alpha=config.get("lora_alpha", 32),
        lora_dropout=config.get("lora_dropout", 0.1),
        bias=config.get("lora_bias", "none"),
        task_type=config.get("peft_task_type", "CAUSAL_LM"),
        target_modules=config.get("lora_target_modules", None),
    )


def get_data_collator(
    tokenizer: Any,
    task_type: str,
    config: Dict[str, Any],
) -> Any:
    """Return the data collator for ``task_type``.

    Padding options come from ``config`` (``padding``, ``max_length``,
    ``pad_to_multiple_of``). Unknown task types log a warning and fall back to
    ``DataCollatorWithPadding`` with its defaults.
    """
    if task_type == "causal-lm":
        # mlm=False: labels are the inputs themselves (next-token prediction).
        return DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,
            pad_to_multiple_of=config.get("pad_to_multiple_of", 8),
        )
    if task_type == "seq2seq":
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=config.get("padding", "max_length"),
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=config.get("pad_to_multiple_of", 8),
        )
    if task_type == "token-classification":
        return DataCollatorForTokenClassification(
            tokenizer=tokenizer,
            padding=config.get("padding", "max_length"),
            max_length=config.get("max_length", 512),
            pad_to_multiple_of=config.get("pad_to_multiple_of", 8),
        )
    if task_type == "question-answering":
        return DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=None,
            padding=config.get("padding", "max_length"),
            max_length=config.get("max_length", 384),
        )
    if task_type == "text-classification":
        return DataCollatorWithPadding(
            tokenizer=tokenizer,
            padding=config.get("padding", "max_length"),
            max_length=config.get("max_length", 512),
        )

    logger.warning(f"Unknown task type {task_type}, using default collator")
    return DataCollatorWithPadding(tokenizer=tokenizer)


def _unwrap_predictions(predictions: Any) -> Any:
    """Return the first array when predictions arrive as a tuple of model outputs.

    Trainer may hand over a tuple when the model returns several outputs
    (presumably logits first -- confirm for custom models). Metrics only use
    the first one.
    """
    if isinstance(predictions, tuple):
        return predictions[0]
    return predictions


def compute_metrics_factory(
    task_type: str,
    tokenizer: Any = None,
    label_list: Optional[List[str]] = None,
) -> Callable[[Any], Dict[str, float]]:
    """Return a ``compute_metrics(eval_preds)`` callable for ``task_type``.

    Args:
        task_type: One of the supported task types. Unknown types get a function
            that returns ``{}``.
        tokenizer: Required for ``"seq2seq"``, to decode predictions and labels.
        label_list: For ``"token-classification"``, maps label ids to tag strings
            (e.g. ``"B-PER"``), which seqeval requires. If omitted, ids are passed
            as their string form, which only yields meaningful scores if the
            ids happen to be valid tags.

    ``eval_preds`` is unpacked as ``(predictions, labels)``. Label positions
    equal to ``-100`` are ignored.
    """
    if task_type == "causal-lm":
        def compute_metrics(eval_preds):
            """Compute perplexity and mean token cross-entropy for language modeling."""
            logits, labels = eval_preds
            # Trainer hands over numpy arrays. Convert them so the torch ops below work.
            logits = torch.as_tensor(_unwrap_predictions(logits)).float()
            labels = torch.as_tensor(labels).long()

            # Position t predicts token t + 1.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()

            # CrossEntropyLoss ignores index -100 by default (masked positions).
            loss_fct = torch.nn.CrossEntropyLoss(reduction="mean")
            loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1),
            )
            perplexity = torch.exp(loss)
            return {
                "perplexity": perplexity.item(),
                "loss": loss.item(),
            }
        return compute_metrics

    elif task_type == "seq2seq":
        def compute_metrics(eval_preds):
            """Compute ROUGE scores (x100, rounded to 4 places) for summarization."""
            from evaluate import load
            rouge = load("rouge")

            predictions, labels = eval_preds
            predictions = np.asarray(_unwrap_predictions(predictions))
            # Without predict_with_generate the predictions are logits, not token ids.
            if predictions.ndim == 3:
                predictions = predictions.argmax(axis=-1)

            pad_id = tokenizer.pad_token_id
            # -100 marks padding/ignored positions and cannot be decoded.
            predictions = np.where(predictions != -100, predictions, pad_id)
            labels = np.where(labels != -100, labels, pad_id)

            decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

            result = rouge.compute(
                predictions=decoded_preds,
                references=decoded_labels,
                use_stemmer=True,
            )
            return {k: round(v * 100, 4) for k, v in result.items()}
        return compute_metrics

    elif task_type == "token-classification":
        if label_list is None:
            logger.warning(
                "No label_list given for token-classification metrics; "
                "label ids will be passed to seqeval as strings"
            )

        def to_tag(label_id: int) -> str:
            """Map a label id to the tag string seqeval expects."""
            return label_list[label_id] if label_list is not None else str(label_id)

        def compute_metrics(eval_preds):
            """Compute entity-level precision, recall, F1 and accuracy with seqeval."""
            from evaluate import load
            seqeval = load("seqeval")

            predictions, labels = eval_preds
            predictions = np.argmax(_unwrap_predictions(predictions), axis=2)

            true_predictions = []
            true_labels = []
            for pred_row, label_row in zip(predictions, labels):
                # Drop ignored positions (special tokens / non-first sub-words).
                kept = [(p, t) for p, t in zip(pred_row, label_row) if t != -100]
                true_predictions.append([to_tag(int(p)) for p, _ in kept])
                true_labels.append([to_tag(int(t)) for _, t in kept])

            results = seqeval.compute(predictions=true_predictions, references=true_labels)
            return {
                "precision": results["overall_precision"],
                "recall": results["overall_recall"],
                "f1": results["overall_f1"],
                "accuracy": results["overall_accuracy"],
            }
        return compute_metrics

    elif task_type == "text-classification":
        def compute_metrics(eval_preds):
            """Compute accuracy and weighted F1 for sequence classification."""
            from sklearn.metrics import accuracy_score, f1_score

            predictions, labels = eval_preds
            predictions = np.argmax(_unwrap_predictions(predictions), axis=1)
            return {
                "accuracy": accuracy_score(labels, predictions),
                "f1": f1_score(labels, predictions, average="weighted"),
            }
        return compute_metrics

    elif task_type == "question-answering":
        def compute_metrics(eval_preds):
            """Placeholder SQuAD metrics: always returns zeros.

            TODO: real QA evaluation needs post-processing of start/end logits
            into answer spans (with offset mappings) before a SQuAD metric can
            be applied. The unused metric load was removed to avoid fetching it
            on every evaluation.
            """
            return {
                "exact_match": 0.0,
                "f1": 0.0,
            }
        return compute_metrics

    else:
        def compute_metrics(eval_preds):
            return {}
        return compute_metrics


def _json_default(obj: Any) -> Any:
    """Convert numpy/torch values to JSON-native types and stringify anything else.

    Used when dumping configs and metrics. Metric values are often numpy
    scalars, which ``json`` cannot serialize. Other objects are stored as
    their ``str()``.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, (np.ndarray, torch.Tensor)):
        return obj.tolist()
    return str(obj)


def save_training_artifacts(
    output_dir: str,
    model: Any,
    tokenizer: Any,
    config: Dict[str, Any],
    metrics: Dict[str, float],
) -> Dict[str, Any]:
    """Save the model, tokenizer, config, metrics and a README to ``output_dir``.

    The directory is created if needed.

    Returns:
        A dict with ``output_dir``, ``saved_files`` (a list of artifact labels
        and file names) and ``total_size``. ``total_size`` is the byte total of
        the regular files directly inside ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    saved_files: List[str] = []

    model.save_pretrained(output_dir)
    saved_files.append("model")

    tokenizer.save_pretrained(output_dir)
    saved_files.append("tokenizer")

    with open(os.path.join(output_dir, "training_config.json"), "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2, default=_json_default)
    saved_files.append("training_config.json")

    with open(os.path.join(output_dir, "metrics.json"), "w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2, default=_json_default)
    saved_files.append("metrics.json")

    readme_lines = [
        "# Model Fine-tuned with Universal Model Trainer",
        "",
        "## Model Details",
        f"- Base Model: {config.get('model_name', 'Unknown')}",
        f"- Task: {config.get('task_type', 'Unknown')}",
        f"- Training Date: {datetime.now(timezone.utc).isoformat()}",
        "",
        "## Training Configuration",
        f"- Epochs: {config.get('epochs', 'Unknown')}",
        f"- Batch Size: {config.get('batch_size', 'Unknown')}",
        f"- Learning Rate: {config.get('learning_rate', 'Unknown')}",
        f"- PEFT/LoRA: {'Yes' if config.get('use_peft') else 'No'}",
        "",
        "## Metrics",
        "```",
        json.dumps(metrics, indent=2, default=_json_default),
        "```",
        "",
        "## Usage",
        "```python",
        "from transformers import AutoModel, AutoTokenizer",
        'model = AutoModel.from_pretrained("path/to/model")',
        'tokenizer = AutoTokenizer.from_pretrained("path/to/model")',
        "```",
        "",
    ]
    with open(os.path.join(output_dir, "README.md"), "w", encoding="utf-8") as f:
        f.write("\n".join(readme_lines))
    saved_files.append("README.md")

    with os.scandir(output_dir) as entries:
        total_size = sum(entry.stat().st_size for entry in entries if entry.is_file())

    return {
        "output_dir": output_dir,
        "saved_files": saved_files,
        "total_size": total_size,
    }


def generate_job_id(config: Dict[str, Any]) -> str:
    """Return a job id of the form ``train_<task_type>_<8 random hex chars>``.

    Raises:
        KeyError: if ``config`` has no ``"task_type"``.
    """
    return f"train_{config['task_type']}_{uuid.uuid4().hex[:8]}"