""" DoE & Bayesian Optimization Dashboard A comprehensive tool for experimentalists to design experiments, fit surrogate models, and get AI-guided suggestions for next experiments. """ import gradio as gr import numpy as np import pandas as pd import itertools import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots from scipy.stats import norm from scipy.stats.qmc import LatinHypercube, Sobol, scale as qmc_scale from scipy.optimize import minimize as sp_minimize from sklearn.gaussian_process import GaussianProcessRegressor from sklearn.gaussian_process.kernels import ( Matern, RBF, ConstantKernel as C, WhiteKernel, RationalQuadratic, DotProduct ) from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor from sklearn.preprocessing import StandardScaler import warnings warnings.filterwarnings("ignore") # ═══════════════════════════════════════════════════════════════════ # HELPER: Design of Experiments generators # ═══════════════════════════════════════════════════════════════════ def parse_levels(level_str): """Parse a level string like '10, 20, 30' or '10:30:3' into a list of floats.""" level_str = str(level_str).strip() if not level_str: return [] if ":" in level_str: parts = level_str.split(":") if len(parts) == 3: lo, hi, n = float(parts[0]), float(parts[1]), int(parts[2]) return list(np.linspace(lo, hi, n)) elif len(parts) == 2: lo, hi = float(parts[0]), float(parts[1]) return [lo, hi] else: return [float(x.strip()) for x in level_str.split(",") if x.strip()] def generate_full_factorial(factor_df): """Generate full factorial design from a factor definition table.""" if factor_df is None or len(factor_df) == 0: return pd.DataFrame({"Error": ["No factors defined"]}) factors = {} for _, row in factor_df.iterrows(): name = str(row.iloc[0]).strip() levels = parse_levels(row.iloc[1]) if name and levels: factors[name] = levels if not factors: return pd.DataFrame({"Error": ["No valid factors found. Use format: '10,20,30' or '10:30:3'"]}) names = list(factors.keys()) level_lists = list(factors.values()) combos = list(itertools.product(*level_lists)) df = pd.DataFrame(combos, columns=names) df.insert(0, "Run", range(1, len(df) + 1)) df["Response"] = np.nan return df def generate_fractional_factorial(factor_df, fraction=0.5): """Generate fractional factorial (random subset).""" full = generate_full_factorial(factor_df) if "Error" in full.columns: return full n_keep = max(2, int(len(full) * fraction)) sampled = full.sample(n=n_keep, random_state=42).reset_index(drop=True) sampled["Run"] = range(1, len(sampled) + 1) return sampled def generate_lhs_design(factor_df, n_runs=20): """Generate Latin Hypercube Sampling design.""" if factor_df is None or len(factor_df) == 0: return pd.DataFrame({"Error": ["No factors defined"]}) bounds = [] names = [] for _, row in factor_df.iterrows(): name = str(row.iloc[0]).strip() levels = parse_levels(row.iloc[1]) if name and len(levels) >= 2: names.append(name) bounds.append((min(levels), max(levels))) if not names: return pd.DataFrame({"Error": ["Need at least 2 levels per factor for LHS"]}) d = len(names) n_runs = int(n_runs) sampler = LatinHypercube(d=d, seed=42) X = sampler.random(n=n_runs) lb = np.array([b[0] for b in bounds]) ub = np.array([b[1] for b in bounds]) X_scaled = qmc_scale(X, lb, ub) df = pd.DataFrame(X_scaled, columns=names) df.insert(0, "Run", range(1, n_runs + 1)) df["Response"] = np.nan for col in names: df[col] = df[col].round(4) return df def generate_sobol_design(factor_df, n_runs=16): """Generate Sobol sequence design.""" if factor_df is None or len(factor_df) == 0: return pd.DataFrame({"Error": ["No factors defined"]}) bounds = [] names = [] for _, row in factor_df.iterrows(): name = str(row.iloc[0]).strip() levels = parse_levels(row.iloc[1]) if name and len(levels) >= 2: names.append(name) bounds.append((min(levels), max(levels))) if not names: return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]}) d = len(names) n_runs = int(n_runs) n_actual = 2 ** int(np.ceil(np.log2(max(n_runs, 2)))) sampler = Sobol(d=d, scramble=True, seed=42) X = sampler.random(n=n_actual) lb = np.array([b[0] for b in bounds]) ub = np.array([b[1] for b in bounds]) X_scaled = qmc_scale(X, lb, ub) df = pd.DataFrame(X_scaled[:n_runs], columns=names) df.insert(0, "Run", range(1, n_runs + 1)) df["Response"] = np.nan for col in names: df[col] = df[col].round(4) return df def generate_central_composite(factor_df): """Generate Central Composite Design (CCD).""" if factor_df is None or len(factor_df) == 0: return pd.DataFrame({"Error": ["No factors defined"]}) bounds = [] names = [] for _, row in factor_df.iterrows(): name = str(row.iloc[0]).strip() levels = parse_levels(row.iloc[1]) if name and len(levels) >= 2: names.append(name) bounds.append((min(levels), max(levels))) if not names: return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]}) k = len(names) lb = np.array([b[0] for b in bounds]) ub = np.array([b[1] for b in bounds]) center = (lb + ub) / 2 half_range = (ub - lb) / 2 factorial_coded = np.array(list(itertools.product([-1, 1], repeat=k))) alpha = np.sqrt(k) axial_coded = np.zeros((2 * k, k)) for i in range(k): axial_coded[2 * i, i] = -alpha axial_coded[2 * i + 1, i] = alpha center_coded = np.zeros((3, k)) all_coded = np.vstack([factorial_coded, axial_coded, center_coded]) all_actual = center + all_coded * half_range all_actual = np.clip(all_actual, lb, ub) df = pd.DataFrame(all_actual, columns=names) df.insert(0, "Run", range(1, len(df) + 1)) df["Response"] = np.nan for col in names: df[col] = df[col].round(4) return df def generate_box_behnken(factor_df): """Generate Box-Behnken design (for 3+ factors).""" if factor_df is None or len(factor_df) == 0: return pd.DataFrame({"Error": ["No factors defined"]}) bounds = [] names = [] for _, row in factor_df.iterrows(): name = str(row.iloc[0]).strip() levels = parse_levels(row.iloc[1]) if name and len(levels) >= 2: names.append(name) bounds.append((min(levels), max(levels))) if len(names) < 3: return pd.DataFrame({"Error": ["Box-Behnken requires at least 3 factors"]}) k = len(names) lb = np.array([b[0] for b in bounds]) ub = np.array([b[1] for b in bounds]) center = (lb + ub) / 2 half_range = (ub - lb) / 2 runs_coded = [] for i in range(k): for j in range(i + 1, k): for vi in [-1, 1]: for vj in [-1, 1]: row = [0] * k row[i] = vi row[j] = vj runs_coded.append(row) for _ in range(3): runs_coded.append([0] * k) all_coded = np.array(runs_coded) all_actual = center + all_coded * half_range df = pd.DataFrame(all_actual, columns=names) df.insert(0, "Run", range(1, len(df) + 1)) df["Response"] = np.nan for col in names: df[col] = df[col].round(4) return df # ═══════════════════════════════════════════════════════════════════ # HELPER: Surrogate Models & Acquisition Functions # ═══════════════════════════════════════════════════════════════════ KERNEL_MAP = { "Matérn 5/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3), "Matérn 3/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=1.5) + WhiteKernel(noise_level=1e-3), "RBF": lambda d: C(1.0, (1e-3, 1e3)) * RBF(length_scale=np.ones(d)) + WhiteKernel(noise_level=1e-3), "Rational Quadratic": lambda d: C(1.0, (1e-3, 1e3)) * RationalQuadratic() + WhiteKernel(noise_level=1e-3), "RBF + Matérn": lambda d: C(1.0) * RBF(length_scale=np.ones(d)) + C(0.5) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3), } def build_surrogate(model_type, kernel_name, n_dims, n_estimators=100, alpha=1e-6, n_restarts=5): """Build a surrogate model based on type selection.""" if model_type == "Gaussian Process": kernel_fn = KERNEL_MAP.get(kernel_name, KERNEL_MAP["Matérn 5/2"]) kernel = kernel_fn(n_dims) model = GaussianProcessRegressor( kernel=kernel, alpha=alpha, n_restarts_optimizer=n_restarts, normalize_y=True ) return model elif model_type == "Random Forest": return RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42) elif model_type == "Extra Trees": return ExtraTreesRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42) elif model_type == "Gradient Boosting": return GradientBoostingRegressor(n_estimators=n_estimators, max_depth=4, random_state=42) else: return GaussianProcessRegressor(normalize_y=True) def predict_with_uncertainty(model, X, model_type="Gaussian Process"): """Predict mean and std for any surrogate model type.""" X = np.atleast_2d(X) if model_type == "Gaussian Process": mu, sigma = model.predict(X, return_std=True) return mu, sigma elif model_type in ["Random Forest", "Extra Trees"]: preds = np.array([tree.predict(X) for tree in model.estimators_]) mu = preds.mean(axis=0) sigma = preds.std(axis=0) return mu, sigma elif model_type == "Gradient Boosting": mu = model.predict(X) staged = np.array(list(model.staged_predict(X))) sigma = staged.std(axis=0) return mu, sigma else: mu = model.predict(X) return mu, np.zeros_like(mu) def expected_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True): mu, sigma = predict_with_uncertainty(model, X, model_type) sigma = np.maximum(sigma, 1e-9) if maximize: Z = (mu - y_best - xi) / sigma else: Z = (y_best - mu - xi) / sigma ei = sigma * (Z * norm.cdf(Z) + norm.pdf(Z)) ei[sigma < 1e-10] = 0.0 return ei def probability_of_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True): mu, sigma = predict_with_uncertainty(model, X, model_type) sigma = np.maximum(sigma, 1e-9) if maximize: Z = (mu - y_best - xi) / sigma else: Z = (y_best - mu - xi) / sigma return norm.cdf(Z) def upper_confidence_bound(X, model, kappa=2.0, model_type="Gaussian Process", maximize=True): mu, sigma = predict_with_uncertainty(model, X, model_type) if maximize: return mu + kappa * sigma else: return -(mu - kappa * sigma) def thompson_sampling(X, model, model_type="Gaussian Process", maximize=True): mu, sigma = predict_with_uncertainty(model, X, model_type) samples = np.random.normal(mu, np.maximum(sigma, 1e-9)) if not maximize: samples = -samples return samples ACQ_MAP = { "Expected Improvement (EI)": expected_improvement, "Probability of Improvement (PI)": probability_of_improvement, "Upper Confidence Bound (UCB)": upper_confidence_bound, "Thompson Sampling": thompson_sampling, } def suggest_next_experiments(model, model_type, acq_name, bounds, n_suggestions, y_best, xi=0.01, kappa=2.0, maximize=True, n_candidates=5000): """Suggest next experiments using acquisition function optimization.""" d = len(bounds) lb = np.array([b[0] for b in bounds]) ub = np.array([b[1] for b in bounds]) sampler = LatinHypercube(d=d, seed=np.random.randint(0, 10000)) X_candidates = qmc_scale(sampler.random(n=n_candidates), lb, ub) acq_fn = ACQ_MAP.get(acq_name, expected_improvement) if acq_name == "Upper Confidence Bound (UCB)": acq_vals = acq_fn(X_candidates, model, kappa=kappa, model_type=model_type, maximize=maximize) elif acq_name == "Thompson Sampling": acq_vals = acq_fn(X_candidates, model, model_type=model_type, maximize=maximize) else: acq_vals = acq_fn(X_candidates, model, y_best, xi=xi, model_type=model_type, maximize=maximize) top_indices = np.argsort(acq_vals)[::-1] selected = [] selected_points = [] for idx in top_indices: if len(selected) >= n_suggestions: break pt = X_candidates[idx] if selected_points: dists = [np.linalg.norm(pt - sp) for sp in selected_points] min_dist = min(dists) range_diag = np.linalg.norm(ub - lb) if min_dist < 0.02 * range_diag: continue selected.append(idx) selected_points.append(pt) X_suggested = X_candidates[selected] acq_at_suggested = acq_vals[selected] mu_sug, sigma_sug = predict_with_uncertainty(model, X_suggested, model_type) return X_suggested, acq_at_suggested, mu_sug, sigma_sug # ═══════════════════════════════════════════════════════════════════ # VISUALIZATION FUNCTIONS # ═══════════════════════════════════════════════════════════════════ def plot_gp_surface_2d(model, model_type, param_names, bounds, X_obs, y_obs, resolution=50, dim_x=0, dim_y=1, fixed_values=None): """2D contour plot of surrogate model prediction surface.""" if len(param_names) < 2: fig, ax = plt.subplots(figsize=(8, 5)) ax.text(0.5, 0.5, "Need at least 2 parameters for 2D surface", ha='center', va='center', fontsize=14, transform=ax.transAxes) return fig x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) X1, X2 = np.meshgrid(x1_range, x2_range) d = len(param_names) X_grid = np.zeros((resolution * resolution, d)) X_grid[:, dim_x] = X1.ravel() X_grid[:, dim_y] = X2.ravel() for i in range(d): if i != dim_x and i != dim_y: if fixed_values and i < len(fixed_values): X_grid[:, i] = fixed_values[i] else: X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 mu, sigma = predict_with_uncertainty(model, X_grid, model_type) fig, axes = plt.subplots(1, 2, figsize=(14, 5)) c1 = axes[0].contourf(X1, X2, mu.reshape(resolution, resolution), levels=25, cmap='viridis') axes[0].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='red', s=60, edgecolors='white', zorder=5, label='Observations') plt.colorbar(c1, ax=axes[0]) axes[0].set_xlabel(param_names[dim_x], fontsize=11) axes[0].set_ylabel(param_names[dim_y], fontsize=11) axes[0].set_title("Surrogate Mean Prediction", fontsize=12, fontweight='bold') axes[0].legend(fontsize=9) c2 = axes[1].contourf(X1, X2, sigma.reshape(resolution, resolution), levels=25, cmap='YlOrRd') axes[1].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='blue', s=60, edgecolors='white', zorder=5, label='Observations') plt.colorbar(c2, ax=axes[1]) axes[1].set_xlabel(param_names[dim_x], fontsize=11) axes[1].set_ylabel(param_names[dim_y], fontsize=11) axes[1].set_title("Prediction Uncertainty (σ)", fontsize=12, fontweight='bold') axes[1].legend(fontsize=9) plt.tight_layout() return fig def plot_gp_1d_slices(model, model_type, param_names, bounds, X_obs, y_obs): """1D slice plots through each parameter dimension.""" d = len(param_names) n_cols = min(3, d) n_rows = int(np.ceil(d / n_cols)) fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows)) if d == 1: axes = np.array([axes]) axes = np.atleast_2d(axes) center = np.array([(b[0] + b[1]) / 2 for b in bounds]) for i in range(d): row, col = divmod(i, n_cols) ax = axes[row, col] x_range = np.linspace(bounds[i][0], bounds[i][1], 200) X_plot = np.tile(center, (200, 1)) X_plot[:, i] = x_range mu, sigma = predict_with_uncertainty(model, X_plot, model_type) ax.fill_between(x_range, mu - 2 * sigma, mu + 2 * sigma, alpha=0.2, color='steelblue', label='±2σ') ax.fill_between(x_range, mu - sigma, mu + sigma, alpha=0.3, color='steelblue', label='±1σ') ax.plot(x_range, mu, 'b-', linewidth=2, label='Mean') ax.scatter(X_obs[:, i], y_obs, c='red', s=40, zorder=5, label='Observations') ax.set_xlabel(param_names[i], fontsize=11) ax.set_ylabel("Response", fontsize=11) ax.set_title(f"Effect of {param_names[i]}", fontsize=11, fontweight='bold') ax.legend(fontsize=8) ax.grid(True, alpha=0.3) for i in range(d, n_rows * n_cols): row, col = divmod(i, n_cols) axes[row, col].set_visible(False) plt.tight_layout() return fig def plot_acquisition_landscape(model, model_type, acq_name, param_names, bounds, X_obs, y_obs, y_best, xi, kappa, maximize, dim_x=0, dim_y=1, resolution=60): """Plot the acquisition function landscape.""" if len(param_names) < 2: fig, ax = plt.subplots(figsize=(8, 5)) ax.text(0.5, 0.5, "Need at least 2 parameters", ha='center', va='center', fontsize=14, transform=ax.transAxes) return fig x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) X1, X2 = np.meshgrid(x1_range, x2_range) d = len(param_names) X_grid = np.zeros((resolution * resolution, d)) X_grid[:, dim_x] = X1.ravel() X_grid[:, dim_y] = X2.ravel() for i in range(d): if i != dim_x and i != dim_y: X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 acq_fn = ACQ_MAP.get(acq_name, expected_improvement) if acq_name == "Upper Confidence Bound (UCB)": acq_vals = acq_fn(X_grid, model, kappa=kappa, model_type=model_type, maximize=maximize) elif acq_name == "Thompson Sampling": acq_vals = acq_fn(X_grid, model, model_type=model_type, maximize=maximize) else: acq_vals = acq_fn(X_grid, model, y_best, xi=xi, model_type=model_type, maximize=maximize) fig, ax = plt.subplots(figsize=(8, 6)) c = ax.contourf(X1, X2, acq_vals.reshape(resolution, resolution), levels=25, cmap='plasma') ax.scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='white', s=60, edgecolors='black', zorder=5, label='Observations') best_idx = np.argmax(acq_vals) ax.scatter(X_grid[best_idx, dim_x], X_grid[best_idx, dim_y], marker='*', c='yellow', s=300, edgecolors='black', zorder=10, label='Best acquisition') plt.colorbar(c, ax=ax) ax.set_xlabel(param_names[dim_x], fontsize=11) ax.set_ylabel(param_names[dim_y], fontsize=11) ax.set_title(f"Acquisition Function: {acq_name}", fontsize=12, fontweight='bold') ax.legend(fontsize=9) plt.tight_layout() return fig def plot_convergence(y_obs, maximize=True): """Plot optimization convergence.""" if len(y_obs) == 0: fig, ax = plt.subplots(figsize=(8, 5)) ax.text(0.5, 0.5, "No data yet", ha='center', va='center', fontsize=14, transform=ax.transAxes) return fig if maximize: best_so_far = np.maximum.accumulate(y_obs) ylabel = "Best Response (↑)" else: best_so_far = np.minimum.accumulate(y_obs) ylabel = "Best Response (↓)" iters = np.arange(1, len(y_obs) + 1) fig, ax = plt.subplots(figsize=(9, 5)) ax.plot(iters, best_so_far, 'b-o', linewidth=2, markersize=4, label='Best so far', zorder=4) ax.scatter(iters, y_obs, c='red', s=30, alpha=0.6, zorder=5, label='Individual observations') ax.fill_between(iters, best_so_far, y_obs, alpha=0.1, color='blue') ax.set_xlabel("Experiment Number", fontsize=11) ax.set_ylabel(ylabel, fontsize=11) ax.set_title("Optimization Convergence", fontsize=13, fontweight='bold') ax.legend(fontsize=10) ax.grid(True, alpha=0.3) plt.tight_layout() return fig def plot_parallel_coordinates(X_obs, y_obs, param_names): """Interactive parallel coordinates plot with Plotly.""" if len(X_obs) == 0: fig = go.Figure() fig.add_annotation(text="No data yet", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20)) return fig df = pd.DataFrame(X_obs, columns=param_names) df['Response'] = y_obs dimensions = [] for col in param_names: dimensions.append(dict(range=[df[col].min(), df[col].max()], label=col, values=df[col])) dimensions.append(dict(range=[df['Response'].min(), df['Response'].max()], label='Response', values=df['Response'])) fig = go.Figure(data=go.Parcoords( line=dict(color=df['Response'], colorscale='Viridis', showscale=True, colorbar=dict(title='Response')), dimensions=dimensions )) fig.update_layout(title="Parallel Coordinates: All Experiments", height=450, margin=dict(l=80, r=80, t=60, b=40)) return fig def plot_predicted_vs_actual(model, model_type, X_obs, y_obs): """Parity plot: predicted vs actual.""" if len(X_obs) == 0: fig, ax = plt.subplots(figsize=(6, 6)) ax.text(0.5, 0.5, "No data", ha='center', va='center', fontsize=14, transform=ax.transAxes) return fig mu, sigma = predict_with_uncertainty(model, X_obs, model_type) fig, ax = plt.subplots(figsize=(6, 6)) ax.errorbar(y_obs, mu, yerr=2 * sigma, fmt='o', color='steelblue', ecolor='lightblue', elinewidth=1.5, capsize=3, markersize=6, label='Predictions ± 2σ') mn = min(y_obs.min(), mu.min()) mx = max(y_obs.max(), mu.max()) pad = (mx - mn) * 0.1 ax.plot([mn - pad, mx + pad], [mn - pad, mx + pad], 'k--', linewidth=1, label='Perfect fit') ss_res = np.sum((y_obs - mu) ** 2) ss_tot = np.sum((y_obs - y_obs.mean()) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 rmse = np.sqrt(np.mean((y_obs - mu) ** 2)) ax.set_xlabel("Actual", fontsize=11) ax.set_ylabel("Predicted", fontsize=11) ax.set_title(f"Predicted vs Actual (R²={r2:.4f}, RMSE={rmse:.4f})", fontsize=12, fontweight='bold') ax.legend(fontsize=9) ax.grid(True, alpha=0.3) ax.set_aspect('equal', adjustable='box') plt.tight_layout() return fig def plot_feature_importance(model, model_type, param_names): """Feature importance from surrogate model.""" fig, ax = plt.subplots(figsize=(8, max(3, 0.5 * len(param_names)))) if model_type == "Gaussian Process": try: kernel = model.kernel_ ls = None def find_ls(k): if hasattr(k, 'length_scale') and not isinstance(k, WhiteKernel): return np.atleast_1d(k.length_scale) if hasattr(k, 'k1'): r = find_ls(k.k1) if r is not None: return r if hasattr(k, 'k2'): r = find_ls(k.k2) if r is not None: return r return None ls = find_ls(kernel) if ls is not None and len(ls) == len(param_names): importance = 1.0 / ls importance = importance / importance.sum() else: importance = np.ones(len(param_names)) / len(param_names) except Exception: importance = np.ones(len(param_names)) / len(param_names) title = "Feature Importance (GP Inverse Lengthscale)" elif model_type in ["Random Forest", "Extra Trees", "Gradient Boosting"]: importance = model.feature_importances_ title = f"Feature Importance ({model_type})" else: importance = np.ones(len(param_names)) / len(param_names) title = "Feature Importance" sorted_idx = np.argsort(importance) ax.barh(range(len(param_names)), importance[sorted_idx], color='steelblue', edgecolor='navy') ax.set_yticks(range(len(param_names))) ax.set_yticklabels(np.array(param_names)[sorted_idx]) ax.set_xlabel("Importance", fontsize=11) ax.set_title(title, fontsize=12, fontweight='bold') ax.grid(True, alpha=0.3, axis='x') plt.tight_layout() return fig def plot_residuals(model, model_type, X_obs, y_obs): """Residual analysis plots.""" if len(X_obs) == 0: fig, ax = plt.subplots() return fig mu, _ = predict_with_uncertainty(model, X_obs, model_type) residuals = y_obs - mu fig, axes = plt.subplots(1, 3, figsize=(15, 4)) axes[0].scatter(mu, residuals, c='steelblue', s=40, edgecolors='navy') axes[0].axhline(y=0, color='red', linestyle='--', linewidth=1) axes[0].set_xlabel("Predicted", fontsize=10) axes[0].set_ylabel("Residual", fontsize=10) axes[0].set_title("Residuals vs Predicted", fontsize=11, fontweight='bold') axes[0].grid(True, alpha=0.3) axes[1].hist(residuals, bins=max(5, len(residuals) // 3), color='steelblue', edgecolor='navy', alpha=0.8) axes[1].set_xlabel("Residual", fontsize=10) axes[1].set_ylabel("Count", fontsize=10) axes[1].set_title("Residual Distribution", fontsize=11, fontweight='bold') axes[1].grid(True, alpha=0.3) from scipy.stats import probplot probplot(residuals, dist="norm", plot=axes[2]) axes[2].set_title("Normal Q-Q Plot", fontsize=11, fontweight='bold') axes[2].grid(True, alpha=0.3) plt.tight_layout() return fig def plot_3d_surface(model, model_type, param_names, bounds, X_obs, y_obs, dim_x=0, dim_y=1): """3D surface plot using Plotly.""" if len(param_names) < 2: fig = go.Figure() fig.add_annotation(text="Need 2+ parameters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig resolution = 40 x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) X1, X2 = np.meshgrid(x1, x2) d = len(param_names) X_grid = np.zeros((resolution * resolution, d)) X_grid[:, dim_x] = X1.ravel() X_grid[:, dim_y] = X2.ravel() for i in range(d): if i != dim_x and i != dim_y: X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 mu, sigma = predict_with_uncertainty(model, X_grid, model_type) fig = go.Figure() fig.add_trace(go.Surface( x=x1, y=x2, z=mu.reshape(resolution, resolution), colorscale='Viridis', name='GP Mean', opacity=0.85, colorbar=dict(title='Predicted') )) fig.add_trace(go.Scatter3d( x=X_obs[:, dim_x], y=X_obs[:, dim_y], z=y_obs, mode='markers', marker=dict(size=5, color='red', symbol='diamond'), name='Observations' )) fig.update_layout( title="3D Response Surface", scene=dict(xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y], zaxis_title="Response"), height=550, margin=dict(l=0, r=0, t=40, b=0) ) return fig def plot_uncertainty_heatmap(model, model_type, param_names, bounds, X_obs, dim_x=0, dim_y=1): """Heatmap of prediction uncertainty.""" if len(param_names) < 2: fig = go.Figure() fig.add_annotation(text="Need 2+ params", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig resolution = 50 x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) X1, X2 = np.meshgrid(x1, x2) d = len(param_names) X_grid = np.zeros((resolution * resolution, d)) X_grid[:, dim_x] = X1.ravel() X_grid[:, dim_y] = X2.ravel() for i in range(d): if i != dim_x and i != dim_y: X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 _, sigma = predict_with_uncertainty(model, X_grid, model_type) fig = go.Figure(data=go.Heatmap( x=x1, y=x2, z=sigma.reshape(resolution, resolution), colorscale='YlOrRd', colorbar=dict(title='Uncertainty (σ)') )) fig.add_trace(go.Scatter( x=X_obs[:, dim_x], y=X_obs[:, dim_y], mode='markers', marker=dict(size=8, color='blue', symbol='x', line=dict(width=1)), name='Observations' )) fig.update_layout( title="Prediction Uncertainty Map", xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y], height=450 ) return fig def plot_correlation_matrix(X_obs, y_obs, param_names): """Correlation matrix heatmap.""" if len(X_obs) == 0 or len(X_obs) < 3: fig = go.Figure() fig.add_annotation(text="Need ≥3 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig df = pd.DataFrame(X_obs, columns=param_names) df['Response'] = y_obs corr = df.corr() fig = go.Figure(data=go.Heatmap( z=corr.values, x=corr.columns.tolist(), y=corr.index.tolist(), colorscale='RdBu_r', zmin=-1, zmax=1, text=np.round(corr.values, 2), texttemplate="%{text}", colorbar=dict(title='Correlation') )) fig.update_layout(title="Correlation Matrix", height=450, xaxis_title="", yaxis_title="") return fig def plot_scatter_matrix(X_obs, y_obs, param_names): """Scatter matrix (pair plot).""" if len(X_obs) < 2: fig = go.Figure() fig.add_annotation(text="Need ≥2 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig df = pd.DataFrame(X_obs, columns=param_names) df['Response'] = y_obs fig = px.scatter_matrix(df, dimensions=param_names + ['Response'], color='Response', color_continuous_scale='Viridis', title="Scatter Matrix", height=max(400, 150 * len(param_names))) fig.update_traces(diagonal_visible=True, marker=dict(size=4)) fig.update_layout(margin=dict(l=40, r=40, t=60, b=40)) return fig # ═══════════════════════════════════════════════════════════════════ # MAIN DASHBOARD # ═══════════════════════════════════════════════════════════════════ CUSTOM_CSS = """ .main-header { text-align: center; margin-bottom: 5px; } .main-header h1 { font-size: 1.8em; margin-bottom: 2px; } .main-header p { color: #666; font-size: 0.95em; } .info-box { background: #f0f7ff; border-left: 4px solid #2196F3; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; } .warn-box { background: #fff8e1; border-left: 4px solid #ff9800; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; } """ def create_default_factor_table(): return pd.DataFrame({ "Factor Name": ["Temperature", "Pressure", "Time"], "Levels (comma-sep or lo:hi:n)": ["100, 150, 200", "1, 2, 3", "30, 60"] }) def add_factor_row(df): new_row = pd.DataFrame({"Factor Name": [f"Factor_{len(df)+1}"], "Levels (comma-sep or lo:hi:n)": ["0, 1"]}) return pd.concat([df, new_row], ignore_index=True) def remove_last_factor(df): if len(df) > 1: return df.iloc[:-1].reset_index(drop=True) return df def generate_design(factor_df, design_type, n_lhs_runs): if design_type == "Full Factorial": return generate_full_factorial(factor_df) elif design_type == "Fractional Factorial (½)": return generate_fractional_factorial(factor_df, 0.5) elif design_type == "Fractional Factorial (¼)": return generate_fractional_factorial(factor_df, 0.25) elif design_type == "Latin Hypercube (LHS)": return generate_lhs_design(factor_df, n_lhs_runs) elif design_type == "Sobol Sequence": return generate_sobol_design(factor_df, n_lhs_runs) elif design_type == "Central Composite (CCD)": return generate_central_composite(factor_df) elif design_type == "Box-Behnken": return generate_box_behnken(factor_df) else: return generate_full_factorial(factor_df) def add_experiment_row(exp_df): if exp_df is None or len(exp_df) == 0: return exp_df new_row = pd.DataFrame({col: [np.nan] for col in exp_df.columns}) new_row["Run"] = len(exp_df) + 1 return pd.concat([exp_df, new_row], ignore_index=True) def add_column_to_experiments(exp_df, col_name): if exp_df is None or not col_name.strip(): return exp_df exp_df[col_name.strip()] = np.nan return exp_df def compute_doe_stats(exp_df): if exp_df is None or len(exp_df) == 0: return "No experiments generated yet." n_runs = len(exp_df) n_params = len([c for c in exp_df.columns if c not in ["Run", "Response"]]) n_filled = exp_df["Response"].notna().sum() if "Response" in exp_df.columns else 0 return f"""### 📊 Design Summary | Metric | Value | |--------|-------| | Total Runs | {n_runs} | | Parameters | {n_params} | | Responses Filled | {n_filled} / {n_runs} | | Completion | {n_filled/n_runs*100:.0f}% | """ def run_bo_pipeline(exp_df, model_type, kernel_name, acq_name, n_suggestions, xi_val, kappa_val, maximize, n_estimators, alpha_val, n_restarts): """Run the full BO pipeline.""" empty_fig = plt.figure(figsize=(6, 4)) empty_plotly = go.Figure() n_outputs = 16 def make_error(msg): return (msg, pd.DataFrame(), msg, empty_fig, empty_fig, empty_fig, empty_fig, empty_fig, empty_fig, empty_fig, empty_plotly, empty_plotly, empty_plotly, empty_plotly, empty_plotly, None) if exp_df is None or len(exp_df) == 0: return make_error("⚠️ No experiment data. Generate a design first and fill in Response values.") param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]] if "Response" not in exp_df.columns: return make_error("⚠️ No 'Response' column found in experiment table.") valid_mask = exp_df["Response"].notna() if valid_mask.sum() < 2: return make_error("⚠️ Need at least 2 completed experiments (with Response values) to fit a model.") X_obs = exp_df.loc[valid_mask, param_cols].values.astype(float) y_obs = exp_df.loc[valid_mask, "Response"].values.astype(float) param_names = param_cols n_dims = len(param_names) bounds = [] for col in param_cols: all_vals = exp_df[col].dropna().astype(float) lo, hi = all_vals.min(), all_vals.max() if lo == hi: lo -= 1; hi += 1 bounds.append((lo, hi)) try: model = build_surrogate(model_type, kernel_name, n_dims, int(n_estimators), float(alpha_val), int(n_restarts)) model.fit(X_obs, y_obs) except Exception as e: return make_error(f"⚠️ Model fitting failed: {str(e)}") mu_train, sigma_train = predict_with_uncertainty(model, X_obs, model_type) ss_res = np.sum((y_obs - mu_train) ** 2) ss_tot = np.sum((y_obs - y_obs.mean()) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 rmse = np.sqrt(np.mean((y_obs - mu_train) ** 2)) y_best = y_obs.max() if maximize else y_obs.min() best_idx = np.argmax(y_obs) if maximize else np.argmin(y_obs) fit_report = f"""### 🤖 Model Fit Report | Metric | Value | |--------|-------| | Model | {model_type} ({kernel_name if model_type == 'Gaussian Process' else ''}) | | Training Points | {len(X_obs)} | | R² Score | {r2:.4f} | | RMSE | {rmse:.4f} | | Best Observed | {y_best:.4f} (Run #{int(exp_df.loc[valid_mask].iloc[best_idx]['Run']) if 'Run' in exp_df.columns else best_idx+1}) | | Objective | {'Maximize ↑' if maximize else 'Minimize ↓'} | """ if model_type == "Gaussian Process": try: fit_report += f"| Kernel (fitted) | `{model.kernel_}` |\n" fit_report += f"| Log Marginal Likelihood | {model.log_marginal_likelihood_value_:.2f} |\n" except: pass try: X_sug, acq_sug, mu_sug, sigma_sug = suggest_next_experiments( model, model_type, acq_name, bounds, int(n_suggestions), y_best, float(xi_val), float(kappa_val), maximize ) sug_df = pd.DataFrame(X_sug, columns=param_names) sug_df.insert(0, "Suggestion #", range(1, len(sug_df) + 1)) sug_df["Predicted Mean"] = mu_sug.round(4) sug_df["Predicted σ"] = sigma_sug.round(4) sug_df["Acquisition Value"] = acq_sug.round(6) for col in param_names: sug_df[col] = sug_df[col].round(4) except Exception as e: sug_df = pd.DataFrame({"Error": [str(e)]}) # Generate all plots with error handling def safe_plot(fn, *args, plotly=False, **kwargs): try: return fn(*args, **kwargs) except Exception: return go.Figure() if plotly else plt.figure() fig_surface = safe_plot(plot_gp_surface_2d, model, model_type, param_names, bounds, X_obs, y_obs) fig_1d = safe_plot(plot_gp_1d_slices, model, model_type, param_names, bounds, X_obs, y_obs) fig_acq = safe_plot(plot_acquisition_landscape, model, model_type, acq_name, param_names, bounds, X_obs, y_obs, y_best, float(xi_val), float(kappa_val), maximize) fig_conv = safe_plot(plot_convergence, y_obs, maximize) fig_pva = safe_plot(plot_predicted_vs_actual, model, model_type, X_obs, y_obs) fig_imp = safe_plot(plot_feature_importance, model, model_type, param_names) fig_resid = safe_plot(plot_residuals, model, model_type, X_obs, y_obs) fig_3d = safe_plot(plot_3d_surface, model, model_type, param_names, bounds, X_obs, y_obs, plotly=True) fig_parallel = safe_plot(plot_parallel_coordinates, X_obs, y_obs, param_names, plotly=True) fig_unc = safe_plot(plot_uncertainty_heatmap, model, model_type, param_names, bounds, X_obs, plotly=True) fig_corr = safe_plot(plot_correlation_matrix, X_obs, y_obs, param_names, plotly=True) fig_scatter = safe_plot(plot_scatter_matrix, X_obs, y_obs, param_names, plotly=True) model_state_val = { "model": model, "model_type": model_type, "param_names": param_names, "bounds": bounds, "X_obs": X_obs, "y_obs": y_obs, "y_best": y_best, } return (fit_report, sug_df, compute_doe_stats(exp_df), fig_surface, fig_1d, fig_acq, fig_conv, fig_pva, fig_imp, fig_resid, fig_3d, fig_parallel, fig_unc, fig_corr, fig_scatter, model_state_val) def merge_suggestions(exp_df, sug_df): if sug_df is None or len(sug_df) == 0 or exp_df is None: return exp_df if "Error" in sug_df.columns: return exp_df param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]] sug_params = [c for c in sug_df.columns if c in param_cols] if not sug_params: return exp_df new_rows = [] start_run = int(exp_df["Run"].max()) + 1 if "Run" in exp_df.columns else len(exp_df) + 1 for i, (_, row) in enumerate(sug_df.iterrows()): new_row = {"Run": start_run + i} for col in param_cols: new_row[col] = row[col] if col in sug_df.columns else np.nan new_row["Response"] = np.nan for col in exp_df.columns: if col not in new_row: new_row[col] = np.nan new_rows.append(new_row) return pd.concat([exp_df, pd.DataFrame(new_rows)], ignore_index=True) # ═══════════════════════════════════════════════════════════════════ # BUILD THE DASHBOARD # ═══════════════════════════════════════════════════════════════════ with gr.Blocks(title="DoE & Bayesian Optimization Dashboard", css=CUSTOM_CSS) as demo: model_state = gr.State(value=None) gr.HTML("""

🔬 Design of Experiments & Bayesian Optimization

Plan experiments, fit surrogate models, and get AI-guided suggestions for your next experiments

""") with gr.Tabs(): # ── TAB 1: DESIGN OF EXPERIMENTS ── with gr.Tab("📐 Design of Experiments", id="doe_tab"): gr.HTML('
💡 Define your factors and levels below. Levels can be comma-separated (e.g. 10, 20, 30) or range notation (10:30:3 = 3 points from 10 to 30).
') with gr.Row(): with gr.Column(scale=2): gr.Markdown("### ⚙️ Factor Definition") factor_table = gr.Dataframe( value=create_default_factor_table(), headers=["Factor Name", "Levels (comma-sep or lo:hi:n)"], interactive=True, label="Define Factors & Levels", ) with gr.Row(): add_factor_btn = gr.Button("➕ Add Factor", size="sm") remove_factor_btn = gr.Button("➖ Remove Last", size="sm") with gr.Column(scale=1): gr.Markdown("### 🎯 Design Type") design_type = gr.Dropdown( choices=["Full Factorial", "Fractional Factorial (½)", "Fractional Factorial (¼)", "Latin Hypercube (LHS)", "Sobol Sequence", "Central Composite (CCD)", "Box-Behnken"], value="Full Factorial", label="Design Method", ) n_space_filling = gr.Slider(minimum=4, maximum=200, value=20, step=1, label="Number of Runs (for LHS/Sobol)", visible=False) generate_btn = gr.Button("🚀 Generate Experiment Design", variant="primary", size="lg") doe_stats = gr.Markdown("") gr.Markdown("---") gr.Markdown("### 📋 Experiment Table") gr.HTML('
📝 Edit cells directly. Fill in the Response column with your experimental results. You can add rows or columns as needed.
') experiment_table = gr.Dataframe(value=pd.DataFrame(), interactive=True, label="Experiments (edit Response values here)") with gr.Row(): add_row_btn = gr.Button("➕ Add Row", size="sm") new_col_name = gr.Textbox(label="New Column Name", placeholder="e.g. Yield", scale=2) add_col_btn = gr.Button("➕ Add Column", size="sm") with gr.Row(): download_btn = gr.Button("📥 Export as CSV", size="sm") csv_output = gr.File(label="Download", visible=False) def show_n_runs(dt): return gr.Slider(visible=dt in ["Latin Hypercube (LHS)", "Sobol Sequence"]) design_type.change(show_n_runs, inputs=design_type, outputs=n_space_filling) add_factor_btn.click(add_factor_row, inputs=factor_table, outputs=factor_table) remove_factor_btn.click(remove_last_factor, inputs=factor_table, outputs=factor_table) def gen_and_stats(ft, dt, n): df = generate_design(ft, dt, n) return df, compute_doe_stats(df) generate_btn.click(gen_and_stats, inputs=[factor_table, design_type, n_space_filling], outputs=[experiment_table, doe_stats]) add_row_btn.click(add_experiment_row, inputs=experiment_table, outputs=experiment_table) add_col_btn.click(add_column_to_experiments, inputs=[experiment_table, new_col_name], outputs=experiment_table) def export_csv(df): if df is None or len(df) == 0: return gr.File(visible=False) path = "/tmp/experiment_design.csv" df.to_csv(path, index=False) return gr.File(value=path, visible=True) download_btn.click(export_csv, inputs=experiment_table, outputs=csv_output) # ── TAB 2: BAYESIAN OPTIMIZATION ── with gr.Tab("🤖 Bayesian Optimization", id="bo_tab"): gr.HTML('
🧠 Configure your surrogate model and acquisition function, then click "Run" to fit the model and get suggestions for your next experiments.
') with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 🔧 Surrogate Model") model_type = gr.Dropdown( choices=["Gaussian Process", "Random Forest", "Extra Trees", "Gradient Boosting"], value="Gaussian Process", label="Model Type", ) with gr.Group() as gp_options: kernel_type = gr.Dropdown( choices=["Matérn 5/2", "Matérn 3/2", "RBF", "Rational Quadratic", "RBF + Matérn"], value="Matérn 5/2", label="GP Kernel", ) alpha_val = gr.Number(value=1e-6, label="Alpha (noise regularization)", precision=8) n_restarts = gr.Slider(1, 20, value=5, step=1, label="Optimizer Restarts") with gr.Group(visible=False) as tree_options: n_estimators = gr.Slider(10, 500, value=100, step=10, label="Number of Trees") gr.Markdown("---") gr.Markdown("### 🎯 Acquisition Function") acq_function = gr.Dropdown( choices=["Expected Improvement (EI)", "Probability of Improvement (PI)", "Upper Confidence Bound (UCB)", "Thompson Sampling"], value="Expected Improvement (EI)", label="Acquisition Function", ) with gr.Group() as ei_pi_options: xi_param = gr.Slider(0.0, 1.0, value=0.01, step=0.001, label="ξ (exploration-exploitation)") with gr.Group(visible=False) as ucb_options: kappa_param = gr.Slider(0.1, 10.0, value=2.0, step=0.1, label="κ (exploration weight)") gr.Markdown("---") gr.Markdown("### 📊 Optimization Settings") maximize_toggle = gr.Checkbox(value=True, label="Maximize (uncheck to minimize)") n_suggestions = gr.Slider(1, 20, value=5, step=1, label="Number of Suggestions") run_bo_btn = gr.Button("🚀 Run Optimization", variant="primary", size="lg") with gr.Column(scale=2): gr.Markdown("### 📈 Model Fit Report") fit_report_md = gr.Markdown("*Run the optimizer to see results...*") gr.Markdown("### 💡 Suggested Next Experiments") suggestion_table = gr.Dataframe(label="Next Experiments to Run", interactive=False) merge_btn = gr.Button("📋 Add Suggestions to Experiment Table", variant="secondary") merge_status = gr.Markdown("") doe_stats_bo = gr.Markdown("") def toggle_model_options(mt): return gr.Group(visible=mt == "Gaussian Process"), gr.Group(visible=mt != "Gaussian Process") def toggle_acq_options(acq): is_ucb = acq == "Upper Confidence Bound (UCB)" is_ts = acq == "Thompson Sampling" return gr.Group(visible=not is_ucb and not is_ts), gr.Group(visible=is_ucb) model_type.change(toggle_model_options, inputs=model_type, outputs=[gp_options, tree_options]) acq_function.change(toggle_acq_options, inputs=acq_function, outputs=[ei_pi_options, ucb_options]) # ── TAB 3: VISUALIZATIONS ── with gr.Tab("📊 Visualizations", id="viz_tab"): gr.HTML('
📉 Toggle visualizations on/off. All plots update when you run the optimizer. Use the checkboxes to show/hide each visualization.
') gr.Markdown("### 🎛️ Visualization Controls") with gr.Row(): show_surface = gr.Checkbox(value=True, label="🗺️ Surrogate Surface (2D)") show_1d = gr.Checkbox(value=True, label="📉 1D Parameter Slices") show_3d = gr.Checkbox(value=True, label="🏔️ 3D Response Surface") show_acq = gr.Checkbox(value=True, label="🎯 Acquisition Landscape") with gr.Row(): show_convergence = gr.Checkbox(value=True, label="📈 Convergence Plot") show_pva = gr.Checkbox(value=True, label="⚖️ Predicted vs Actual") show_importance = gr.Checkbox(value=True, label="📊 Feature Importance") show_residuals = gr.Checkbox(value=True, label="📏 Residual Analysis") with gr.Row(): show_parallel = gr.Checkbox(value=True, label="🔀 Parallel Coordinates") show_uncertainty = gr.Checkbox(value=True, label="🌡️ Uncertainty Heatmap") show_correlation = gr.Checkbox(value=True, label="🔗 Correlation Matrix") show_scatter_mat = gr.Checkbox(value=False, label="🔘 Scatter Matrix") gr.Markdown("---") with gr.Column(visible=True) as surface_col: gr.Markdown("### 🗺️ Surrogate Model Surface (2D Contour)") surface_plot = gr.Plot(label="Mean & Uncertainty Surface") with gr.Column(visible=True) as slices_col: gr.Markdown("### 📉 1D Parameter Effect Slices") slices_plot = gr.Plot(label="1D GP Slices") with gr.Column(visible=True) as three_d_col: gr.Markdown("### 🏔️ 3D Response Surface (Interactive)") three_d_plot = gr.Plot(label="3D Surface") with gr.Column(visible=True) as acq_col: gr.Markdown("### 🎯 Acquisition Function Landscape") acq_plot = gr.Plot(label="Acquisition Function") with gr.Column(visible=True) as conv_col: gr.Markdown("### 📈 Optimization Convergence") convergence_plot = gr.Plot(label="Convergence") with gr.Column(visible=True) as pva_col: gr.Markdown("### ⚖️ Predicted vs Actual (Parity Plot)") pva_plot = gr.Plot(label="Predicted vs Actual") with gr.Column(visible=True) as imp_col: gr.Markdown("### 📊 Feature Importance") importance_plot = gr.Plot(label="Feature Importance") with gr.Column(visible=True) as resid_col: gr.Markdown("### 📏 Residual Analysis") residuals_plot = gr.Plot(label="Residuals") with gr.Column(visible=True) as parallel_col: gr.Markdown("### 🔀 Parallel Coordinates") parallel_plot = gr.Plot(label="Parallel Coordinates") with gr.Column(visible=True) as unc_col: gr.Markdown("### 🌡️ Prediction Uncertainty Map") unc_plot = gr.Plot(label="Uncertainty Heatmap") with gr.Column(visible=True) as corr_col: gr.Markdown("### 🔗 Correlation Matrix") corr_plot = gr.Plot(label="Correlation Matrix") with gr.Column(visible=False) as scatter_mat_col: gr.Markdown("### 🔘 Scatter Matrix (Pair Plot)") scatter_mat_plot = gr.Plot(label="Scatter Matrix") show_surface.change(lambda v: gr.Column(visible=v), inputs=show_surface, outputs=surface_col) show_1d.change(lambda v: gr.Column(visible=v), inputs=show_1d, outputs=slices_col) show_3d.change(lambda v: gr.Column(visible=v), inputs=show_3d, outputs=three_d_col) show_acq.change(lambda v: gr.Column(visible=v), inputs=show_acq, outputs=acq_col) show_convergence.change(lambda v: gr.Column(visible=v), inputs=show_convergence, outputs=conv_col) show_pva.change(lambda v: gr.Column(visible=v), inputs=show_pva, outputs=pva_col) show_importance.change(lambda v: gr.Column(visible=v), inputs=show_importance, outputs=imp_col) show_residuals.change(lambda v: gr.Column(visible=v), inputs=show_residuals, outputs=resid_col) show_parallel.change(lambda v: gr.Column(visible=v), inputs=show_parallel, outputs=parallel_col) show_uncertainty.change(lambda v: gr.Column(visible=v), inputs=show_uncertainty, outputs=unc_col) show_correlation.change(lambda v: gr.Column(visible=v), inputs=show_correlation, outputs=corr_col) show_scatter_mat.change(lambda v: gr.Column(visible=v), inputs=show_scatter_mat, outputs=scatter_mat_col) # ── TAB 4: GUIDE ── with gr.Tab("📖 Guide", id="guide_tab"): gr.Markdown(""" ## 📖 User Guide ### 🔄 Workflow 1. **Define Factors** — In the DoE tab, set up your experimental parameters and their levels 2. **Generate Design** — Choose a design type and generate the experiment matrix 3. **Run Experiments** — Perform your physical/simulated experiments and fill in the Response column 4. **Fit Surrogate** — Go to the BO tab, configure your model, and click Run 5. **Get Suggestions** — The optimizer will suggest the most promising next experiments 6. **Iterate** — Add suggestions to your table, run those experiments, and repeat ### 📐 Design Types | Design | Best For | Notes | |--------|----------|-------| | **Full Factorial** | Few factors, few levels | Tests every combination. Grows exponentially! | | **Fractional Factorial** | Many factors, screening | Random subset of full factorial | | **Latin Hypercube (LHS)** | Space-filling, BO initialization | Even coverage of parameter space | | **Sobol Sequence** | High-dimensional, quasi-random | Low-discrepancy, n should be power of 2 | | **Central Composite (CCD)** | Response surface methodology | Factorial + axial + center points | | **Box-Behnken** | 3+ factors, fewer runs than CCD | No extreme corner points | ### 🤖 Surrogate Models | Model | Strengths | Weaknesses | |-------|-----------|------------| | **Gaussian Process** | Principled uncertainty, smooth | Scales O(n³), max ~500 points | | **Random Forest** | Handles categoricals, robust | Step-wise predictions, no gradients | | **Extra Trees** | Faster than RF, good uncertainty | Similar to RF | | **Gradient Boosting** | Strong predictive power | Weaker uncertainty estimates | ### 🎯 Acquisition Functions | Function | Behavior | Parameters | |----------|----------|------------| | **Expected Improvement (EI)** | Balanced exploration/exploitation | ξ: higher = more exploration | | **Probability of Improvement (PI)** | Greedy, exploits known good regions | ξ: trade-off parameter | | **Upper Confidence Bound (UCB)** | Direct exploration control | κ: higher = more exploration | | **Thompson Sampling** | Stochastic, good for batch | None (randomized) | ### 🔑 GP Kernels | Kernel | Best For | |--------|----------| | **Matérn 5/2** | Most physical/chemical processes (default) | | **Matérn 3/2** | Rougher functions | | **RBF** | Very smooth functions | | **Rational Quadratic** | Multi-scale patterns | | **RBF + Matérn** | Complex functions with mixed smoothness | ### 📊 Visualization Guide - **Surrogate Surface**: 2D contour of model predictions + uncertainty - **1D Slices**: Effect of each parameter while others fixed at center - **3D Surface**: Interactive 3D view of response surface (Plotly) - **Acquisition Landscape**: Where the optimizer "wants" to sample next - **Convergence**: Best-so-far trajectory over experiments - **Predicted vs Actual**: Model accuracy check (parity plot) - **Feature Importance**: Which parameters matter most - **Residuals**: Diagnostic plots for model assumptions - **Parallel Coordinates**: Multi-dimensional experiment overview - **Uncertainty Heatmap**: Where the model is least certain - **Correlation Matrix**: Parameter-response correlations - **Scatter Matrix**: All pairwise relationships """) # ── WIRING ── run_bo_btn.click( fn=run_bo_pipeline, inputs=[experiment_table, model_type, kernel_type, acq_function, n_suggestions, xi_param, kappa_param, maximize_toggle, n_estimators, alpha_val, n_restarts], outputs=[fit_report_md, suggestion_table, doe_stats_bo, surface_plot, slices_plot, acq_plot, convergence_plot, pva_plot, importance_plot, residuals_plot, three_d_plot, parallel_plot, unc_plot, corr_plot, scatter_mat_plot, model_state], ) def do_merge(exp_df, sug_df): merged = merge_suggestions(exp_df, sug_df) return merged, "✅ Suggestions added to experiment table! Switch to DoE tab to see them." merge_btn.click(do_merge, inputs=[experiment_table, suggestion_table], outputs=[experiment_table, merge_status]) if __name__ == "__main__": demo.launch()