| import torch |
| from safetensors.torch import load_file, save_file |
| import logging |
| from typing import Dict, List, Optional |
| import time |
| from pathlib import Path |
| import sys |
|
|
| |
# Configure the root logger when this module is imported: records at INFO and
# above are written, in one shared format, to both stdout and the
# "model_operations.log" file in the current working directory.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("model_operations.log"),
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
|
|
| class ModelHandler: |
| """Class to handle model operations with improved efficiency.""" |
| |
| DEFAULT_CHECKPOINT = Path("Model_4_of_10.safetensors") |
| |
| def __init__(self, checkpoint_path: str | Path = DEFAULT_CHECKPOINT): |
| self.checkpoint_path = Path(checkpoint_path) |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| |
| def _log_time(self, operation: str, start_time: float) -> None: |
| """Helper method for consistent timing logging.""" |
| elapsed = time.time() - start_time |
| logging.info(f"{operation} completed in {elapsed:.2f} seconds") |
|
|
| def load_model(self) -> Dict[str, torch.Tensor]: |
| """Loads model with memory-efficient handling.""" |
| start_time = time.time() |
| try: |
| logging.info(f"Loading model from {self.checkpoint_path}") |
| |
| model_data = load_file(str(self.checkpoint_path), device="cpu") |
| for key in model_data: |
| model_data[key] = model_data[key].to(self.device) |
| self._log_time("Model loading", start_time) |
| return model_data |
| except Exception as e: |
| logging.error(f"Model loading failed: {str(e)}") |
| raise RuntimeError(f"Failed to load model: {str(e)}") from e |
|
|
| def save_model(self, model_tensors: Dict[str, torch.Tensor]) -> None: |
| """Saves model with validation and error handling.""" |
| start_time = time.time() |
| try: |
| logging.info(f"Saving model to {self.checkpoint_path}") |
| self.checkpoint_path.parent.mkdir(parents=True, exist_ok=True) |
| save_file(model_tensors, str(self.checkpoint_path)) |
| self._log_time("Model saving", start_time) |
| except Exception as e: |
| logging.error(f"Model saving failed: {str(e)}") |
| raise RuntimeError(f"Failed to save model: {str(e)}") from e |
|
|
| def initialize_model( |
| self, |
| layers: List[int] = [8192, 16384, 32768], |
| dtype: torch.dtype = torch.bfloat16, |
| seed: Optional[int] = 42 |
| ) -> Dict[str, torch.Tensor]: |
| """Initializes model with optimized parameters.""" |
| if seed is not None: |
| torch.manual_seed(seed) |
| |
| model_tensors = {} |
| start_time = time.time() |
| try: |
| for i, size in enumerate(layers, 1): |
| layer_name = f"layer_{i}" |
| logging.info(f"Initializing {layer_name} [{size}x{size}] on {self.device}") |
| |
| tensor = torch.randn(size, size, dtype=dtype, device=self.device) * (1.0 / size ** 0.5) |
| model_tensors[layer_name] = tensor |
| self._log_time("Model initialization", start_time) |
| return model_tensors |
| except Exception as e: |
| logging.error(f"Model initialization failed: {str(e)}") |
| raise RuntimeError(f"Failed to initialize model: {str(e)}") from e |
|
|
| def verify_model( |
| self, |
| original: Dict[str, torch.Tensor], |
| loaded: Dict[str, torch.Tensor], |
| atol: float = 1e-5, |
| rtol: float = 1e-3 |
| ) -> bool: |
| """Verifies model integrity with detailed comparison.""" |
| all_match = True |
| for key in original: |
| if key not in loaded: |
| logging.warning(f"Missing tensor: {key}") |
| all_match = False |
| continue |
| |
| orig, load = original[key], loaded[key] |
| try: |
| if orig.shape != load.shape: |
| logging.warning(f"Shape mismatch in {key}: {orig.shape} vs {load.shape}") |
| all_match = False |
| continue |
| |
| if not torch.allclose(orig, load, atol=atol, rtol=rtol): |
| diff = torch.max(torch.abs(orig - load)) |
| logging.warning(f"Mismatch in {key}: max diff = {diff}") |
| all_match = False |
| else: |
| logging.info(f"Tensor {key} verified (shape: {orig.shape})") |
| except Exception as e: |
| logging.error(f"Verification failed for {key}: {str(e)}") |
| all_match = False |
| return all_match |
|
|
def main() -> int:
    """Run the initialize -> save -> load -> verify round trip.

    Returns:
        Process exit status: 0 when the reloaded model matches the one that
        was saved, 1 when verification fails or any step raises.
    """
    try:
        handler = ModelHandler()

        # Build a fresh model and write it to the handler's checkpoint path.
        model_data = handler.initialize_model()
        handler.save_model(model_data)

        # Read the checkpoint back and compare it with what was written.
        loaded_model_data = handler.load_model()
        is_valid = handler.verify_model(model_data, loaded_model_data)

        logging.info(f"Model verification {'passed' if is_valid else 'failed'}")
        # A failed verification must be visible to the calling shell; the exit
        # status was previously 0 even when the check failed.
        return 0 if is_valid else 1

    except Exception as e:
        logging.error(f"Execution failed: {str(e)}")
        return 1
|
|
# Script entry point: run the workflow only when executed directly, and hand
# main()'s return value to the interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())