import glob
import json
import math
import os
from typing import Optional

import psutil


# Assumed RAM budget for the target machine (18 GiB); TARGET_MAX_RAM keeps
# 5% headroom below it.
AVAILABLE_RAM = 18 * 1024**3
TARGET_MAX_RAM = int(AVAILABLE_RAM * 0.95)


def get_available_ram() -> int:
    """Bytes of RAM currently available, as reported by psutil."""
    return psutil.virtual_memory().available


def estimate_model_memory(model_path: str, dtype: str = "fp32") -> int:
    """
    Rough estimate of the bytes needed to hold a model's weights in RAM.

    Prefers the sharded-checkpoint index file (model.safetensors.index.json
    or pytorch_model.bin.index.json), whose metadata records the total weight
    size. Falls back to summing *.safetensors file sizes on disk (each file
    is raw tensor bytes plus a small header, so file size ~= memory size),
    then to a flat 5 GiB guess. `dtype` is currently unused: both sources
    already reflect the stored precision.
    """
    for index_name in ("model.safetensors.index.json",
                       "pytorch_model.bin.index.json"):
        index_path = os.path.join(model_path, index_name)
        if os.path.isfile(index_path):
            with open(index_path) as f:
                total = json.load(f).get("metadata", {}).get("total_size")
            if total:
                return int(total)
    shard_files = glob.glob(os.path.join(model_path, "*.safetensors"))
    if shard_files:
        return sum(os.path.getsize(shard) for shard in shard_files)
    return 5 * 1024**3
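
# Usage sketch for estimate_model_memory; "./merged-model" is an
# illustrative local path, not part of this module:
#
#     needed = estimate_model_memory("./merged-model")
#     print(f"weights ~{needed / 1024**3:.1f} GiB, "
#           f"{get_available_ram() / 1024**3:.1f} GiB free")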


def check_memory_safe(model_a_id: str, model_b_id: str, evo_params: Optional[dict] = None) -> bool:
    # Treats the ids as local checkpoint dirs; evo_params is accepted for
    # API compatibility but not yet consulted.
    needed = estimate_model_memory(model_a_id) + estimate_model_memory(model_b_id)
    return needed <= min(TARGET_MAX_RAM, get_available_ram())
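
# Usage sketch; the paths are illustrative:
#
#     if not check_memory_safe("./model-a", "./model-b"):
#         raise MemoryError("merge would exceed the RAM budget")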


def adaptive_chunk_size(tensor_shape, dtype_bytes, safety_factor=0.8):
    """Largest number of leading-dim rows that fits in `safety_factor` of free RAM."""
    avail = get_available_ram()
    max_bytes = int(avail * safety_factor)
    # Elements per leading-dim row: product of all trailing dims
    # (math.prod of an empty tuple is 1, which covers 1-D tensors).
    numel_per_row = math.prod(tensor_shape[1:])
    chunk_rows = max(1, max_bytes // (numel_per_row * dtype_bytes))
    return min(chunk_rows, tensor_shape[0])
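

if __name__ == "__main__":
    # Minimal smoke test; the 4096 x 4096 fp32 shape is illustrative (64 MiB),
    # so it should fit in a single chunk on any machine with free RAM.
    shape = (4096, 4096)
    rows = adaptive_chunk_size(shape, dtype_bytes=4)
    print(f"free RAM: {get_available_ram() / 1024**3:.1f} GiB")
    print(f"chunk rows for {shape}: {rows} "
          f"({math.ceil(shape[0] / rows)} chunk(s))")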