| """Simplified performance predictor for MLPerf configurations using XGBoost.""" |
|
|
| import logging |
| import random |
| from collections import Counter, defaultdict |
|
|
| import numpy as np |
| import pandas as pd |
| import xgboost as xgb |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| from sklearn.model_selection import train_test_split |
| from utils import FEATURE_TYPES |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class PerformancePredictor: |
| """Predicts performance for hardware configurations.""" |
|
|
| def __init__(self, dataset: pd.DataFrame, test_size: float = 0.2): |
| """Initialize with benchmark dataset.""" |
| self.df = dataset |
| self.model = None |
| self.target = "metrics.result_per_accelerator" |
| self.features = [] |
| self.test_size = test_size |
|
|
| self.evaluation_data = pd.DataFrame() |
| self.evaluation_metrics = {} |
| self.feature_importance = pd.DataFrame(columns=["Feature", "Importance"]) |
|
|
| self.excluded_features = { |
| "model.name", |
| "model.mlperf_name", |
| "software.framework", |
| "system.name", |
| } |
|
|
| self.excluded_features.update( |
| { |
| col |
| for col in dataset.columns |
| if col.startswith("submission.") or col.startswith("metrics.") |
| } |
| ) |
|
|
| self.distributions = {} |
|
|
| self.max_accelerators = int(dataset["system.accelerator.total_count"].max()) |
| self.max_gpu_memory = float(dataset["system.accelerator.memory_capacity"].max()) |
| self.max_cpu_memory = float(dataset["system.memory.capacity"].max()) |
|
|
| self.frameworks = sorted( |
| list( |
| set( |
| col.replace("software.framework.", "") |
| for col in dataset.columns |
| if col.startswith("software.framework.") |
| and col != "software.framework" |
| ) |
| ) |
| ) |
| logger.info( |
| f"Found {len(self.frameworks)} unique frameworks: {', '.join(self.frameworks)}" |
| ) |
|
|
| self._identify_features() |
| self._analyze_data_distributions() |
| self._train_model() |
|
|
| def _identify_features(self): |
| """Identify features for model training.""" |
| all_columns = set(self.df.columns) |
| available_features = all_columns - self.excluded_features - {self.target} |
| self.features = [f for f in available_features if not self.df[f].isna().all()] |
| logger.info(f"Identified {len(self.features)} features for model training") |
|
|
| def _analyze_data_distributions(self): |
| """Analyze feature distributions for realistic data generation.""" |
| categorical_features = { |
| col |
| for col in self.df.columns |
| if self.df[col].dtype == "object" |
| or col in FEATURE_TYPES.get("categorical", []) |
| } |
|
|
| for feature in categorical_features: |
| values = self.df[feature].dropna().tolist() |
| if values: |
| counter = Counter(values) |
| total = sum(counter.values()) |
| self.distributions[feature] = { |
| value: count / total for value, count in counter.items() |
| } |
|
|
| continuous_features = { |
| col |
| for col in self.df.columns |
| if col in FEATURE_TYPES.get("continuous", []) |
| or pd.api.types.is_numeric_dtype(self.df[col].dtype) |
| if col not in categorical_features and not col.startswith("metrics.") |
| } |
|
|
| for feature in continuous_features: |
| values = self.df[feature].dropna() |
| if len(values) > 0: |
| self.distributions[feature] = { |
| "min": float(values.min()), |
| "max": float(values.max()), |
| "mean": float(values.mean()), |
| "std": float(values.std()), |
| "median": float(values.median()), |
| "values": sorted(values.unique().tolist()), |
| } |
|
|
| self._analyze_feature_relationships() |
| logger.info(f"Analyzed distributions for {len(self.distributions)} features") |
|
|
| def _analyze_feature_relationships(self): |
| """Analyze relationships between related features.""" |
| self._analyze_vendor_accelerator_relations() |
| self._analyze_vendor_cpu_relations() |
| self._analyze_accelerator_memory_relations() |
| self._analyze_interconnect_relations() |
| self._analyze_software_relations() |
| self._analyze_device_counts() |
|
|
| def _analyze_vendor_accelerator_relations(self): |
| """Map vendors to their accelerators.""" |
| vendor_accelerators = defaultdict(list) |
| for _, row in self.df.iterrows(): |
| vendor = row.get("system.accelerator.vendor") |
| acc = row.get("system.accelerator.name") |
| if vendor and acc: |
| vendor_accelerators[vendor].append(acc) |
|
|
| self.distributions["vendor_accelerators"] = {} |
| for vendor, accelerators in vendor_accelerators.items(): |
| counter = Counter(accelerators) |
| total = sum(counter.values()) |
| self.distributions["vendor_accelerators"][vendor] = { |
| acc: count / total for acc, count in counter.items() |
| } |
|
|
| def _analyze_vendor_cpu_relations(self): |
| """Map CPU vendors to their models.""" |
| vendor_cpus = defaultdict(list) |
| for _, row in self.df.iterrows(): |
| vendor = row.get("system.cpu.vendor") |
| model = row.get("system.cpu.model") |
| if vendor and model: |
| vendor_cpus[vendor].append(model) |
|
|
| self.distributions["vendor_cpus"] = {} |
| for vendor, models in vendor_cpus.items(): |
| counter = Counter(models) |
| total = sum(counter.values()) |
| self.distributions["vendor_cpus"][vendor] = { |
| model: count / total for model, count in counter.items() |
| } |
|
|
| def _analyze_accelerator_memory_relations(self): |
| """Map accelerator models to memory capacities.""" |
| acc_memory = defaultdict(list) |
| for _, row in self.df.iterrows(): |
| acc = row.get("system.accelerator.name") |
| memory = row.get("system.accelerator.memory_capacity") |
| if acc and memory: |
| acc_memory[acc].append(memory) |
|
|
| self.distributions["accelerator_memory"] = {} |
| for acc, memories in acc_memory.items(): |
| if memories: |
| counter = Counter(memories) |
| most_common = counter.most_common(1)[0][0] if counter else None |
| self.distributions["accelerator_memory"][acc] = { |
| "min": min(memories), |
| "max": max(memories), |
| "mean": sum(memories) / len(memories), |
| "most_common": most_common, |
| "values": sorted(set(memories)), |
| } |
|
|
| def _analyze_interconnect_relations(self): |
| """Map vendors to interconnect types.""" |
| vendor_interconnects = defaultdict(list) |
| for _, row in self.df.iterrows(): |
| vendor = row.get("system.accelerator.vendor") |
| interconnect = row.get("system.interconnect.accelerator") |
| if vendor and interconnect: |
| vendor_interconnects[vendor].append(interconnect) |
|
|
| self.distributions["vendor_interconnects"] = {} |
| for vendor, interconnects in vendor_interconnects.items(): |
| counter = Counter(interconnects) |
| total = sum(counter.values()) |
| self.distributions["vendor_interconnects"][vendor] = { |
| ic: count / total for ic, count in counter.items() |
| } |
|
|
| def _analyze_software_relations(self): |
| """Map vendors to software stacks.""" |
| vendor_software = defaultdict(lambda: defaultdict(list)) |
| for _, row in self.df.iterrows(): |
| vendor = row.get("system.accelerator.vendor") |
| if not vendor: |
| continue |
|
|
| os = row.get("software.operating_system") |
| if os: |
| vendor_software[vendor]["os"].append(os) |
|
|
| for col in self.df.columns: |
| if ( |
| col.startswith("software.framework.") |
| and col != "software.framework" |
| ): |
| framework = col.replace("software.framework.", "") |
| version = row.get(col) |
| if version: |
| vendor_software[vendor][framework].append(version) |
|
|
| self.distributions["vendor_software"] = {} |
| for vendor, software_dict in vendor_software.items(): |
| self.distributions["vendor_software"][vendor] = {} |
| for software_type, values in software_dict.items(): |
| counter = Counter(values) |
| total = sum(counter.values()) |
| self.distributions["vendor_software"][vendor][software_type] = { |
| value: count / total for value, count in counter.items() |
| } |
|
|
| def _analyze_device_counts(self): |
| """Analyze distribution of device counts.""" |
| counts = self.df["system.accelerator.total_count"].dropna().astype(int).tolist() |
| if counts: |
| counter = Counter(counts) |
| total = sum(counter.values()) |
| self.distributions["device_count"] = { |
| count: freq / total for count, freq in counter.items() |
| } |
| self.distributions["device_count_values"] = sorted(list(set(counts))) |
|
|
| node_counts = self.df["system.number_of_nodes"].dropna().astype(int).tolist() |
| if node_counts: |
| counter = Counter(node_counts) |
| total = sum(counter.values()) |
| self.distributions["node_count"] = { |
| count: freq / total for count, freq in counter.items() |
| } |
| self.distributions["node_count_values"] = sorted(list(set(node_counts))) |
|
|
| device_node_pairs = [ |
| ( |
| int(row["system.number_of_nodes"]), |
| int(row["system.accelerator.total_count"]), |
| ) |
| for _, row in self.df.iterrows() |
| if row.get("system.number_of_nodes") |
| and row.get("system.accelerator.total_count") |
| ] |
|
|
| node_to_devices = defaultdict(list) |
| for nodes, devices in device_node_pairs: |
| node_to_devices[nodes].append(devices) |
|
|
| self.distributions["node_device_relation"] = {} |
| for node_count, device_counts in node_to_devices.items(): |
| counter = Counter(device_counts) |
| total = sum(counter.values()) |
| self.distributions["node_device_relation"][node_count] = { |
| count: freq / total for count, freq in counter.items() |
| } |
|
|
| def _train_model(self): |
| """Train XGBoost model on available data with train/test split.""" |
| df_clean = self.df.dropna(subset=[self.target]) |
|
|
| X = df_clean[self.features] |
| y = df_clean[self.target] |
|
|
| for col in X.select_dtypes(include=["object"]).columns: |
| with pd.option_context("mode.chained_assignment", None): |
| X[col] = X[col].astype("category") |
|
|
| try: |
| strat_column = df_clean["system.accelerator.name"].fillna("unknown") |
| X_train, X_test, y_train, y_test = train_test_split( |
| X, y, test_size=self.test_size, stratify=strat_column, random_state=42 |
| ) |
| logger.info( |
| f"Created stratified train/test split ({100 - self.test_size * 100:.0f}%/{self.test_size * 100:.0f}%) with {len(X_train)} training and {len(X_test)} test samples" |
| ) |
| except ValueError: |
| X_train, X_test, y_train, y_test = train_test_split( |
| X, y, test_size=self.test_size, random_state=42 |
| ) |
| logger.info( |
| f"Created regular train/test split with {len(X_train)} training and {len(X_test)} test samples" |
| ) |
|
|
| self.model = xgb.XGBRegressor( |
| objective="reg:squarederror", |
| n_estimators=100, |
| max_depth=6, |
| learning_rate=0.1, |
| subsample=0.8, |
| enable_categorical=True, |
| ) |
|
|
| self.model.fit(X_train, y_train) |
| logger.info(f"Trained XGBoost model on {len(X_train)} rows") |
|
|
| self._evaluate_model(X_test, y_test, df_clean.loc[X_test.index]) |
|
|
| def _evaluate_model(self, X_test, y_test, test_df): |
| """Evaluate model performance on test set.""" |
| if X_test.empty: |
| logger.warning("No test data available for evaluation") |
| return |
|
|
| y_pred = self.model.predict(X_test) |
|
|
| rmse = np.sqrt(mean_squared_error(y_test, y_pred)) |
| mae = mean_absolute_error(y_test, y_pred) |
| r2 = r2_score(y_test, y_pred) |
|
|
| mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100 |
|
|
| self.evaluation_metrics = { |
| "rmse": rmse, |
| "mae": mae, |
| "r2": r2, |
| "mape": mape, |
| "test_size": len(y_test), |
| "training_size": len(self.df) - len(y_test), |
| } |
|
|
| eval_data = test_df[ |
| [ |
| "system.accelerator.name", |
| "system.accelerator.vendor", |
| "system.accelerator.total_count", |
| ] |
| ].copy() |
| eval_data["actual"] = y_test |
| eval_data["predicted"] = y_pred |
| eval_data["error"] = y_pred - y_test |
| eval_data["error_percent"] = (y_pred - y_test) / y_test * 100 |
|
|
| self.evaluation_data = eval_data.copy() |
|
|
| logger.info( |
| f"Model evaluation - RMSE: {rmse:.2f}, MAE: {mae:.2f}, R²: {r2:.3f}, MAPE: {mape:.2f}%" |
| ) |
| logger.info( |
| f"Evaluation data shape: {eval_data.shape}, with columns: {list(eval_data.columns)}" |
| ) |
| logger.info(f"Evaluation data sample: {eval_data.head(2).to_dict()}") |
| logger.info( |
| f"Evaluation data stored as class attribute with shape: {self.evaluation_data.shape}" |
| ) |
|
|
| importance = self.model.feature_importances_ |
| feature_importance = pd.DataFrame( |
| {"Feature": self.model.feature_names_in_, "Importance": importance} |
| ).sort_values("Importance", ascending=False) |
| self.feature_importance = feature_importance.head(10).copy() |
|
|
| logger.info( |
| f"Top 5 important features: {', '.join(self.feature_importance['Feature'].head(5).tolist())}" |
| ) |
|
|
| def get_evaluation_metrics(self) -> dict: |
| """Return model evaluation metrics.""" |
| logger.info(f"Getting evaluation metrics: {self.evaluation_metrics}") |
| return self.evaluation_metrics.copy() if self.evaluation_metrics else {} |
|
|
| def get_evaluation_data(self) -> pd.DataFrame: |
| """Return evaluation data for visualization.""" |
| data_shape = ( |
| "empty" if self.evaluation_data.empty else self.evaluation_data.shape |
| ) |
| logger.info(f"Getting evaluation data with shape: {data_shape}") |
| return self.evaluation_data.copy() if not self.evaluation_data.empty else None |
|
|
| def get_feature_importance(self) -> pd.DataFrame: |
| """Return feature importance data.""" |
| logger.info( |
| f"Getting feature importance with shape: {self.feature_importance.shape}" |
| ) |
| return ( |
| self.feature_importance.copy() |
| if not self.feature_importance.empty |
| else pd.DataFrame(columns=["Feature", "Importance"]) |
| ) |
|
|
| def generate_predictions( |
| self, |
| architecture: str, |
| parameters: float, |
| constraints: dict = None, |
| num_configs: int = 10, |
| ) -> pd.DataFrame: |
| """Generate and predict performance for hardware configurations.""" |
| constraints = constraints or {} |
| logger.info( |
| f"Generating {num_configs} predictions for {architecture} model with {parameters}B parameters" |
| ) |
|
|
| configs = self._generate_configs( |
| architecture, parameters, constraints, num_configs |
| ) |
| if not configs: |
| return pd.DataFrame() |
|
|
| configs_df = pd.DataFrame(configs) |
| model_features = getattr(self.model, "feature_names_in_", self.features) |
|
|
| for feature in model_features: |
| if feature not in configs_df.columns: |
| configs_df[feature] = None |
|
|
| X_pred = configs_df[model_features] |
| for col in X_pred.select_dtypes(include=["object"]).columns: |
| with pd.option_context("mode.chained_assignment", None): |
| X_pred[col] = X_pred[col].astype("category") |
|
|
| configs_df[self.target] = self.model.predict(X_pred) |
| configs_df["predicted"] = True |
| configs_df["metrics.result"] = ( |
| configs_df[self.target] * configs_df["system.accelerator.total_count"] |
| ) |
| configs_df["system.name"] = "Hypothetical system - ongoing work" |
|
|
| logger.info( |
| f"Performance range: {configs_df[self.target].min():.2f} - {configs_df[self.target].max():.2f} tokens/s per accelerator" |
| ) |
| return configs_df |
|
|
| def _sample_from_distribution(self, distribution: dict) -> any: |
| """Sample a value from a categorical distribution.""" |
| items = list(distribution.keys()) |
| probabilities = list(distribution.values()) |
| return np.random.choice(items, p=probabilities) |
|
|
| def _sample_continuous_value(self, feature: str) -> float: |
| """Sample a continuous value from the feature distribution.""" |
| dist = self.distributions[feature] |
|
|
| if "values" in dist and dist["values"]: |
| if len(dist["values"]) > 3: |
| value = np.random.normal(dist["mean"], max(dist["std"], 1.0)) |
| value = max(dist["min"], min(dist["max"], value)) |
| closest_idx = min( |
| range(len(dist["values"])), |
| key=lambda i: abs(dist["values"][i] - value), |
| ) |
| return dist["values"][closest_idx] |
| else: |
| return random.choice(dist["values"]) |
|
|
| elif all(k in dist for k in ["min", "max", "mean", "std"]): |
| value = np.random.normal(dist["mean"], max(dist["std"], 1.0)) |
| return max(dist["min"], min(dist["max"], value)) |
|
|
| return np.random.uniform(dist["min"], dist["max"]) |
|
|
| def _get_device_count(self, min_devices=None, max_devices=None) -> int: |
| """Get a realistic device count based on distribution and constraints.""" |
| valid_counts = [ |
| count |
| for count in self.distributions["device_count_values"] |
| if (min_devices is None or count >= min_devices) |
| and (max_devices is None or count <= max_devices) |
| ] |
|
|
| if valid_counts: |
| probs = { |
| count: self.distributions["device_count"][count] |
| for count in valid_counts |
| if count in self.distributions["device_count"] |
| } |
|
|
| if probs: |
| total = sum(probs.values()) |
| items = list(probs.keys()) |
| weights = [probs[item] / total for item in items] |
| return np.random.choice(items, p=weights) |
|
|
| return random.choice(valid_counts) |
|
|
| if min_devices is not None and max_devices is not None: |
| valid_powers = [ |
| 2**i for i in range(10) if min_devices <= 2**i <= max_devices |
| ] |
| if valid_powers: |
| return random.choice(valid_powers) |
| return random.randint(min_devices, max_devices) |
|
|
| return random.choice([1, 2, 4, 8, 16]) |
|
|
| def _get_vendor_accelerator(self, vendor=None) -> tuple: |
| """Get a vendor and accelerator pair.""" |
| if vendor is None or vendor == "Any": |
| vendor = self._sample_from_distribution( |
| self.distributions["system.accelerator.vendor"] |
| ) |
|
|
| if vendor in self.distributions["vendor_accelerators"]: |
| accelerator = self._sample_from_distribution( |
| self.distributions["vendor_accelerators"][vendor] |
| ) |
| else: |
| accelerator = self._sample_from_distribution( |
| self.distributions["system.accelerator.name"] |
| ) |
|
|
| return vendor, accelerator |
|
|
| def _get_memory_for_accelerator( |
| self, vendor: str, accelerator: str, min_memory=None, max_memory=None |
| ) -> float: |
| """Get appropriate memory capacity for a given accelerator.""" |
| if accelerator in self.distributions["accelerator_memory"]: |
| mem_dist = self.distributions["accelerator_memory"][accelerator] |
|
|
| if "values" in mem_dist: |
| valid_values = [ |
| m |
| for m in mem_dist["values"] |
| if (min_memory is None or m >= min_memory) |
| and (max_memory is None or m <= max_memory) |
| ] |
| if valid_values: |
| return random.choice(valid_values) |
|
|
| if "most_common" in mem_dist: |
| most_common = mem_dist["most_common"] |
| if (min_memory is None or most_common >= min_memory) and ( |
| max_memory is None or most_common <= max_memory |
| ): |
| return most_common |
|
|
| dist = self.distributions["system.accelerator.memory_capacity"] |
| valid_values = [ |
| m |
| for m in dist["values"] |
| if (min_memory is None or m >= min_memory) |
| and (max_memory is None or m <= max_memory) |
| ] |
|
|
| if valid_values: |
| return random.choice(valid_values) |
|
|
| min_val = max(dist["min"], min_memory or dist["min"]) |
| max_val = min(dist["max"], max_memory or dist["max"]) |
|
|
| if min_val <= max_val: |
| mean = min(max(dist["mean"], min_val), max_val) |
| std = max(dist["std"], 1.0) |
|
|
| for _ in range(5): |
| value = np.random.normal(mean, std) |
| if min_val <= value <= max_val: |
| return value |
|
|
| return np.random.uniform(min_val, max_val) |
|
|
| return None |
|
|
| def _get_node_config(self, total_devices: int) -> tuple: |
| """Determine number of nodes and devices per node.""" |
| VALID_GPUS_PER_NODE = [1, 2, 4, 8] |
|
|
| for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): |
| if total_devices % gpus_per_node == 0: |
| return total_devices // gpus_per_node, gpus_per_node |
|
|
| for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): |
| if gpus_per_node <= total_devices: |
| nodes = total_devices // gpus_per_node |
| return nodes, gpus_per_node |
|
|
| return 1, 1 |
|
|
| def _get_cpu_config(self) -> dict: |
| """Generate a CPU configuration.""" |
| cpu_config = {} |
| cpu_config["system.cpu.vendor"] = self._sample_from_distribution( |
| self.distributions["system.cpu.vendor"] |
| ) |
|
|
| cpu_vendor = cpu_config["system.cpu.vendor"] |
| if cpu_vendor in self.distributions["vendor_cpus"]: |
| cpu_config["system.cpu.model"] = self._sample_from_distribution( |
| self.distributions["vendor_cpus"][cpu_vendor] |
| ) |
| else: |
| cpu_config["system.cpu.model"] = self._sample_from_distribution( |
| self.distributions["system.cpu.model"] |
| ) |
|
|
| for feature in [ |
| "system.cpu.core_count", |
| "system.cpu.count_per_node", |
| "system.cpu.frequency", |
| ]: |
| value = self._sample_continuous_value(feature) |
| if value is not None: |
| if feature in ["system.cpu.core_count", "system.cpu.count_per_node"]: |
| value = int(value) |
| cpu_config[feature] = value |
|
|
| if "system.cpu.caches" in self.distributions: |
| cpu_config["system.cpu.caches"] = self._sample_from_distribution( |
| self.distributions["system.cpu.caches"] |
| ) |
|
|
| return cpu_config |
|
|
| def _get_software_config(self, vendor: str, constraints=None) -> dict: |
| """Generate a software configuration based on hardware vendor.""" |
| constraints = constraints or {} |
| software_config = {} |
|
|
| if vendor in self.distributions["vendor_software"]: |
| vendor_sw = self.distributions["vendor_software"][vendor] |
|
|
| if "os" in vendor_sw: |
| os_constraint = constraints.get("software.operating_system") |
| if os_constraint and os_constraint != "Any": |
| software_config["software.operating_system"] = os_constraint |
| else: |
| software_config["software.operating_system"] = ( |
| self._sample_from_distribution(vendor_sw["os"]) |
| ) |
|
|
| for framework, versions in vendor_sw.items(): |
| if framework != "os": |
| framework_key = f"software.framework.{framework}" |
| version_constraint = constraints.get(framework_key) |
| if version_constraint and version_constraint != "Any": |
| software_config[framework_key] = version_constraint |
| else: |
| software_config[framework_key] = self._sample_from_distribution( |
| versions |
| ) |
|
|
| if ( |
| "software.operating_system" not in software_config |
| and "software.operating_system" in self.distributions |
| ): |
| os_constraint = constraints.get("software.operating_system") |
| if os_constraint and os_constraint != "Any": |
| software_config["software.operating_system"] = os_constraint |
| else: |
| software_config["software.operating_system"] = ( |
| self._sample_from_distribution( |
| self.distributions["software.operating_system"] |
| ) |
| ) |
|
|
| for framework in self.frameworks: |
| framework_key = f"software.framework.{framework}" |
| if ( |
| framework_key not in software_config |
| and framework_key in self.distributions |
| ): |
| version_constraint = constraints.get(framework_key) |
| if version_constraint and version_constraint != "Any": |
| software_config[framework_key] = version_constraint |
| else: |
| software_config[framework_key] = self._sample_from_distribution( |
| self.distributions[framework_key] |
| ) |
|
|
| return software_config |
|
|
| def _get_memory_config(self, min_memory=None, max_memory=None) -> dict: |
| """Generate a memory configuration.""" |
| memory_config = {} |
| dist = self.distributions["system.memory.capacity"] |
|
|
| if "values" in dist: |
| valid_values = [ |
| m |
| for m in dist["values"] |
| if (min_memory is None or m >= min_memory) |
| and (max_memory is None or m <= max_memory) |
| ] |
| if valid_values: |
| memory_config["system.memory.capacity"] = random.choice(valid_values) |
|
|
| if "system.memory.capacity" not in memory_config: |
| min_val = max(dist["min"], min_memory or dist["min"]) |
| max_val = min(dist["max"], max_memory or dist["max"]) |
|
|
| if min_val <= max_val: |
| mean = min(max(dist["mean"], min_val), max_val) |
| std = max(dist["std"], (max_val - min_val) / 6.0) |
|
|
| value = np.random.normal(mean, std) |
| if min_val <= value <= max_val: |
| memory_config["system.memory.capacity"] = value |
| else: |
| memory_config["system.memory.capacity"] = np.random.uniform( |
| min_val, max_val |
| ) |
|
|
| if "system.memory.configuration" in self.distributions: |
| memory_config["system.memory.configuration"] = ( |
| self._sample_from_distribution( |
| self.distributions["system.memory.configuration"] |
| ) |
| ) |
|
|
| return memory_config |
|
|
| def _get_interconnect_config(self, vendor: str) -> dict: |
| """Generate interconnect configuration based on vendor.""" |
| interconnect_config = {} |
|
|
| if vendor in self.distributions["vendor_interconnects"]: |
| interconnect_config["system.interconnect.accelerator"] = ( |
| self._sample_from_distribution( |
| self.distributions["vendor_interconnects"][vendor] |
| ) |
| ) |
| elif "system.interconnect.accelerator" in self.distributions: |
| interconnect_config["system.interconnect.accelerator"] = ( |
| self._sample_from_distribution( |
| self.distributions["system.interconnect.accelerator"] |
| ) |
| ) |
|
|
| if "system.interconnect.accelerator_host" in self.distributions: |
| interconnect_config["system.interconnect.accelerator_host"] = ( |
| self._sample_from_distribution( |
| self.distributions["system.interconnect.accelerator_host"] |
| ) |
| ) |
|
|
| return interconnect_config |
|
|
| def _generate_configs( |
| self, architecture: str, parameters: float, constraints=None, count: int = 10 |
| ) -> list: |
| """Generate configurations respecting user constraints.""" |
| constraints = constraints or {} |
| configs = [] |
|
|
| vendor = constraints.get("system.accelerator.vendor") |
| acc_name = constraints.get("system.accelerator.name") |
|
|
| def apply_margin(value, is_min=True): |
| if value is None or not isinstance(value, (int, float)) or value == "Any": |
| return None |
| return value * (0.9 if is_min else 1.1) |
|
|
| min_gpu_memory = apply_margin(constraints.get("min_gpu_memory"), is_min=True) |
| max_gpu_memory = apply_margin( |
| constraints.get("max_gpu_memory"), is_min=False |
| ) or (self.max_gpu_memory * 1.1) |
|
|
| min_cpu_memory = apply_margin(constraints.get("min_cpu_memory"), is_min=True) |
| max_cpu_memory = apply_margin( |
| constraints.get("max_cpu_memory"), is_min=False |
| ) or (self.max_cpu_memory * 1.1) |
|
|
| min_devices = apply_margin(constraints.get("min_accelerators"), is_min=True) |
| max_devices = ( |
| apply_margin(constraints.get("max_accelerators"), is_min=False) |
| or self.max_accelerators |
| ) |
|
|
| interconnect = constraints.get("system.interconnect.accelerator") |
| nodes = constraints.get("system.number_of_nodes") |
|
|
| VALID_GPUS_PER_NODE = [1, 2, 4, 8] |
|
|
| for _ in range(count * 3): |
| if len(configs) >= count: |
| break |
|
|
| device_count = self._get_device_count(min_devices, max_devices) |
| acc_vendor, acc_model = self._get_vendor_accelerator(vendor) |
|
|
| if acc_name and acc_name != "Any": |
| acc_model = acc_name |
|
|
| if nodes and nodes != "Any": |
| node_count = int(nodes) |
| valid_device_counts = [] |
| for gpus in VALID_GPUS_PER_NODE: |
| if node_count * gpus >= ( |
| min_devices or 1 |
| ) and node_count * gpus <= (max_devices or float("inf")): |
| valid_device_counts.append(gpus) |
|
|
| if not valid_device_counts: |
| continue |
|
|
| devices_per_node = random.choice(valid_device_counts) |
| device_count = node_count * devices_per_node |
| else: |
| valid_count = False |
| for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): |
| if device_count % gpus_per_node == 0: |
| valid_count = True |
| break |
|
|
| if not valid_count: |
| node_count, devices_per_node = self._get_node_config(device_count) |
| device_count = node_count * devices_per_node |
| else: |
| node_count, devices_per_node = ( |
| device_count // gpus_per_node, |
| gpus_per_node, |
| ) |
|
|
| config = { |
| "model.architecture": architecture, |
| "model.number_of_parameters": parameters, |
| "system.accelerator.vendor": acc_vendor, |
| "system.accelerator.name": acc_model, |
| "system.accelerator.total_count": device_count, |
| "system.number_of_nodes": node_count, |
| "system.accelerator.count_per_node": devices_per_node, |
| } |
|
|
| gpu_memory = self._get_memory_for_accelerator( |
| acc_vendor, |
| acc_model, |
| min_memory=min_gpu_memory, |
| max_memory=max_gpu_memory, |
| ) |
|
|
| if gpu_memory is None: |
| continue |
|
|
| config["system.accelerator.memory_capacity"] = gpu_memory |
|
|
| if "system.accelerator.memory_config" in self.distributions: |
| config["system.accelerator.memory_config"] = ( |
| self._sample_from_distribution( |
| self.distributions["system.accelerator.memory_config"] |
| ) |
| ) |
|
|
| interconnect_config = self._get_interconnect_config(acc_vendor) |
|
|
| if interconnect and interconnect != "Any": |
| interconnect_config["system.interconnect.accelerator"] = interconnect |
|
|
| config.update(interconnect_config) |
| config.update(self._get_cpu_config()) |
|
|
| memory_config = self._get_memory_config( |
| min_memory=min_cpu_memory, max_memory=max_cpu_memory |
| ) |
| if "system.memory.capacity" not in memory_config: |
| continue |
|
|
| config.update(memory_config) |
|
|
| for feature_name in [ |
| "system.type", |
| "system.cooling", |
| "model.weight_data_types", |
| ]: |
| if feature_name in self.distributions: |
| config[feature_name] = self._sample_from_distribution( |
| self.distributions[feature_name] |
| ) |
|
|
| config.update(self._get_software_config(acc_vendor, constraints)) |
|
|
| for key, value in constraints.items(): |
| if ( |
| not key.startswith("software.framework.") |
| and key != "software.operating_system" |
| and key |
| not in [ |
| "min_gpu_memory", |
| "max_gpu_memory", |
| "min_cpu_memory", |
| "max_cpu_memory", |
| "min_accelerators", |
| "max_accelerators", |
| ] |
| and key not in config |
| and value != "Any" |
| and value is not None |
| ): |
| config[key] = value |
|
|
| configs.append(config) |
|
|
| return configs[:count] |
|
|