| """ |
| DoE & Bayesian Optimization Dashboard |
| A comprehensive tool for experimentalists to design experiments, |
| fit surrogate models, and get AI-guided suggestions for next experiments. |
| """ |
|
|
| import gradio as gr |
| import numpy as np |
| import pandas as pd |
| import itertools |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| import plotly.express as px |
| import plotly.graph_objects as go |
| from plotly.subplots import make_subplots |
| from scipy.stats import norm |
| from scipy.stats.qmc import LatinHypercube, Sobol, scale as qmc_scale |
| from scipy.optimize import minimize as sp_minimize |
| from sklearn.gaussian_process import GaussianProcessRegressor |
| from sklearn.gaussian_process.kernels import ( |
| Matern, RBF, ConstantKernel as C, WhiteKernel, RationalQuadratic, DotProduct |
| ) |
| from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor |
| from sklearn.preprocessing import StandardScaler |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| |
| |
| |
|
|
| def parse_levels(level_str): |
| """Parse a level string like '10, 20, 30' or '10:30:3' into a list of floats.""" |
| level_str = str(level_str).strip() |
| if not level_str: |
| return [] |
| if ":" in level_str: |
| parts = level_str.split(":") |
| if len(parts) == 3: |
| lo, hi, n = float(parts[0]), float(parts[1]), int(parts[2]) |
| return list(np.linspace(lo, hi, n)) |
| elif len(parts) == 2: |
| lo, hi = float(parts[0]), float(parts[1]) |
| return [lo, hi] |
| else: |
| return [float(x.strip()) for x in level_str.split(",") if x.strip()] |
|
|
| def generate_full_factorial(factor_df): |
| """Generate full factorial design from a factor definition table.""" |
| if factor_df is None or len(factor_df) == 0: |
| return pd.DataFrame({"Error": ["No factors defined"]}) |
| |
| factors = {} |
| for _, row in factor_df.iterrows(): |
| name = str(row.iloc[0]).strip() |
| levels = parse_levels(row.iloc[1]) |
| if name and levels: |
| factors[name] = levels |
| |
| if not factors: |
| return pd.DataFrame({"Error": ["No valid factors found. Use format: '10,20,30' or '10:30:3'"]}) |
| |
| names = list(factors.keys()) |
| level_lists = list(factors.values()) |
| combos = list(itertools.product(*level_lists)) |
| df = pd.DataFrame(combos, columns=names) |
| df.insert(0, "Run", range(1, len(df) + 1)) |
| df["Response"] = np.nan |
| return df |
|
|
|
|
| def generate_fractional_factorial(factor_df, fraction=0.5): |
| """Generate fractional factorial (random subset).""" |
| full = generate_full_factorial(factor_df) |
| if "Error" in full.columns: |
| return full |
| n_keep = max(2, int(len(full) * fraction)) |
| sampled = full.sample(n=n_keep, random_state=42).reset_index(drop=True) |
| sampled["Run"] = range(1, len(sampled) + 1) |
| return sampled |
|
|
|
|
| def generate_lhs_design(factor_df, n_runs=20): |
| """Generate Latin Hypercube Sampling design.""" |
| if factor_df is None or len(factor_df) == 0: |
| return pd.DataFrame({"Error": ["No factors defined"]}) |
| |
| bounds = [] |
| names = [] |
| for _, row in factor_df.iterrows(): |
| name = str(row.iloc[0]).strip() |
| levels = parse_levels(row.iloc[1]) |
| if name and len(levels) >= 2: |
| names.append(name) |
| bounds.append((min(levels), max(levels))) |
| |
| if not names: |
| return pd.DataFrame({"Error": ["Need at least 2 levels per factor for LHS"]}) |
| |
| d = len(names) |
| n_runs = int(n_runs) |
| sampler = LatinHypercube(d=d, seed=42) |
| X = sampler.random(n=n_runs) |
| lb = np.array([b[0] for b in bounds]) |
| ub = np.array([b[1] for b in bounds]) |
| X_scaled = qmc_scale(X, lb, ub) |
| |
| df = pd.DataFrame(X_scaled, columns=names) |
| df.insert(0, "Run", range(1, n_runs + 1)) |
| df["Response"] = np.nan |
| for col in names: |
| df[col] = df[col].round(4) |
| return df |
|
|
|
|
| def generate_sobol_design(factor_df, n_runs=16): |
| """Generate Sobol sequence design.""" |
| if factor_df is None or len(factor_df) == 0: |
| return pd.DataFrame({"Error": ["No factors defined"]}) |
| |
| bounds = [] |
| names = [] |
| for _, row in factor_df.iterrows(): |
| name = str(row.iloc[0]).strip() |
| levels = parse_levels(row.iloc[1]) |
| if name and len(levels) >= 2: |
| names.append(name) |
| bounds.append((min(levels), max(levels))) |
| |
| if not names: |
| return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]}) |
| |
| d = len(names) |
| n_runs = int(n_runs) |
| n_actual = 2 ** int(np.ceil(np.log2(max(n_runs, 2)))) |
| sampler = Sobol(d=d, scramble=True, seed=42) |
| X = sampler.random(n=n_actual) |
| lb = np.array([b[0] for b in bounds]) |
| ub = np.array([b[1] for b in bounds]) |
| X_scaled = qmc_scale(X, lb, ub) |
| |
| df = pd.DataFrame(X_scaled[:n_runs], columns=names) |
| df.insert(0, "Run", range(1, n_runs + 1)) |
| df["Response"] = np.nan |
| for col in names: |
| df[col] = df[col].round(4) |
| return df |
|
|
|
|
| def generate_central_composite(factor_df): |
| """Generate Central Composite Design (CCD).""" |
| if factor_df is None or len(factor_df) == 0: |
| return pd.DataFrame({"Error": ["No factors defined"]}) |
| |
| bounds = [] |
| names = [] |
| for _, row in factor_df.iterrows(): |
| name = str(row.iloc[0]).strip() |
| levels = parse_levels(row.iloc[1]) |
| if name and len(levels) >= 2: |
| names.append(name) |
| bounds.append((min(levels), max(levels))) |
| |
| if not names: |
| return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]}) |
| |
| k = len(names) |
| lb = np.array([b[0] for b in bounds]) |
| ub = np.array([b[1] for b in bounds]) |
| center = (lb + ub) / 2 |
| half_range = (ub - lb) / 2 |
| |
| factorial_coded = np.array(list(itertools.product([-1, 1], repeat=k))) |
| alpha = np.sqrt(k) |
| axial_coded = np.zeros((2 * k, k)) |
| for i in range(k): |
| axial_coded[2 * i, i] = -alpha |
| axial_coded[2 * i + 1, i] = alpha |
| center_coded = np.zeros((3, k)) |
| |
| all_coded = np.vstack([factorial_coded, axial_coded, center_coded]) |
| all_actual = center + all_coded * half_range |
| all_actual = np.clip(all_actual, lb, ub) |
| |
| df = pd.DataFrame(all_actual, columns=names) |
| df.insert(0, "Run", range(1, len(df) + 1)) |
| df["Response"] = np.nan |
| for col in names: |
| df[col] = df[col].round(4) |
| return df |
|
|
|
|
| def generate_box_behnken(factor_df): |
| """Generate Box-Behnken design (for 3+ factors).""" |
| if factor_df is None or len(factor_df) == 0: |
| return pd.DataFrame({"Error": ["No factors defined"]}) |
| |
| bounds = [] |
| names = [] |
| for _, row in factor_df.iterrows(): |
| name = str(row.iloc[0]).strip() |
| levels = parse_levels(row.iloc[1]) |
| if name and len(levels) >= 2: |
| names.append(name) |
| bounds.append((min(levels), max(levels))) |
| |
| if len(names) < 3: |
| return pd.DataFrame({"Error": ["Box-Behnken requires at least 3 factors"]}) |
| |
| k = len(names) |
| lb = np.array([b[0] for b in bounds]) |
| ub = np.array([b[1] for b in bounds]) |
| center = (lb + ub) / 2 |
| half_range = (ub - lb) / 2 |
| |
| runs_coded = [] |
| for i in range(k): |
| for j in range(i + 1, k): |
| for vi in [-1, 1]: |
| for vj in [-1, 1]: |
| row = [0] * k |
| row[i] = vi |
| row[j] = vj |
| runs_coded.append(row) |
| for _ in range(3): |
| runs_coded.append([0] * k) |
| |
| all_coded = np.array(runs_coded) |
| all_actual = center + all_coded * half_range |
| |
| df = pd.DataFrame(all_actual, columns=names) |
| df.insert(0, "Run", range(1, len(df) + 1)) |
| df["Response"] = np.nan |
| for col in names: |
| df[col] = df[col].round(4) |
| return df |
|
|
|
|
| |
| |
| |
|
|
| KERNEL_MAP = { |
| "MatΓ©rn 5/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3), |
| "MatΓ©rn 3/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=1.5) + WhiteKernel(noise_level=1e-3), |
| "RBF": lambda d: C(1.0, (1e-3, 1e3)) * RBF(length_scale=np.ones(d)) + WhiteKernel(noise_level=1e-3), |
| "Rational Quadratic": lambda d: C(1.0, (1e-3, 1e3)) * RationalQuadratic() + WhiteKernel(noise_level=1e-3), |
| "RBF + MatΓ©rn": lambda d: C(1.0) * RBF(length_scale=np.ones(d)) + C(0.5) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3), |
| } |
|
|
|
|
| def build_surrogate(model_type, kernel_name, n_dims, n_estimators=100, alpha=1e-6, n_restarts=5): |
| """Build a surrogate model based on type selection.""" |
| if model_type == "Gaussian Process": |
| kernel_fn = KERNEL_MAP.get(kernel_name, KERNEL_MAP["MatΓ©rn 5/2"]) |
| kernel = kernel_fn(n_dims) |
| model = GaussianProcessRegressor( |
| kernel=kernel, |
| alpha=alpha, |
| n_restarts_optimizer=n_restarts, |
| normalize_y=True |
| ) |
| return model |
| elif model_type == "Random Forest": |
| return RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42) |
| elif model_type == "Extra Trees": |
| return ExtraTreesRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42) |
| elif model_type == "Gradient Boosting": |
| return GradientBoostingRegressor(n_estimators=n_estimators, max_depth=4, random_state=42) |
| else: |
| return GaussianProcessRegressor(normalize_y=True) |
|
|
|
|
| def predict_with_uncertainty(model, X, model_type="Gaussian Process"): |
| """Predict mean and std for any surrogate model type.""" |
| X = np.atleast_2d(X) |
| if model_type == "Gaussian Process": |
| mu, sigma = model.predict(X, return_std=True) |
| return mu, sigma |
| elif model_type in ["Random Forest", "Extra Trees"]: |
| preds = np.array([tree.predict(X) for tree in model.estimators_]) |
| mu = preds.mean(axis=0) |
| sigma = preds.std(axis=0) |
| return mu, sigma |
| elif model_type == "Gradient Boosting": |
| mu = model.predict(X) |
| staged = np.array(list(model.staged_predict(X))) |
| sigma = staged.std(axis=0) |
| return mu, sigma |
| else: |
| mu = model.predict(X) |
| return mu, np.zeros_like(mu) |
|
|
|
|
| def expected_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True): |
| mu, sigma = predict_with_uncertainty(model, X, model_type) |
| sigma = np.maximum(sigma, 1e-9) |
| if maximize: |
| Z = (mu - y_best - xi) / sigma |
| else: |
| Z = (y_best - mu - xi) / sigma |
| ei = sigma * (Z * norm.cdf(Z) + norm.pdf(Z)) |
| ei[sigma < 1e-10] = 0.0 |
| return ei |
|
|
|
|
| def probability_of_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True): |
| mu, sigma = predict_with_uncertainty(model, X, model_type) |
| sigma = np.maximum(sigma, 1e-9) |
| if maximize: |
| Z = (mu - y_best - xi) / sigma |
| else: |
| Z = (y_best - mu - xi) / sigma |
| return norm.cdf(Z) |
|
|
|
|
| def upper_confidence_bound(X, model, kappa=2.0, model_type="Gaussian Process", maximize=True): |
| mu, sigma = predict_with_uncertainty(model, X, model_type) |
| if maximize: |
| return mu + kappa * sigma |
| else: |
| return -(mu - kappa * sigma) |
|
|
|
|
| def thompson_sampling(X, model, model_type="Gaussian Process", maximize=True): |
| mu, sigma = predict_with_uncertainty(model, X, model_type) |
| samples = np.random.normal(mu, np.maximum(sigma, 1e-9)) |
| if not maximize: |
| samples = -samples |
| return samples |
|
|
|
|
| ACQ_MAP = { |
| "Expected Improvement (EI)": expected_improvement, |
| "Probability of Improvement (PI)": probability_of_improvement, |
| "Upper Confidence Bound (UCB)": upper_confidence_bound, |
| "Thompson Sampling": thompson_sampling, |
| } |
|
|
|
|
| def suggest_next_experiments(model, model_type, acq_name, bounds, n_suggestions, |
| y_best, xi=0.01, kappa=2.0, maximize=True, n_candidates=5000): |
| """Suggest next experiments using acquisition function optimization.""" |
| d = len(bounds) |
| lb = np.array([b[0] for b in bounds]) |
| ub = np.array([b[1] for b in bounds]) |
| |
| sampler = LatinHypercube(d=d, seed=np.random.randint(0, 10000)) |
| X_candidates = qmc_scale(sampler.random(n=n_candidates), lb, ub) |
| |
| acq_fn = ACQ_MAP.get(acq_name, expected_improvement) |
| if acq_name == "Upper Confidence Bound (UCB)": |
| acq_vals = acq_fn(X_candidates, model, kappa=kappa, model_type=model_type, maximize=maximize) |
| elif acq_name == "Thompson Sampling": |
| acq_vals = acq_fn(X_candidates, model, model_type=model_type, maximize=maximize) |
| else: |
| acq_vals = acq_fn(X_candidates, model, y_best, xi=xi, model_type=model_type, maximize=maximize) |
| |
| top_indices = np.argsort(acq_vals)[::-1] |
| selected = [] |
| selected_points = [] |
| |
| for idx in top_indices: |
| if len(selected) >= n_suggestions: |
| break |
| pt = X_candidates[idx] |
| if selected_points: |
| dists = [np.linalg.norm(pt - sp) for sp in selected_points] |
| min_dist = min(dists) |
| range_diag = np.linalg.norm(ub - lb) |
| if min_dist < 0.02 * range_diag: |
| continue |
| selected.append(idx) |
| selected_points.append(pt) |
| |
| X_suggested = X_candidates[selected] |
| acq_at_suggested = acq_vals[selected] |
| mu_sug, sigma_sug = predict_with_uncertainty(model, X_suggested, model_type) |
| |
| return X_suggested, acq_at_suggested, mu_sug, sigma_sug |
|
|
|
|
| |
| |
| |
|
|
| def plot_gp_surface_2d(model, model_type, param_names, bounds, X_obs, y_obs, |
| resolution=50, dim_x=0, dim_y=1, fixed_values=None): |
| """2D contour plot of surrogate model prediction surface.""" |
| if len(param_names) < 2: |
| fig, ax = plt.subplots(figsize=(8, 5)) |
| ax.text(0.5, 0.5, "Need at least 2 parameters for 2D surface", |
| ha='center', va='center', fontsize=14, transform=ax.transAxes) |
| return fig |
| |
| x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) |
| x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) |
| X1, X2 = np.meshgrid(x1_range, x2_range) |
| |
| d = len(param_names) |
| X_grid = np.zeros((resolution * resolution, d)) |
| X_grid[:, dim_x] = X1.ravel() |
| X_grid[:, dim_y] = X2.ravel() |
| |
| for i in range(d): |
| if i != dim_x and i != dim_y: |
| if fixed_values and i < len(fixed_values): |
| X_grid[:, i] = fixed_values[i] |
| else: |
| X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 |
| |
| mu, sigma = predict_with_uncertainty(model, X_grid, model_type) |
| |
| fig, axes = plt.subplots(1, 2, figsize=(14, 5)) |
| |
| c1 = axes[0].contourf(X1, X2, mu.reshape(resolution, resolution), levels=25, cmap='viridis') |
| axes[0].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='red', s=60, edgecolors='white', |
| zorder=5, label='Observations') |
| plt.colorbar(c1, ax=axes[0]) |
| axes[0].set_xlabel(param_names[dim_x], fontsize=11) |
| axes[0].set_ylabel(param_names[dim_y], fontsize=11) |
| axes[0].set_title("Surrogate Mean Prediction", fontsize=12, fontweight='bold') |
| axes[0].legend(fontsize=9) |
| |
| c2 = axes[1].contourf(X1, X2, sigma.reshape(resolution, resolution), levels=25, cmap='YlOrRd') |
| axes[1].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='blue', s=60, edgecolors='white', |
| zorder=5, label='Observations') |
| plt.colorbar(c2, ax=axes[1]) |
| axes[1].set_xlabel(param_names[dim_x], fontsize=11) |
| axes[1].set_ylabel(param_names[dim_y], fontsize=11) |
| axes[1].set_title("Prediction Uncertainty (Ο)", fontsize=12, fontweight='bold') |
| axes[1].legend(fontsize=9) |
| |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_gp_1d_slices(model, model_type, param_names, bounds, X_obs, y_obs): |
| """1D slice plots through each parameter dimension.""" |
| d = len(param_names) |
| n_cols = min(3, d) |
| n_rows = int(np.ceil(d / n_cols)) |
| fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows)) |
| if d == 1: |
| axes = np.array([axes]) |
| axes = np.atleast_2d(axes) |
| |
| center = np.array([(b[0] + b[1]) / 2 for b in bounds]) |
| |
| for i in range(d): |
| row, col = divmod(i, n_cols) |
| ax = axes[row, col] |
| x_range = np.linspace(bounds[i][0], bounds[i][1], 200) |
| X_plot = np.tile(center, (200, 1)) |
| X_plot[:, i] = x_range |
| |
| mu, sigma = predict_with_uncertainty(model, X_plot, model_type) |
| |
| ax.fill_between(x_range, mu - 2 * sigma, mu + 2 * sigma, alpha=0.2, color='steelblue', label='Β±2Ο') |
| ax.fill_between(x_range, mu - sigma, mu + sigma, alpha=0.3, color='steelblue', label='Β±1Ο') |
| ax.plot(x_range, mu, 'b-', linewidth=2, label='Mean') |
| ax.scatter(X_obs[:, i], y_obs, c='red', s=40, zorder=5, label='Observations') |
| ax.set_xlabel(param_names[i], fontsize=11) |
| ax.set_ylabel("Response", fontsize=11) |
| ax.set_title(f"Effect of {param_names[i]}", fontsize=11, fontweight='bold') |
| ax.legend(fontsize=8) |
| ax.grid(True, alpha=0.3) |
| |
| for i in range(d, n_rows * n_cols): |
| row, col = divmod(i, n_cols) |
| axes[row, col].set_visible(False) |
| |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_acquisition_landscape(model, model_type, acq_name, param_names, bounds, |
| X_obs, y_obs, y_best, xi, kappa, maximize, |
| dim_x=0, dim_y=1, resolution=60): |
| """Plot the acquisition function landscape.""" |
| if len(param_names) < 2: |
| fig, ax = plt.subplots(figsize=(8, 5)) |
| ax.text(0.5, 0.5, "Need at least 2 parameters", ha='center', va='center', fontsize=14, |
| transform=ax.transAxes) |
| return fig |
| |
| x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) |
| x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) |
| X1, X2 = np.meshgrid(x1_range, x2_range) |
| |
| d = len(param_names) |
| X_grid = np.zeros((resolution * resolution, d)) |
| X_grid[:, dim_x] = X1.ravel() |
| X_grid[:, dim_y] = X2.ravel() |
| for i in range(d): |
| if i != dim_x and i != dim_y: |
| X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 |
| |
| acq_fn = ACQ_MAP.get(acq_name, expected_improvement) |
| if acq_name == "Upper Confidence Bound (UCB)": |
| acq_vals = acq_fn(X_grid, model, kappa=kappa, model_type=model_type, maximize=maximize) |
| elif acq_name == "Thompson Sampling": |
| acq_vals = acq_fn(X_grid, model, model_type=model_type, maximize=maximize) |
| else: |
| acq_vals = acq_fn(X_grid, model, y_best, xi=xi, model_type=model_type, maximize=maximize) |
| |
| fig, ax = plt.subplots(figsize=(8, 6)) |
| c = ax.contourf(X1, X2, acq_vals.reshape(resolution, resolution), levels=25, cmap='plasma') |
| ax.scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='white', s=60, edgecolors='black', |
| zorder=5, label='Observations') |
| |
| best_idx = np.argmax(acq_vals) |
| ax.scatter(X_grid[best_idx, dim_x], X_grid[best_idx, dim_y], |
| marker='*', c='yellow', s=300, edgecolors='black', zorder=10, label='Best acquisition') |
| |
| plt.colorbar(c, ax=ax) |
| ax.set_xlabel(param_names[dim_x], fontsize=11) |
| ax.set_ylabel(param_names[dim_y], fontsize=11) |
| ax.set_title(f"Acquisition Function: {acq_name}", fontsize=12, fontweight='bold') |
| ax.legend(fontsize=9) |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_convergence(y_obs, maximize=True): |
| """Plot optimization convergence.""" |
| if len(y_obs) == 0: |
| fig, ax = plt.subplots(figsize=(8, 5)) |
| ax.text(0.5, 0.5, "No data yet", ha='center', va='center', fontsize=14, transform=ax.transAxes) |
| return fig |
| |
| if maximize: |
| best_so_far = np.maximum.accumulate(y_obs) |
| ylabel = "Best Response (β)" |
| else: |
| best_so_far = np.minimum.accumulate(y_obs) |
| ylabel = "Best Response (β)" |
| |
| iters = np.arange(1, len(y_obs) + 1) |
| |
| fig, ax = plt.subplots(figsize=(9, 5)) |
| ax.plot(iters, best_so_far, 'b-o', linewidth=2, markersize=4, label='Best so far', zorder=4) |
| ax.scatter(iters, y_obs, c='red', s=30, alpha=0.6, zorder=5, label='Individual observations') |
| ax.fill_between(iters, best_so_far, y_obs, alpha=0.1, color='blue') |
| ax.set_xlabel("Experiment Number", fontsize=11) |
| ax.set_ylabel(ylabel, fontsize=11) |
| ax.set_title("Optimization Convergence", fontsize=13, fontweight='bold') |
| ax.legend(fontsize=10) |
| ax.grid(True, alpha=0.3) |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_parallel_coordinates(X_obs, y_obs, param_names): |
| """Interactive parallel coordinates plot with Plotly.""" |
| if len(X_obs) == 0: |
| fig = go.Figure() |
| fig.add_annotation(text="No data yet", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20)) |
| return fig |
| |
| df = pd.DataFrame(X_obs, columns=param_names) |
| df['Response'] = y_obs |
| |
| dimensions = [] |
| for col in param_names: |
| dimensions.append(dict(range=[df[col].min(), df[col].max()], label=col, values=df[col])) |
| dimensions.append(dict(range=[df['Response'].min(), df['Response'].max()], label='Response', values=df['Response'])) |
| |
| fig = go.Figure(data=go.Parcoords( |
| line=dict(color=df['Response'], colorscale='Viridis', showscale=True, colorbar=dict(title='Response')), |
| dimensions=dimensions |
| )) |
| fig.update_layout(title="Parallel Coordinates: All Experiments", height=450, margin=dict(l=80, r=80, t=60, b=40)) |
| return fig |
|
|
|
|
| def plot_predicted_vs_actual(model, model_type, X_obs, y_obs): |
| """Parity plot: predicted vs actual.""" |
| if len(X_obs) == 0: |
| fig, ax = plt.subplots(figsize=(6, 6)) |
| ax.text(0.5, 0.5, "No data", ha='center', va='center', fontsize=14, transform=ax.transAxes) |
| return fig |
| |
| mu, sigma = predict_with_uncertainty(model, X_obs, model_type) |
| |
| fig, ax = plt.subplots(figsize=(6, 6)) |
| ax.errorbar(y_obs, mu, yerr=2 * sigma, fmt='o', color='steelblue', ecolor='lightblue', |
| elinewidth=1.5, capsize=3, markersize=6, label='Predictions Β± 2Ο') |
| |
| mn = min(y_obs.min(), mu.min()) |
| mx = max(y_obs.max(), mu.max()) |
| pad = (mx - mn) * 0.1 |
| ax.plot([mn - pad, mx + pad], [mn - pad, mx + pad], 'k--', linewidth=1, label='Perfect fit') |
| |
| ss_res = np.sum((y_obs - mu) ** 2) |
| ss_tot = np.sum((y_obs - y_obs.mean()) ** 2) |
| r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 |
| rmse = np.sqrt(np.mean((y_obs - mu) ** 2)) |
| |
| ax.set_xlabel("Actual", fontsize=11) |
| ax.set_ylabel("Predicted", fontsize=11) |
| ax.set_title(f"Predicted vs Actual (RΒ²={r2:.4f}, RMSE={rmse:.4f})", fontsize=12, fontweight='bold') |
| ax.legend(fontsize=9) |
| ax.grid(True, alpha=0.3) |
| ax.set_aspect('equal', adjustable='box') |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_feature_importance(model, model_type, param_names): |
| """Feature importance from surrogate model.""" |
| fig, ax = plt.subplots(figsize=(8, max(3, 0.5 * len(param_names)))) |
| |
| if model_type == "Gaussian Process": |
| try: |
| kernel = model.kernel_ |
| ls = None |
| def find_ls(k): |
| if hasattr(k, 'length_scale') and not isinstance(k, WhiteKernel): |
| return np.atleast_1d(k.length_scale) |
| if hasattr(k, 'k1'): |
| r = find_ls(k.k1) |
| if r is not None: |
| return r |
| if hasattr(k, 'k2'): |
| r = find_ls(k.k2) |
| if r is not None: |
| return r |
| return None |
| ls = find_ls(kernel) |
| |
| if ls is not None and len(ls) == len(param_names): |
| importance = 1.0 / ls |
| importance = importance / importance.sum() |
| else: |
| importance = np.ones(len(param_names)) / len(param_names) |
| except Exception: |
| importance = np.ones(len(param_names)) / len(param_names) |
| title = "Feature Importance (GP Inverse Lengthscale)" |
| |
| elif model_type in ["Random Forest", "Extra Trees", "Gradient Boosting"]: |
| importance = model.feature_importances_ |
| title = f"Feature Importance ({model_type})" |
| else: |
| importance = np.ones(len(param_names)) / len(param_names) |
| title = "Feature Importance" |
| |
| sorted_idx = np.argsort(importance) |
| ax.barh(range(len(param_names)), importance[sorted_idx], color='steelblue', edgecolor='navy') |
| ax.set_yticks(range(len(param_names))) |
| ax.set_yticklabels(np.array(param_names)[sorted_idx]) |
| ax.set_xlabel("Importance", fontsize=11) |
| ax.set_title(title, fontsize=12, fontweight='bold') |
| ax.grid(True, alpha=0.3, axis='x') |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_residuals(model, model_type, X_obs, y_obs): |
| """Residual analysis plots.""" |
| if len(X_obs) == 0: |
| fig, ax = plt.subplots() |
| return fig |
| |
| mu, _ = predict_with_uncertainty(model, X_obs, model_type) |
| residuals = y_obs - mu |
| |
| fig, axes = plt.subplots(1, 3, figsize=(15, 4)) |
| |
| axes[0].scatter(mu, residuals, c='steelblue', s=40, edgecolors='navy') |
| axes[0].axhline(y=0, color='red', linestyle='--', linewidth=1) |
| axes[0].set_xlabel("Predicted", fontsize=10) |
| axes[0].set_ylabel("Residual", fontsize=10) |
| axes[0].set_title("Residuals vs Predicted", fontsize=11, fontweight='bold') |
| axes[0].grid(True, alpha=0.3) |
| |
| axes[1].hist(residuals, bins=max(5, len(residuals) // 3), color='steelblue', edgecolor='navy', alpha=0.8) |
| axes[1].set_xlabel("Residual", fontsize=10) |
| axes[1].set_ylabel("Count", fontsize=10) |
| axes[1].set_title("Residual Distribution", fontsize=11, fontweight='bold') |
| axes[1].grid(True, alpha=0.3) |
| |
| from scipy.stats import probplot |
| probplot(residuals, dist="norm", plot=axes[2]) |
| axes[2].set_title("Normal Q-Q Plot", fontsize=11, fontweight='bold') |
| axes[2].grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| return fig |
|
|
|
|
| def plot_3d_surface(model, model_type, param_names, bounds, X_obs, y_obs, dim_x=0, dim_y=1): |
| """3D surface plot using Plotly.""" |
| if len(param_names) < 2: |
| fig = go.Figure() |
| fig.add_annotation(text="Need 2+ parameters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) |
| return fig |
| |
| resolution = 40 |
| x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) |
| x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) |
| X1, X2 = np.meshgrid(x1, x2) |
| |
| d = len(param_names) |
| X_grid = np.zeros((resolution * resolution, d)) |
| X_grid[:, dim_x] = X1.ravel() |
| X_grid[:, dim_y] = X2.ravel() |
| for i in range(d): |
| if i != dim_x and i != dim_y: |
| X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 |
| |
| mu, sigma = predict_with_uncertainty(model, X_grid, model_type) |
| |
| fig = go.Figure() |
| fig.add_trace(go.Surface( |
| x=x1, y=x2, z=mu.reshape(resolution, resolution), |
| colorscale='Viridis', name='GP Mean', opacity=0.85, |
| colorbar=dict(title='Predicted') |
| )) |
| |
| fig.add_trace(go.Scatter3d( |
| x=X_obs[:, dim_x], y=X_obs[:, dim_y], z=y_obs, |
| mode='markers', marker=dict(size=5, color='red', symbol='diamond'), |
| name='Observations' |
| )) |
| |
| fig.update_layout( |
| title="3D Response Surface", |
| scene=dict(xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y], zaxis_title="Response"), |
| height=550, margin=dict(l=0, r=0, t=40, b=0) |
| ) |
| return fig |
|
|
|
|
| def plot_uncertainty_heatmap(model, model_type, param_names, bounds, X_obs, dim_x=0, dim_y=1): |
| """Heatmap of prediction uncertainty.""" |
| if len(param_names) < 2: |
| fig = go.Figure() |
| fig.add_annotation(text="Need 2+ params", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) |
| return fig |
| |
| resolution = 50 |
| x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution) |
| x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution) |
| X1, X2 = np.meshgrid(x1, x2) |
| |
| d = len(param_names) |
| X_grid = np.zeros((resolution * resolution, d)) |
| X_grid[:, dim_x] = X1.ravel() |
| X_grid[:, dim_y] = X2.ravel() |
| for i in range(d): |
| if i != dim_x and i != dim_y: |
| X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2 |
| |
| _, sigma = predict_with_uncertainty(model, X_grid, model_type) |
| |
| fig = go.Figure(data=go.Heatmap( |
| x=x1, y=x2, z=sigma.reshape(resolution, resolution), |
| colorscale='YlOrRd', colorbar=dict(title='Uncertainty (Ο)') |
| )) |
| fig.add_trace(go.Scatter( |
| x=X_obs[:, dim_x], y=X_obs[:, dim_y], |
| mode='markers', marker=dict(size=8, color='blue', symbol='x', line=dict(width=1)), |
| name='Observations' |
| )) |
| fig.update_layout( |
| title="Prediction Uncertainty Map", |
| xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y], |
| height=450 |
| ) |
| return fig |
|
|
|
|
| def plot_correlation_matrix(X_obs, y_obs, param_names): |
| """Correlation matrix heatmap.""" |
| if len(X_obs) == 0 or len(X_obs) < 3: |
| fig = go.Figure() |
| fig.add_annotation(text="Need β₯3 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) |
| return fig |
| |
| df = pd.DataFrame(X_obs, columns=param_names) |
| df['Response'] = y_obs |
| corr = df.corr() |
| |
| fig = go.Figure(data=go.Heatmap( |
| z=corr.values, x=corr.columns.tolist(), y=corr.index.tolist(), |
| colorscale='RdBu_r', zmin=-1, zmax=1, |
| text=np.round(corr.values, 2), texttemplate="%{text}", |
| colorbar=dict(title='Correlation') |
| )) |
| fig.update_layout(title="Correlation Matrix", height=450, xaxis_title="", yaxis_title="") |
| return fig |
|
|
|
|
| def plot_scatter_matrix(X_obs, y_obs, param_names): |
| """Scatter matrix (pair plot).""" |
| if len(X_obs) < 2: |
| fig = go.Figure() |
| fig.add_annotation(text="Need β₯2 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) |
| return fig |
| |
| df = pd.DataFrame(X_obs, columns=param_names) |
| df['Response'] = y_obs |
| |
| fig = px.scatter_matrix(df, dimensions=param_names + ['Response'], |
| color='Response', color_continuous_scale='Viridis', |
| title="Scatter Matrix", height=max(400, 150 * len(param_names))) |
| fig.update_traces(diagonal_visible=True, marker=dict(size=4)) |
| fig.update_layout(margin=dict(l=40, r=40, t=60, b=40)) |
| return fig |
|
|
|
|
| |
| |
| |
|
|
| CUSTOM_CSS = """ |
| .main-header { text-align: center; margin-bottom: 5px; } |
| .main-header h1 { font-size: 1.8em; margin-bottom: 2px; } |
| .main-header p { color: #666; font-size: 0.95em; } |
| .info-box { background: #f0f7ff; border-left: 4px solid #2196F3; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; } |
| .warn-box { background: #fff8e1; border-left: 4px solid #ff9800; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; } |
| """ |
|
|
| def create_default_factor_table(): |
| return pd.DataFrame({ |
| "Factor Name": ["Temperature", "Pressure", "Time"], |
| "Levels (comma-sep or lo:hi:n)": ["100, 150, 200", "1, 2, 3", "30, 60"] |
| }) |
|
|
|
|
| def add_factor_row(df): |
| new_row = pd.DataFrame({"Factor Name": [f"Factor_{len(df)+1}"], "Levels (comma-sep or lo:hi:n)": ["0, 1"]}) |
| return pd.concat([df, new_row], ignore_index=True) |
|
|
|
|
| def remove_last_factor(df): |
| if len(df) > 1: |
| return df.iloc[:-1].reset_index(drop=True) |
| return df |
|
|
|
|
| def generate_design(factor_df, design_type, n_lhs_runs): |
| if design_type == "Full Factorial": |
| return generate_full_factorial(factor_df) |
| elif design_type == "Fractional Factorial (Β½)": |
| return generate_fractional_factorial(factor_df, 0.5) |
| elif design_type == "Fractional Factorial (ΒΌ)": |
| return generate_fractional_factorial(factor_df, 0.25) |
| elif design_type == "Latin Hypercube (LHS)": |
| return generate_lhs_design(factor_df, n_lhs_runs) |
| elif design_type == "Sobol Sequence": |
| return generate_sobol_design(factor_df, n_lhs_runs) |
| elif design_type == "Central Composite (CCD)": |
| return generate_central_composite(factor_df) |
| elif design_type == "Box-Behnken": |
| return generate_box_behnken(factor_df) |
| else: |
| return generate_full_factorial(factor_df) |
|
|
|
|
| def add_experiment_row(exp_df): |
| if exp_df is None or len(exp_df) == 0: |
| return exp_df |
| new_row = pd.DataFrame({col: [np.nan] for col in exp_df.columns}) |
| new_row["Run"] = len(exp_df) + 1 |
| return pd.concat([exp_df, new_row], ignore_index=True) |
|
|
|
|
| def add_column_to_experiments(exp_df, col_name): |
| if exp_df is None or not col_name.strip(): |
| return exp_df |
| exp_df[col_name.strip()] = np.nan |
| return exp_df |
|
|
|
|
| def compute_doe_stats(exp_df): |
| if exp_df is None or len(exp_df) == 0: |
| return "No experiments generated yet." |
| n_runs = len(exp_df) |
| n_params = len([c for c in exp_df.columns if c not in ["Run", "Response"]]) |
| n_filled = exp_df["Response"].notna().sum() if "Response" in exp_df.columns else 0 |
| return f"""### π Design Summary |
| | Metric | Value | |
| |--------|-------| |
| | Total Runs | {n_runs} | |
| | Parameters | {n_params} | |
| | Responses Filled | {n_filled} / {n_runs} | |
| | Completion | {n_filled/n_runs*100:.0f}% | |
| """ |
|
|
|
|
| def run_bo_pipeline(exp_df, model_type, kernel_name, acq_name, n_suggestions, |
| xi_val, kappa_val, maximize, n_estimators, alpha_val, n_restarts): |
| """Run the full BO pipeline.""" |
| |
| empty_fig = plt.figure(figsize=(6, 4)) |
| empty_plotly = go.Figure() |
| n_outputs = 16 |
| |
| def make_error(msg): |
| return (msg, pd.DataFrame(), msg, |
| empty_fig, empty_fig, empty_fig, empty_fig, empty_fig, |
| empty_fig, empty_fig, empty_plotly, empty_plotly, empty_plotly, empty_plotly, |
| empty_plotly, None) |
| |
| if exp_df is None or len(exp_df) == 0: |
| return make_error("β οΈ No experiment data. Generate a design first and fill in Response values.") |
| |
| param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]] |
| if "Response" not in exp_df.columns: |
| return make_error("β οΈ No 'Response' column found in experiment table.") |
| |
| valid_mask = exp_df["Response"].notna() |
| if valid_mask.sum() < 2: |
| return make_error("β οΈ Need at least 2 completed experiments (with Response values) to fit a model.") |
| |
| X_obs = exp_df.loc[valid_mask, param_cols].values.astype(float) |
| y_obs = exp_df.loc[valid_mask, "Response"].values.astype(float) |
| param_names = param_cols |
| n_dims = len(param_names) |
| |
| bounds = [] |
| for col in param_cols: |
| all_vals = exp_df[col].dropna().astype(float) |
| lo, hi = all_vals.min(), all_vals.max() |
| if lo == hi: |
| lo -= 1; hi += 1 |
| bounds.append((lo, hi)) |
| |
| try: |
| model = build_surrogate(model_type, kernel_name, n_dims, int(n_estimators), float(alpha_val), int(n_restarts)) |
| model.fit(X_obs, y_obs) |
| except Exception as e: |
| return make_error(f"β οΈ Model fitting failed: {str(e)}") |
| |
| mu_train, sigma_train = predict_with_uncertainty(model, X_obs, model_type) |
| ss_res = np.sum((y_obs - mu_train) ** 2) |
| ss_tot = np.sum((y_obs - y_obs.mean()) ** 2) |
| r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 |
| rmse = np.sqrt(np.mean((y_obs - mu_train) ** 2)) |
| |
| y_best = y_obs.max() if maximize else y_obs.min() |
| best_idx = np.argmax(y_obs) if maximize else np.argmin(y_obs) |
| |
| fit_report = f"""### π€ Model Fit Report |
| | Metric | Value | |
| |--------|-------| |
| | Model | {model_type} ({kernel_name if model_type == 'Gaussian Process' else ''}) | |
| | Training Points | {len(X_obs)} | |
| | RΒ² Score | {r2:.4f} | |
| | RMSE | {rmse:.4f} | |
| | Best Observed | {y_best:.4f} (Run #{int(exp_df.loc[valid_mask].iloc[best_idx]['Run']) if 'Run' in exp_df.columns else best_idx+1}) | |
| | Objective | {'Maximize β' if maximize else 'Minimize β'} | |
| """ |
| if model_type == "Gaussian Process": |
| try: |
| fit_report += f"| Kernel (fitted) | `{model.kernel_}` |\n" |
| fit_report += f"| Log Marginal Likelihood | {model.log_marginal_likelihood_value_:.2f} |\n" |
| except: |
| pass |
| |
| try: |
| X_sug, acq_sug, mu_sug, sigma_sug = suggest_next_experiments( |
| model, model_type, acq_name, bounds, int(n_suggestions), |
| y_best, float(xi_val), float(kappa_val), maximize |
| ) |
| sug_df = pd.DataFrame(X_sug, columns=param_names) |
| sug_df.insert(0, "Suggestion #", range(1, len(sug_df) + 1)) |
| sug_df["Predicted Mean"] = mu_sug.round(4) |
| sug_df["Predicted Ο"] = sigma_sug.round(4) |
| sug_df["Acquisition Value"] = acq_sug.round(6) |
| for col in param_names: |
| sug_df[col] = sug_df[col].round(4) |
| except Exception as e: |
| sug_df = pd.DataFrame({"Error": [str(e)]}) |
| |
| |
| def safe_plot(fn, *args, plotly=False, **kwargs): |
| try: |
| return fn(*args, **kwargs) |
| except Exception: |
| return go.Figure() if plotly else plt.figure() |
| |
| fig_surface = safe_plot(plot_gp_surface_2d, model, model_type, param_names, bounds, X_obs, y_obs) |
| fig_1d = safe_plot(plot_gp_1d_slices, model, model_type, param_names, bounds, X_obs, y_obs) |
| fig_acq = safe_plot(plot_acquisition_landscape, model, model_type, acq_name, param_names, bounds, |
| X_obs, y_obs, y_best, float(xi_val), float(kappa_val), maximize) |
| fig_conv = safe_plot(plot_convergence, y_obs, maximize) |
| fig_pva = safe_plot(plot_predicted_vs_actual, model, model_type, X_obs, y_obs) |
| fig_imp = safe_plot(plot_feature_importance, model, model_type, param_names) |
| fig_resid = safe_plot(plot_residuals, model, model_type, X_obs, y_obs) |
| fig_3d = safe_plot(plot_3d_surface, model, model_type, param_names, bounds, X_obs, y_obs, plotly=True) |
| fig_parallel = safe_plot(plot_parallel_coordinates, X_obs, y_obs, param_names, plotly=True) |
| fig_unc = safe_plot(plot_uncertainty_heatmap, model, model_type, param_names, bounds, X_obs, plotly=True) |
| fig_corr = safe_plot(plot_correlation_matrix, X_obs, y_obs, param_names, plotly=True) |
| fig_scatter = safe_plot(plot_scatter_matrix, X_obs, y_obs, param_names, plotly=True) |
| |
| model_state_val = { |
| "model": model, "model_type": model_type, "param_names": param_names, |
| "bounds": bounds, "X_obs": X_obs, "y_obs": y_obs, "y_best": y_best, |
| } |
| |
| return (fit_report, sug_df, compute_doe_stats(exp_df), |
| fig_surface, fig_1d, fig_acq, fig_conv, fig_pva, |
| fig_imp, fig_resid, fig_3d, fig_parallel, fig_unc, fig_corr, |
| fig_scatter, model_state_val) |
|
|
|
|
| def merge_suggestions(exp_df, sug_df): |
| if sug_df is None or len(sug_df) == 0 or exp_df is None: |
| return exp_df |
| if "Error" in sug_df.columns: |
| return exp_df |
| |
| param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]] |
| sug_params = [c for c in sug_df.columns if c in param_cols] |
| if not sug_params: |
| return exp_df |
| |
| new_rows = [] |
| start_run = int(exp_df["Run"].max()) + 1 if "Run" in exp_df.columns else len(exp_df) + 1 |
| for i, (_, row) in enumerate(sug_df.iterrows()): |
| new_row = {"Run": start_run + i} |
| for col in param_cols: |
| new_row[col] = row[col] if col in sug_df.columns else np.nan |
| new_row["Response"] = np.nan |
| for col in exp_df.columns: |
| if col not in new_row: |
| new_row[col] = np.nan |
| new_rows.append(new_row) |
| |
| return pd.concat([exp_df, pd.DataFrame(new_rows)], ignore_index=True) |
|
|
|
|
| |
| |
| |
|
|
| with gr.Blocks(title="DoE & Bayesian Optimization Dashboard", css=CUSTOM_CSS) as demo: |
| |
| model_state = gr.State(value=None) |
| |
| gr.HTML(""" |
| <div class="main-header"> |
| <h1>π¬ Design of Experiments & Bayesian Optimization</h1> |
| <p>Plan experiments, fit surrogate models, and get AI-guided suggestions for your next experiments</p> |
| </div> |
| """) |
| |
| with gr.Tabs(): |
| |
| |
| with gr.Tab("π Design of Experiments", id="doe_tab"): |
| gr.HTML('<div class="info-box">π‘ <b>Define your factors and levels below.</b> Levels can be comma-separated (e.g. <code>10, 20, 30</code>) or range notation (<code>10:30:3</code> = 3 points from 10 to 30).</div>') |
| |
| with gr.Row(): |
| with gr.Column(scale=2): |
| gr.Markdown("### βοΈ Factor Definition") |
| factor_table = gr.Dataframe( |
| value=create_default_factor_table(), |
| headers=["Factor Name", "Levels (comma-sep or lo:hi:n)"], |
| interactive=True, label="Define Factors & Levels", |
| ) |
| with gr.Row(): |
| add_factor_btn = gr.Button("β Add Factor", size="sm") |
| remove_factor_btn = gr.Button("β Remove Last", size="sm") |
| |
| with gr.Column(scale=1): |
| gr.Markdown("### π― Design Type") |
| design_type = gr.Dropdown( |
| choices=["Full Factorial", "Fractional Factorial (Β½)", "Fractional Factorial (ΒΌ)", |
| "Latin Hypercube (LHS)", "Sobol Sequence", "Central Composite (CCD)", "Box-Behnken"], |
| value="Full Factorial", label="Design Method", |
| ) |
| n_space_filling = gr.Slider(minimum=4, maximum=200, value=20, step=1, |
| label="Number of Runs (for LHS/Sobol)", visible=False) |
| generate_btn = gr.Button("π Generate Experiment Design", variant="primary", size="lg") |
| doe_stats = gr.Markdown("") |
| |
| gr.Markdown("---") |
| gr.Markdown("### π Experiment Table") |
| gr.HTML('<div class="info-box">π Edit cells directly. Fill in the <b>Response</b> column with your experimental results. You can add rows or columns as needed.</div>') |
| |
| experiment_table = gr.Dataframe(value=pd.DataFrame(), interactive=True, |
| label="Experiments (edit Response values here)") |
| |
| with gr.Row(): |
| add_row_btn = gr.Button("β Add Row", size="sm") |
| new_col_name = gr.Textbox(label="New Column Name", placeholder="e.g. Yield", scale=2) |
| add_col_btn = gr.Button("β Add Column", size="sm") |
| |
| with gr.Row(): |
| download_btn = gr.Button("π₯ Export as CSV", size="sm") |
| csv_output = gr.File(label="Download", visible=False) |
| |
| def show_n_runs(dt): |
| return gr.Slider(visible=dt in ["Latin Hypercube (LHS)", "Sobol Sequence"]) |
| |
| design_type.change(show_n_runs, inputs=design_type, outputs=n_space_filling) |
| add_factor_btn.click(add_factor_row, inputs=factor_table, outputs=factor_table) |
| remove_factor_btn.click(remove_last_factor, inputs=factor_table, outputs=factor_table) |
| |
| def gen_and_stats(ft, dt, n): |
| df = generate_design(ft, dt, n) |
| return df, compute_doe_stats(df) |
| |
| generate_btn.click(gen_and_stats, inputs=[factor_table, design_type, n_space_filling], |
| outputs=[experiment_table, doe_stats]) |
| add_row_btn.click(add_experiment_row, inputs=experiment_table, outputs=experiment_table) |
| add_col_btn.click(add_column_to_experiments, inputs=[experiment_table, new_col_name], outputs=experiment_table) |
| |
| def export_csv(df): |
| if df is None or len(df) == 0: |
| return gr.File(visible=False) |
| path = "/tmp/experiment_design.csv" |
| df.to_csv(path, index=False) |
| return gr.File(value=path, visible=True) |
| |
| download_btn.click(export_csv, inputs=experiment_table, outputs=csv_output) |
| |
| |
| with gr.Tab("π€ Bayesian Optimization", id="bo_tab"): |
| gr.HTML('<div class="info-box">π§ <b>Configure your surrogate model and acquisition function</b>, then click "Run" to fit the model and get suggestions for your next experiments.</div>') |
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.Markdown("### π§ Surrogate Model") |
| model_type = gr.Dropdown( |
| choices=["Gaussian Process", "Random Forest", "Extra Trees", "Gradient Boosting"], |
| value="Gaussian Process", label="Model Type", |
| ) |
| |
| with gr.Group() as gp_options: |
| kernel_type = gr.Dropdown( |
| choices=["MatΓ©rn 5/2", "MatΓ©rn 3/2", "RBF", "Rational Quadratic", "RBF + MatΓ©rn"], |
| value="MatΓ©rn 5/2", label="GP Kernel", |
| ) |
| alpha_val = gr.Number(value=1e-6, label="Alpha (noise regularization)", precision=8) |
| n_restarts = gr.Slider(1, 20, value=5, step=1, label="Optimizer Restarts") |
| |
| with gr.Group(visible=False) as tree_options: |
| n_estimators = gr.Slider(10, 500, value=100, step=10, label="Number of Trees") |
| |
| gr.Markdown("---") |
| gr.Markdown("### π― Acquisition Function") |
| acq_function = gr.Dropdown( |
| choices=["Expected Improvement (EI)", "Probability of Improvement (PI)", |
| "Upper Confidence Bound (UCB)", "Thompson Sampling"], |
| value="Expected Improvement (EI)", label="Acquisition Function", |
| ) |
| |
| with gr.Group() as ei_pi_options: |
| xi_param = gr.Slider(0.0, 1.0, value=0.01, step=0.001, label="ΞΎ (exploration-exploitation)") |
| with gr.Group(visible=False) as ucb_options: |
| kappa_param = gr.Slider(0.1, 10.0, value=2.0, step=0.1, label="ΞΊ (exploration weight)") |
| |
| gr.Markdown("---") |
| gr.Markdown("### π Optimization Settings") |
| maximize_toggle = gr.Checkbox(value=True, label="Maximize (uncheck to minimize)") |
| n_suggestions = gr.Slider(1, 20, value=5, step=1, label="Number of Suggestions") |
| |
| run_bo_btn = gr.Button("π Run Optimization", variant="primary", size="lg") |
| |
| with gr.Column(scale=2): |
| gr.Markdown("### π Model Fit Report") |
| fit_report_md = gr.Markdown("*Run the optimizer to see results...*") |
| |
| gr.Markdown("### π‘ Suggested Next Experiments") |
| suggestion_table = gr.Dataframe(label="Next Experiments to Run", interactive=False) |
| |
| merge_btn = gr.Button("π Add Suggestions to Experiment Table", variant="secondary") |
| merge_status = gr.Markdown("") |
| doe_stats_bo = gr.Markdown("") |
| |
| def toggle_model_options(mt): |
| return gr.Group(visible=mt == "Gaussian Process"), gr.Group(visible=mt != "Gaussian Process") |
| |
| def toggle_acq_options(acq): |
| is_ucb = acq == "Upper Confidence Bound (UCB)" |
| is_ts = acq == "Thompson Sampling" |
| return gr.Group(visible=not is_ucb and not is_ts), gr.Group(visible=is_ucb) |
| |
| model_type.change(toggle_model_options, inputs=model_type, outputs=[gp_options, tree_options]) |
| acq_function.change(toggle_acq_options, inputs=acq_function, outputs=[ei_pi_options, ucb_options]) |
| |
| |
| with gr.Tab("π Visualizations", id="viz_tab"): |
| gr.HTML('<div class="info-box">π <b>Toggle visualizations on/off.</b> All plots update when you run the optimizer. Use the checkboxes to show/hide each visualization.</div>') |
| |
| gr.Markdown("### ποΈ Visualization Controls") |
| with gr.Row(): |
| show_surface = gr.Checkbox(value=True, label="πΊοΈ Surrogate Surface (2D)") |
| show_1d = gr.Checkbox(value=True, label="π 1D Parameter Slices") |
| show_3d = gr.Checkbox(value=True, label="ποΈ 3D Response Surface") |
| show_acq = gr.Checkbox(value=True, label="π― Acquisition Landscape") |
| with gr.Row(): |
| show_convergence = gr.Checkbox(value=True, label="π Convergence Plot") |
| show_pva = gr.Checkbox(value=True, label="βοΈ Predicted vs Actual") |
| show_importance = gr.Checkbox(value=True, label="π Feature Importance") |
| show_residuals = gr.Checkbox(value=True, label="π Residual Analysis") |
| with gr.Row(): |
| show_parallel = gr.Checkbox(value=True, label="π Parallel Coordinates") |
| show_uncertainty = gr.Checkbox(value=True, label="π‘οΈ Uncertainty Heatmap") |
| show_correlation = gr.Checkbox(value=True, label="π Correlation Matrix") |
| show_scatter_mat = gr.Checkbox(value=False, label="π Scatter Matrix") |
| |
| gr.Markdown("---") |
| |
| with gr.Column(visible=True) as surface_col: |
| gr.Markdown("### πΊοΈ Surrogate Model Surface (2D Contour)") |
| surface_plot = gr.Plot(label="Mean & Uncertainty Surface") |
| |
| with gr.Column(visible=True) as slices_col: |
| gr.Markdown("### π 1D Parameter Effect Slices") |
| slices_plot = gr.Plot(label="1D GP Slices") |
| |
| with gr.Column(visible=True) as three_d_col: |
| gr.Markdown("### ποΈ 3D Response Surface (Interactive)") |
| three_d_plot = gr.Plot(label="3D Surface") |
| |
| with gr.Column(visible=True) as acq_col: |
| gr.Markdown("### π― Acquisition Function Landscape") |
| acq_plot = gr.Plot(label="Acquisition Function") |
| |
| with gr.Column(visible=True) as conv_col: |
| gr.Markdown("### π Optimization Convergence") |
| convergence_plot = gr.Plot(label="Convergence") |
| |
| with gr.Column(visible=True) as pva_col: |
| gr.Markdown("### βοΈ Predicted vs Actual (Parity Plot)") |
| pva_plot = gr.Plot(label="Predicted vs Actual") |
| |
| with gr.Column(visible=True) as imp_col: |
| gr.Markdown("### π Feature Importance") |
| importance_plot = gr.Plot(label="Feature Importance") |
| |
| with gr.Column(visible=True) as resid_col: |
| gr.Markdown("### π Residual Analysis") |
| residuals_plot = gr.Plot(label="Residuals") |
| |
| with gr.Column(visible=True) as parallel_col: |
| gr.Markdown("### π Parallel Coordinates") |
| parallel_plot = gr.Plot(label="Parallel Coordinates") |
| |
| with gr.Column(visible=True) as unc_col: |
| gr.Markdown("### π‘οΈ Prediction Uncertainty Map") |
| unc_plot = gr.Plot(label="Uncertainty Heatmap") |
| |
| with gr.Column(visible=True) as corr_col: |
| gr.Markdown("### π Correlation Matrix") |
| corr_plot = gr.Plot(label="Correlation Matrix") |
| |
| with gr.Column(visible=False) as scatter_mat_col: |
| gr.Markdown("### π Scatter Matrix (Pair Plot)") |
| scatter_mat_plot = gr.Plot(label="Scatter Matrix") |
| |
| show_surface.change(lambda v: gr.Column(visible=v), inputs=show_surface, outputs=surface_col) |
| show_1d.change(lambda v: gr.Column(visible=v), inputs=show_1d, outputs=slices_col) |
| show_3d.change(lambda v: gr.Column(visible=v), inputs=show_3d, outputs=three_d_col) |
| show_acq.change(lambda v: gr.Column(visible=v), inputs=show_acq, outputs=acq_col) |
| show_convergence.change(lambda v: gr.Column(visible=v), inputs=show_convergence, outputs=conv_col) |
| show_pva.change(lambda v: gr.Column(visible=v), inputs=show_pva, outputs=pva_col) |
| show_importance.change(lambda v: gr.Column(visible=v), inputs=show_importance, outputs=imp_col) |
| show_residuals.change(lambda v: gr.Column(visible=v), inputs=show_residuals, outputs=resid_col) |
| show_parallel.change(lambda v: gr.Column(visible=v), inputs=show_parallel, outputs=parallel_col) |
| show_uncertainty.change(lambda v: gr.Column(visible=v), inputs=show_uncertainty, outputs=unc_col) |
| show_correlation.change(lambda v: gr.Column(visible=v), inputs=show_correlation, outputs=corr_col) |
| show_scatter_mat.change(lambda v: gr.Column(visible=v), inputs=show_scatter_mat, outputs=scatter_mat_col) |
| |
| |
| with gr.Tab("π Guide", id="guide_tab"): |
| gr.Markdown(""" |
| ## π User Guide |
| |
| ### π Workflow |
| |
| 1. **Define Factors** β In the DoE tab, set up your experimental parameters and their levels |
| 2. **Generate Design** β Choose a design type and generate the experiment matrix |
| 3. **Run Experiments** β Perform your physical/simulated experiments and fill in the Response column |
| 4. **Fit Surrogate** β Go to the BO tab, configure your model, and click Run |
| 5. **Get Suggestions** β The optimizer will suggest the most promising next experiments |
| 6. **Iterate** β Add suggestions to your table, run those experiments, and repeat |
| |
| ### π Design Types |
| |
| | Design | Best For | Notes | |
| |--------|----------|-------| |
| | **Full Factorial** | Few factors, few levels | Tests every combination. Grows exponentially! | |
| | **Fractional Factorial** | Many factors, screening | Random subset of full factorial | |
| | **Latin Hypercube (LHS)** | Space-filling, BO initialization | Even coverage of parameter space | |
| | **Sobol Sequence** | High-dimensional, quasi-random | Low-discrepancy, n should be power of 2 | |
| | **Central Composite (CCD)** | Response surface methodology | Factorial + axial + center points | |
| | **Box-Behnken** | 3+ factors, fewer runs than CCD | No extreme corner points | |
| |
| ### π€ Surrogate Models |
| |
| | Model | Strengths | Weaknesses | |
| |-------|-----------|------------| |
| | **Gaussian Process** | Principled uncertainty, smooth | Scales O(nΒ³), max ~500 points | |
| | **Random Forest** | Handles categoricals, robust | Step-wise predictions, no gradients | |
| | **Extra Trees** | Faster than RF, good uncertainty | Similar to RF | |
| | **Gradient Boosting** | Strong predictive power | Weaker uncertainty estimates | |
| |
| ### π― Acquisition Functions |
| |
| | Function | Behavior | Parameters | |
| |----------|----------|------------| |
| | **Expected Improvement (EI)** | Balanced exploration/exploitation | ΞΎ: higher = more exploration | |
| | **Probability of Improvement (PI)** | Greedy, exploits known good regions | ΞΎ: trade-off parameter | |
| | **Upper Confidence Bound (UCB)** | Direct exploration control | ΞΊ: higher = more exploration | |
| | **Thompson Sampling** | Stochastic, good for batch | None (randomized) | |
| |
| ### π GP Kernels |
| |
| | Kernel | Best For | |
| |--------|----------| |
| | **MatΓ©rn 5/2** | Most physical/chemical processes (default) | |
| | **MatΓ©rn 3/2** | Rougher functions | |
| | **RBF** | Very smooth functions | |
| | **Rational Quadratic** | Multi-scale patterns | |
| | **RBF + MatΓ©rn** | Complex functions with mixed smoothness | |
| |
| ### π Visualization Guide |
| |
| - **Surrogate Surface**: 2D contour of model predictions + uncertainty |
| - **1D Slices**: Effect of each parameter while others fixed at center |
| - **3D Surface**: Interactive 3D view of response surface (Plotly) |
| - **Acquisition Landscape**: Where the optimizer "wants" to sample next |
| - **Convergence**: Best-so-far trajectory over experiments |
| - **Predicted vs Actual**: Model accuracy check (parity plot) |
| - **Feature Importance**: Which parameters matter most |
| - **Residuals**: Diagnostic plots for model assumptions |
| - **Parallel Coordinates**: Multi-dimensional experiment overview |
| - **Uncertainty Heatmap**: Where the model is least certain |
| - **Correlation Matrix**: Parameter-response correlations |
| - **Scatter Matrix**: All pairwise relationships |
| """) |
| |
| |
| run_bo_btn.click( |
| fn=run_bo_pipeline, |
| inputs=[experiment_table, model_type, kernel_type, acq_function, |
| n_suggestions, xi_param, kappa_param, maximize_toggle, |
| n_estimators, alpha_val, n_restarts], |
| outputs=[fit_report_md, suggestion_table, doe_stats_bo, |
| surface_plot, slices_plot, acq_plot, convergence_plot, pva_plot, |
| importance_plot, residuals_plot, three_d_plot, parallel_plot, |
| unc_plot, corr_plot, scatter_mat_plot, model_state], |
| ) |
| |
| def do_merge(exp_df, sug_df): |
| merged = merge_suggestions(exp_df, sug_df) |
| return merged, "β
Suggestions added to experiment table! Switch to DoE tab to see them." |
| |
| merge_btn.click(do_merge, inputs=[experiment_table, suggestion_table], |
| outputs=[experiment_table, merge_status]) |
|
|
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|