Yerrapragada-Kaushik's picture
Add comprehensive DoE & Bayesian Optimization dashboard
9c39e28 verified
"""
DoE & Bayesian Optimization Dashboard
A comprehensive tool for experimentalists to design experiments,
fit surrogate models, and get AI-guided suggestions for next experiments.
"""
import gradio as gr
import numpy as np
import pandas as pd
import itertools
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.stats import norm
from scipy.stats.qmc import LatinHypercube, Sobol, scale as qmc_scale
from scipy.optimize import minimize as sp_minimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (
Matern, RBF, ConstantKernel as C, WhiteKernel, RationalQuadratic, DotProduct
)
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings("ignore")
# ═══════════════════════════════════════════════════════════════════
# HELPER: Design of Experiments generators
# ═══════════════════════════════════════════════════════════════════
def parse_levels(level_str):
"""Parse a level string like '10, 20, 30' or '10:30:3' into a list of floats."""
level_str = str(level_str).strip()
if not level_str:
return []
if ":" in level_str:
parts = level_str.split(":")
if len(parts) == 3:
lo, hi, n = float(parts[0]), float(parts[1]), int(parts[2])
return list(np.linspace(lo, hi, n))
elif len(parts) == 2:
lo, hi = float(parts[0]), float(parts[1])
return [lo, hi]
else:
return [float(x.strip()) for x in level_str.split(",") if x.strip()]
def generate_full_factorial(factor_df):
"""Generate full factorial design from a factor definition table."""
if factor_df is None or len(factor_df) == 0:
return pd.DataFrame({"Error": ["No factors defined"]})
factors = {}
for _, row in factor_df.iterrows():
name = str(row.iloc[0]).strip()
levels = parse_levels(row.iloc[1])
if name and levels:
factors[name] = levels
if not factors:
return pd.DataFrame({"Error": ["No valid factors found. Use format: '10,20,30' or '10:30:3'"]})
names = list(factors.keys())
level_lists = list(factors.values())
combos = list(itertools.product(*level_lists))
df = pd.DataFrame(combos, columns=names)
df.insert(0, "Run", range(1, len(df) + 1))
df["Response"] = np.nan
return df
def generate_fractional_factorial(factor_df, fraction=0.5):
"""Generate fractional factorial (random subset)."""
full = generate_full_factorial(factor_df)
if "Error" in full.columns:
return full
n_keep = max(2, int(len(full) * fraction))
sampled = full.sample(n=n_keep, random_state=42).reset_index(drop=True)
sampled["Run"] = range(1, len(sampled) + 1)
return sampled
def generate_lhs_design(factor_df, n_runs=20):
"""Generate Latin Hypercube Sampling design."""
if factor_df is None or len(factor_df) == 0:
return pd.DataFrame({"Error": ["No factors defined"]})
bounds = []
names = []
for _, row in factor_df.iterrows():
name = str(row.iloc[0]).strip()
levels = parse_levels(row.iloc[1])
if name and len(levels) >= 2:
names.append(name)
bounds.append((min(levels), max(levels)))
if not names:
return pd.DataFrame({"Error": ["Need at least 2 levels per factor for LHS"]})
d = len(names)
n_runs = int(n_runs)
sampler = LatinHypercube(d=d, seed=42)
X = sampler.random(n=n_runs)
lb = np.array([b[0] for b in bounds])
ub = np.array([b[1] for b in bounds])
X_scaled = qmc_scale(X, lb, ub)
df = pd.DataFrame(X_scaled, columns=names)
df.insert(0, "Run", range(1, n_runs + 1))
df["Response"] = np.nan
for col in names:
df[col] = df[col].round(4)
return df
def generate_sobol_design(factor_df, n_runs=16):
"""Generate Sobol sequence design."""
if factor_df is None or len(factor_df) == 0:
return pd.DataFrame({"Error": ["No factors defined"]})
bounds = []
names = []
for _, row in factor_df.iterrows():
name = str(row.iloc[0]).strip()
levels = parse_levels(row.iloc[1])
if name and len(levels) >= 2:
names.append(name)
bounds.append((min(levels), max(levels)))
if not names:
return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]})
d = len(names)
n_runs = int(n_runs)
n_actual = 2 ** int(np.ceil(np.log2(max(n_runs, 2))))
sampler = Sobol(d=d, scramble=True, seed=42)
X = sampler.random(n=n_actual)
lb = np.array([b[0] for b in bounds])
ub = np.array([b[1] for b in bounds])
X_scaled = qmc_scale(X, lb, ub)
df = pd.DataFrame(X_scaled[:n_runs], columns=names)
df.insert(0, "Run", range(1, n_runs + 1))
df["Response"] = np.nan
for col in names:
df[col] = df[col].round(4)
return df
def generate_central_composite(factor_df):
"""Generate Central Composite Design (CCD)."""
if factor_df is None or len(factor_df) == 0:
return pd.DataFrame({"Error": ["No factors defined"]})
bounds = []
names = []
for _, row in factor_df.iterrows():
name = str(row.iloc[0]).strip()
levels = parse_levels(row.iloc[1])
if name and len(levels) >= 2:
names.append(name)
bounds.append((min(levels), max(levels)))
if not names:
return pd.DataFrame({"Error": ["Need at least 2 levels per factor"]})
k = len(names)
lb = np.array([b[0] for b in bounds])
ub = np.array([b[1] for b in bounds])
center = (lb + ub) / 2
half_range = (ub - lb) / 2
factorial_coded = np.array(list(itertools.product([-1, 1], repeat=k)))
alpha = np.sqrt(k)
axial_coded = np.zeros((2 * k, k))
for i in range(k):
axial_coded[2 * i, i] = -alpha
axial_coded[2 * i + 1, i] = alpha
center_coded = np.zeros((3, k))
all_coded = np.vstack([factorial_coded, axial_coded, center_coded])
all_actual = center + all_coded * half_range
all_actual = np.clip(all_actual, lb, ub)
df = pd.DataFrame(all_actual, columns=names)
df.insert(0, "Run", range(1, len(df) + 1))
df["Response"] = np.nan
for col in names:
df[col] = df[col].round(4)
return df
def generate_box_behnken(factor_df):
"""Generate Box-Behnken design (for 3+ factors)."""
if factor_df is None or len(factor_df) == 0:
return pd.DataFrame({"Error": ["No factors defined"]})
bounds = []
names = []
for _, row in factor_df.iterrows():
name = str(row.iloc[0]).strip()
levels = parse_levels(row.iloc[1])
if name and len(levels) >= 2:
names.append(name)
bounds.append((min(levels), max(levels)))
if len(names) < 3:
return pd.DataFrame({"Error": ["Box-Behnken requires at least 3 factors"]})
k = len(names)
lb = np.array([b[0] for b in bounds])
ub = np.array([b[1] for b in bounds])
center = (lb + ub) / 2
half_range = (ub - lb) / 2
runs_coded = []
for i in range(k):
for j in range(i + 1, k):
for vi in [-1, 1]:
for vj in [-1, 1]:
row = [0] * k
row[i] = vi
row[j] = vj
runs_coded.append(row)
for _ in range(3):
runs_coded.append([0] * k)
all_coded = np.array(runs_coded)
all_actual = center + all_coded * half_range
df = pd.DataFrame(all_actual, columns=names)
df.insert(0, "Run", range(1, len(df) + 1))
df["Response"] = np.nan
for col in names:
df[col] = df[col].round(4)
return df
# ═══════════════════════════════════════════════════════════════════
# HELPER: Surrogate Models & Acquisition Functions
# ═══════════════════════════════════════════════════════════════════
KERNEL_MAP = {
"MatΓ©rn 5/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3),
"MatΓ©rn 3/2": lambda d: C(1.0, (1e-3, 1e3)) * Matern(length_scale=np.ones(d), nu=1.5) + WhiteKernel(noise_level=1e-3),
"RBF": lambda d: C(1.0, (1e-3, 1e3)) * RBF(length_scale=np.ones(d)) + WhiteKernel(noise_level=1e-3),
"Rational Quadratic": lambda d: C(1.0, (1e-3, 1e3)) * RationalQuadratic() + WhiteKernel(noise_level=1e-3),
"RBF + MatΓ©rn": lambda d: C(1.0) * RBF(length_scale=np.ones(d)) + C(0.5) * Matern(length_scale=np.ones(d), nu=2.5) + WhiteKernel(noise_level=1e-3),
}
def build_surrogate(model_type, kernel_name, n_dims, n_estimators=100, alpha=1e-6, n_restarts=5):
"""Build a surrogate model based on type selection."""
if model_type == "Gaussian Process":
kernel_fn = KERNEL_MAP.get(kernel_name, KERNEL_MAP["MatΓ©rn 5/2"])
kernel = kernel_fn(n_dims)
model = GaussianProcessRegressor(
kernel=kernel,
alpha=alpha,
n_restarts_optimizer=n_restarts,
normalize_y=True
)
return model
elif model_type == "Random Forest":
return RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42)
elif model_type == "Extra Trees":
return ExtraTreesRegressor(n_estimators=n_estimators, min_samples_leaf=2, random_state=42)
elif model_type == "Gradient Boosting":
return GradientBoostingRegressor(n_estimators=n_estimators, max_depth=4, random_state=42)
else:
return GaussianProcessRegressor(normalize_y=True)
def predict_with_uncertainty(model, X, model_type="Gaussian Process"):
"""Predict mean and std for any surrogate model type."""
X = np.atleast_2d(X)
if model_type == "Gaussian Process":
mu, sigma = model.predict(X, return_std=True)
return mu, sigma
elif model_type in ["Random Forest", "Extra Trees"]:
preds = np.array([tree.predict(X) for tree in model.estimators_])
mu = preds.mean(axis=0)
sigma = preds.std(axis=0)
return mu, sigma
elif model_type == "Gradient Boosting":
mu = model.predict(X)
staged = np.array(list(model.staged_predict(X)))
sigma = staged.std(axis=0)
return mu, sigma
else:
mu = model.predict(X)
return mu, np.zeros_like(mu)
def expected_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True):
mu, sigma = predict_with_uncertainty(model, X, model_type)
sigma = np.maximum(sigma, 1e-9)
if maximize:
Z = (mu - y_best - xi) / sigma
else:
Z = (y_best - mu - xi) / sigma
ei = sigma * (Z * norm.cdf(Z) + norm.pdf(Z))
ei[sigma < 1e-10] = 0.0
return ei
def probability_of_improvement(X, model, y_best, xi=0.01, model_type="Gaussian Process", maximize=True):
mu, sigma = predict_with_uncertainty(model, X, model_type)
sigma = np.maximum(sigma, 1e-9)
if maximize:
Z = (mu - y_best - xi) / sigma
else:
Z = (y_best - mu - xi) / sigma
return norm.cdf(Z)
def upper_confidence_bound(X, model, kappa=2.0, model_type="Gaussian Process", maximize=True):
mu, sigma = predict_with_uncertainty(model, X, model_type)
if maximize:
return mu + kappa * sigma
else:
return -(mu - kappa * sigma)
def thompson_sampling(X, model, model_type="Gaussian Process", maximize=True):
mu, sigma = predict_with_uncertainty(model, X, model_type)
samples = np.random.normal(mu, np.maximum(sigma, 1e-9))
if not maximize:
samples = -samples
return samples
ACQ_MAP = {
"Expected Improvement (EI)": expected_improvement,
"Probability of Improvement (PI)": probability_of_improvement,
"Upper Confidence Bound (UCB)": upper_confidence_bound,
"Thompson Sampling": thompson_sampling,
}
def suggest_next_experiments(model, model_type, acq_name, bounds, n_suggestions,
y_best, xi=0.01, kappa=2.0, maximize=True, n_candidates=5000):
"""Suggest next experiments using acquisition function optimization."""
d = len(bounds)
lb = np.array([b[0] for b in bounds])
ub = np.array([b[1] for b in bounds])
sampler = LatinHypercube(d=d, seed=np.random.randint(0, 10000))
X_candidates = qmc_scale(sampler.random(n=n_candidates), lb, ub)
acq_fn = ACQ_MAP.get(acq_name, expected_improvement)
if acq_name == "Upper Confidence Bound (UCB)":
acq_vals = acq_fn(X_candidates, model, kappa=kappa, model_type=model_type, maximize=maximize)
elif acq_name == "Thompson Sampling":
acq_vals = acq_fn(X_candidates, model, model_type=model_type, maximize=maximize)
else:
acq_vals = acq_fn(X_candidates, model, y_best, xi=xi, model_type=model_type, maximize=maximize)
top_indices = np.argsort(acq_vals)[::-1]
selected = []
selected_points = []
for idx in top_indices:
if len(selected) >= n_suggestions:
break
pt = X_candidates[idx]
if selected_points:
dists = [np.linalg.norm(pt - sp) for sp in selected_points]
min_dist = min(dists)
range_diag = np.linalg.norm(ub - lb)
if min_dist < 0.02 * range_diag:
continue
selected.append(idx)
selected_points.append(pt)
X_suggested = X_candidates[selected]
acq_at_suggested = acq_vals[selected]
mu_sug, sigma_sug = predict_with_uncertainty(model, X_suggested, model_type)
return X_suggested, acq_at_suggested, mu_sug, sigma_sug
# ═══════════════════════════════════════════════════════════════════
# VISUALIZATION FUNCTIONS
# ═══════════════════════════════════════════════════════════════════
def plot_gp_surface_2d(model, model_type, param_names, bounds, X_obs, y_obs,
resolution=50, dim_x=0, dim_y=1, fixed_values=None):
"""2D contour plot of surrogate model prediction surface."""
if len(param_names) < 2:
fig, ax = plt.subplots(figsize=(8, 5))
ax.text(0.5, 0.5, "Need at least 2 parameters for 2D surface",
ha='center', va='center', fontsize=14, transform=ax.transAxes)
return fig
x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution)
x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution)
X1, X2 = np.meshgrid(x1_range, x2_range)
d = len(param_names)
X_grid = np.zeros((resolution * resolution, d))
X_grid[:, dim_x] = X1.ravel()
X_grid[:, dim_y] = X2.ravel()
for i in range(d):
if i != dim_x and i != dim_y:
if fixed_values and i < len(fixed_values):
X_grid[:, i] = fixed_values[i]
else:
X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2
mu, sigma = predict_with_uncertainty(model, X_grid, model_type)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
c1 = axes[0].contourf(X1, X2, mu.reshape(resolution, resolution), levels=25, cmap='viridis')
axes[0].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='red', s=60, edgecolors='white',
zorder=5, label='Observations')
plt.colorbar(c1, ax=axes[0])
axes[0].set_xlabel(param_names[dim_x], fontsize=11)
axes[0].set_ylabel(param_names[dim_y], fontsize=11)
axes[0].set_title("Surrogate Mean Prediction", fontsize=12, fontweight='bold')
axes[0].legend(fontsize=9)
c2 = axes[1].contourf(X1, X2, sigma.reshape(resolution, resolution), levels=25, cmap='YlOrRd')
axes[1].scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='blue', s=60, edgecolors='white',
zorder=5, label='Observations')
plt.colorbar(c2, ax=axes[1])
axes[1].set_xlabel(param_names[dim_x], fontsize=11)
axes[1].set_ylabel(param_names[dim_y], fontsize=11)
axes[1].set_title("Prediction Uncertainty (Οƒ)", fontsize=12, fontweight='bold')
axes[1].legend(fontsize=9)
plt.tight_layout()
return fig
def plot_gp_1d_slices(model, model_type, param_names, bounds, X_obs, y_obs):
"""1D slice plots through each parameter dimension."""
d = len(param_names)
n_cols = min(3, d)
n_rows = int(np.ceil(d / n_cols))
fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows))
if d == 1:
axes = np.array([axes])
axes = np.atleast_2d(axes)
center = np.array([(b[0] + b[1]) / 2 for b in bounds])
for i in range(d):
row, col = divmod(i, n_cols)
ax = axes[row, col]
x_range = np.linspace(bounds[i][0], bounds[i][1], 200)
X_plot = np.tile(center, (200, 1))
X_plot[:, i] = x_range
mu, sigma = predict_with_uncertainty(model, X_plot, model_type)
ax.fill_between(x_range, mu - 2 * sigma, mu + 2 * sigma, alpha=0.2, color='steelblue', label='Β±2Οƒ')
ax.fill_between(x_range, mu - sigma, mu + sigma, alpha=0.3, color='steelblue', label='Β±1Οƒ')
ax.plot(x_range, mu, 'b-', linewidth=2, label='Mean')
ax.scatter(X_obs[:, i], y_obs, c='red', s=40, zorder=5, label='Observations')
ax.set_xlabel(param_names[i], fontsize=11)
ax.set_ylabel("Response", fontsize=11)
ax.set_title(f"Effect of {param_names[i]}", fontsize=11, fontweight='bold')
ax.legend(fontsize=8)
ax.grid(True, alpha=0.3)
for i in range(d, n_rows * n_cols):
row, col = divmod(i, n_cols)
axes[row, col].set_visible(False)
plt.tight_layout()
return fig
def plot_acquisition_landscape(model, model_type, acq_name, param_names, bounds,
X_obs, y_obs, y_best, xi, kappa, maximize,
dim_x=0, dim_y=1, resolution=60):
"""Plot the acquisition function landscape."""
if len(param_names) < 2:
fig, ax = plt.subplots(figsize=(8, 5))
ax.text(0.5, 0.5, "Need at least 2 parameters", ha='center', va='center', fontsize=14,
transform=ax.transAxes)
return fig
x1_range = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution)
x2_range = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution)
X1, X2 = np.meshgrid(x1_range, x2_range)
d = len(param_names)
X_grid = np.zeros((resolution * resolution, d))
X_grid[:, dim_x] = X1.ravel()
X_grid[:, dim_y] = X2.ravel()
for i in range(d):
if i != dim_x and i != dim_y:
X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2
acq_fn = ACQ_MAP.get(acq_name, expected_improvement)
if acq_name == "Upper Confidence Bound (UCB)":
acq_vals = acq_fn(X_grid, model, kappa=kappa, model_type=model_type, maximize=maximize)
elif acq_name == "Thompson Sampling":
acq_vals = acq_fn(X_grid, model, model_type=model_type, maximize=maximize)
else:
acq_vals = acq_fn(X_grid, model, y_best, xi=xi, model_type=model_type, maximize=maximize)
fig, ax = plt.subplots(figsize=(8, 6))
c = ax.contourf(X1, X2, acq_vals.reshape(resolution, resolution), levels=25, cmap='plasma')
ax.scatter(X_obs[:, dim_x], X_obs[:, dim_y], c='white', s=60, edgecolors='black',
zorder=5, label='Observations')
best_idx = np.argmax(acq_vals)
ax.scatter(X_grid[best_idx, dim_x], X_grid[best_idx, dim_y],
marker='*', c='yellow', s=300, edgecolors='black', zorder=10, label='Best acquisition')
plt.colorbar(c, ax=ax)
ax.set_xlabel(param_names[dim_x], fontsize=11)
ax.set_ylabel(param_names[dim_y], fontsize=11)
ax.set_title(f"Acquisition Function: {acq_name}", fontsize=12, fontweight='bold')
ax.legend(fontsize=9)
plt.tight_layout()
return fig
def plot_convergence(y_obs, maximize=True):
"""Plot optimization convergence."""
if len(y_obs) == 0:
fig, ax = plt.subplots(figsize=(8, 5))
ax.text(0.5, 0.5, "No data yet", ha='center', va='center', fontsize=14, transform=ax.transAxes)
return fig
if maximize:
best_so_far = np.maximum.accumulate(y_obs)
ylabel = "Best Response (↑)"
else:
best_so_far = np.minimum.accumulate(y_obs)
ylabel = "Best Response (↓)"
iters = np.arange(1, len(y_obs) + 1)
fig, ax = plt.subplots(figsize=(9, 5))
ax.plot(iters, best_so_far, 'b-o', linewidth=2, markersize=4, label='Best so far', zorder=4)
ax.scatter(iters, y_obs, c='red', s=30, alpha=0.6, zorder=5, label='Individual observations')
ax.fill_between(iters, best_so_far, y_obs, alpha=0.1, color='blue')
ax.set_xlabel("Experiment Number", fontsize=11)
ax.set_ylabel(ylabel, fontsize=11)
ax.set_title("Optimization Convergence", fontsize=13, fontweight='bold')
ax.legend(fontsize=10)
ax.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def plot_parallel_coordinates(X_obs, y_obs, param_names):
"""Interactive parallel coordinates plot with Plotly."""
if len(X_obs) == 0:
fig = go.Figure()
fig.add_annotation(text="No data yet", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20))
return fig
df = pd.DataFrame(X_obs, columns=param_names)
df['Response'] = y_obs
dimensions = []
for col in param_names:
dimensions.append(dict(range=[df[col].min(), df[col].max()], label=col, values=df[col]))
dimensions.append(dict(range=[df['Response'].min(), df['Response'].max()], label='Response', values=df['Response']))
fig = go.Figure(data=go.Parcoords(
line=dict(color=df['Response'], colorscale='Viridis', showscale=True, colorbar=dict(title='Response')),
dimensions=dimensions
))
fig.update_layout(title="Parallel Coordinates: All Experiments", height=450, margin=dict(l=80, r=80, t=60, b=40))
return fig
def plot_predicted_vs_actual(model, model_type, X_obs, y_obs):
"""Parity plot: predicted vs actual."""
if len(X_obs) == 0:
fig, ax = plt.subplots(figsize=(6, 6))
ax.text(0.5, 0.5, "No data", ha='center', va='center', fontsize=14, transform=ax.transAxes)
return fig
mu, sigma = predict_with_uncertainty(model, X_obs, model_type)
fig, ax = plt.subplots(figsize=(6, 6))
ax.errorbar(y_obs, mu, yerr=2 * sigma, fmt='o', color='steelblue', ecolor='lightblue',
elinewidth=1.5, capsize=3, markersize=6, label='Predictions Β± 2Οƒ')
mn = min(y_obs.min(), mu.min())
mx = max(y_obs.max(), mu.max())
pad = (mx - mn) * 0.1
ax.plot([mn - pad, mx + pad], [mn - pad, mx + pad], 'k--', linewidth=1, label='Perfect fit')
ss_res = np.sum((y_obs - mu) ** 2)
ss_tot = np.sum((y_obs - y_obs.mean()) ** 2)
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
rmse = np.sqrt(np.mean((y_obs - mu) ** 2))
ax.set_xlabel("Actual", fontsize=11)
ax.set_ylabel("Predicted", fontsize=11)
ax.set_title(f"Predicted vs Actual (RΒ²={r2:.4f}, RMSE={rmse:.4f})", fontsize=12, fontweight='bold')
ax.legend(fontsize=9)
ax.grid(True, alpha=0.3)
ax.set_aspect('equal', adjustable='box')
plt.tight_layout()
return fig
def plot_feature_importance(model, model_type, param_names):
"""Feature importance from surrogate model."""
fig, ax = plt.subplots(figsize=(8, max(3, 0.5 * len(param_names))))
if model_type == "Gaussian Process":
try:
kernel = model.kernel_
ls = None
def find_ls(k):
if hasattr(k, 'length_scale') and not isinstance(k, WhiteKernel):
return np.atleast_1d(k.length_scale)
if hasattr(k, 'k1'):
r = find_ls(k.k1)
if r is not None:
return r
if hasattr(k, 'k2'):
r = find_ls(k.k2)
if r is not None:
return r
return None
ls = find_ls(kernel)
if ls is not None and len(ls) == len(param_names):
importance = 1.0 / ls
importance = importance / importance.sum()
else:
importance = np.ones(len(param_names)) / len(param_names)
except Exception:
importance = np.ones(len(param_names)) / len(param_names)
title = "Feature Importance (GP Inverse Lengthscale)"
elif model_type in ["Random Forest", "Extra Trees", "Gradient Boosting"]:
importance = model.feature_importances_
title = f"Feature Importance ({model_type})"
else:
importance = np.ones(len(param_names)) / len(param_names)
title = "Feature Importance"
sorted_idx = np.argsort(importance)
ax.barh(range(len(param_names)), importance[sorted_idx], color='steelblue', edgecolor='navy')
ax.set_yticks(range(len(param_names)))
ax.set_yticklabels(np.array(param_names)[sorted_idx])
ax.set_xlabel("Importance", fontsize=11)
ax.set_title(title, fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
return fig
def plot_residuals(model, model_type, X_obs, y_obs):
"""Residual analysis plots."""
if len(X_obs) == 0:
fig, ax = plt.subplots()
return fig
mu, _ = predict_with_uncertainty(model, X_obs, model_type)
residuals = y_obs - mu
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
axes[0].scatter(mu, residuals, c='steelblue', s=40, edgecolors='navy')
axes[0].axhline(y=0, color='red', linestyle='--', linewidth=1)
axes[0].set_xlabel("Predicted", fontsize=10)
axes[0].set_ylabel("Residual", fontsize=10)
axes[0].set_title("Residuals vs Predicted", fontsize=11, fontweight='bold')
axes[0].grid(True, alpha=0.3)
axes[1].hist(residuals, bins=max(5, len(residuals) // 3), color='steelblue', edgecolor='navy', alpha=0.8)
axes[1].set_xlabel("Residual", fontsize=10)
axes[1].set_ylabel("Count", fontsize=10)
axes[1].set_title("Residual Distribution", fontsize=11, fontweight='bold')
axes[1].grid(True, alpha=0.3)
from scipy.stats import probplot
probplot(residuals, dist="norm", plot=axes[2])
axes[2].set_title("Normal Q-Q Plot", fontsize=11, fontweight='bold')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
return fig
def plot_3d_surface(model, model_type, param_names, bounds, X_obs, y_obs, dim_x=0, dim_y=1):
"""3D surface plot using Plotly."""
if len(param_names) < 2:
fig = go.Figure()
fig.add_annotation(text="Need 2+ parameters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
return fig
resolution = 40
x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution)
x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution)
X1, X2 = np.meshgrid(x1, x2)
d = len(param_names)
X_grid = np.zeros((resolution * resolution, d))
X_grid[:, dim_x] = X1.ravel()
X_grid[:, dim_y] = X2.ravel()
for i in range(d):
if i != dim_x and i != dim_y:
X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2
mu, sigma = predict_with_uncertainty(model, X_grid, model_type)
fig = go.Figure()
fig.add_trace(go.Surface(
x=x1, y=x2, z=mu.reshape(resolution, resolution),
colorscale='Viridis', name='GP Mean', opacity=0.85,
colorbar=dict(title='Predicted')
))
fig.add_trace(go.Scatter3d(
x=X_obs[:, dim_x], y=X_obs[:, dim_y], z=y_obs,
mode='markers', marker=dict(size=5, color='red', symbol='diamond'),
name='Observations'
))
fig.update_layout(
title="3D Response Surface",
scene=dict(xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y], zaxis_title="Response"),
height=550, margin=dict(l=0, r=0, t=40, b=0)
)
return fig
def plot_uncertainty_heatmap(model, model_type, param_names, bounds, X_obs, dim_x=0, dim_y=1):
"""Heatmap of prediction uncertainty."""
if len(param_names) < 2:
fig = go.Figure()
fig.add_annotation(text="Need 2+ params", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
return fig
resolution = 50
x1 = np.linspace(bounds[dim_x][0], bounds[dim_x][1], resolution)
x2 = np.linspace(bounds[dim_y][0], bounds[dim_y][1], resolution)
X1, X2 = np.meshgrid(x1, x2)
d = len(param_names)
X_grid = np.zeros((resolution * resolution, d))
X_grid[:, dim_x] = X1.ravel()
X_grid[:, dim_y] = X2.ravel()
for i in range(d):
if i != dim_x and i != dim_y:
X_grid[:, i] = (bounds[i][0] + bounds[i][1]) / 2
_, sigma = predict_with_uncertainty(model, X_grid, model_type)
fig = go.Figure(data=go.Heatmap(
x=x1, y=x2, z=sigma.reshape(resolution, resolution),
colorscale='YlOrRd', colorbar=dict(title='Uncertainty (Οƒ)')
))
fig.add_trace(go.Scatter(
x=X_obs[:, dim_x], y=X_obs[:, dim_y],
mode='markers', marker=dict(size=8, color='blue', symbol='x', line=dict(width=1)),
name='Observations'
))
fig.update_layout(
title="Prediction Uncertainty Map",
xaxis_title=param_names[dim_x], yaxis_title=param_names[dim_y],
height=450
)
return fig
def plot_correlation_matrix(X_obs, y_obs, param_names):
"""Correlation matrix heatmap."""
if len(X_obs) == 0 or len(X_obs) < 3:
fig = go.Figure()
fig.add_annotation(text="Need β‰₯3 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
return fig
df = pd.DataFrame(X_obs, columns=param_names)
df['Response'] = y_obs
corr = df.corr()
fig = go.Figure(data=go.Heatmap(
z=corr.values, x=corr.columns.tolist(), y=corr.index.tolist(),
colorscale='RdBu_r', zmin=-1, zmax=1,
text=np.round(corr.values, 2), texttemplate="%{text}",
colorbar=dict(title='Correlation')
))
fig.update_layout(title="Correlation Matrix", height=450, xaxis_title="", yaxis_title="")
return fig
def plot_scatter_matrix(X_obs, y_obs, param_names):
"""Scatter matrix (pair plot)."""
if len(X_obs) < 2:
fig = go.Figure()
fig.add_annotation(text="Need β‰₯2 data points", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
return fig
df = pd.DataFrame(X_obs, columns=param_names)
df['Response'] = y_obs
fig = px.scatter_matrix(df, dimensions=param_names + ['Response'],
color='Response', color_continuous_scale='Viridis',
title="Scatter Matrix", height=max(400, 150 * len(param_names)))
fig.update_traces(diagonal_visible=True, marker=dict(size=4))
fig.update_layout(margin=dict(l=40, r=40, t=60, b=40))
return fig
# ═══════════════════════════════════════════════════════════════════
# MAIN DASHBOARD
# ═══════════════════════════════════════════════════════════════════
CUSTOM_CSS = """
.main-header { text-align: center; margin-bottom: 5px; }
.main-header h1 { font-size: 1.8em; margin-bottom: 2px; }
.main-header p { color: #666; font-size: 0.95em; }
.info-box { background: #f0f7ff; border-left: 4px solid #2196F3; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; }
.warn-box { background: #fff8e1; border-left: 4px solid #ff9800; padding: 10px 14px; border-radius: 4px; margin: 5px 0; font-size: 0.9em; }
"""
def create_default_factor_table():
return pd.DataFrame({
"Factor Name": ["Temperature", "Pressure", "Time"],
"Levels (comma-sep or lo:hi:n)": ["100, 150, 200", "1, 2, 3", "30, 60"]
})
def add_factor_row(df):
new_row = pd.DataFrame({"Factor Name": [f"Factor_{len(df)+1}"], "Levels (comma-sep or lo:hi:n)": ["0, 1"]})
return pd.concat([df, new_row], ignore_index=True)
def remove_last_factor(df):
if len(df) > 1:
return df.iloc[:-1].reset_index(drop=True)
return df
def generate_design(factor_df, design_type, n_lhs_runs):
if design_type == "Full Factorial":
return generate_full_factorial(factor_df)
elif design_type == "Fractional Factorial (Β½)":
return generate_fractional_factorial(factor_df, 0.5)
elif design_type == "Fractional Factorial (ΒΌ)":
return generate_fractional_factorial(factor_df, 0.25)
elif design_type == "Latin Hypercube (LHS)":
return generate_lhs_design(factor_df, n_lhs_runs)
elif design_type == "Sobol Sequence":
return generate_sobol_design(factor_df, n_lhs_runs)
elif design_type == "Central Composite (CCD)":
return generate_central_composite(factor_df)
elif design_type == "Box-Behnken":
return generate_box_behnken(factor_df)
else:
return generate_full_factorial(factor_df)
def add_experiment_row(exp_df):
if exp_df is None or len(exp_df) == 0:
return exp_df
new_row = pd.DataFrame({col: [np.nan] for col in exp_df.columns})
new_row["Run"] = len(exp_df) + 1
return pd.concat([exp_df, new_row], ignore_index=True)
def add_column_to_experiments(exp_df, col_name):
if exp_df is None or not col_name.strip():
return exp_df
exp_df[col_name.strip()] = np.nan
return exp_df
def compute_doe_stats(exp_df):
if exp_df is None or len(exp_df) == 0:
return "No experiments generated yet."
n_runs = len(exp_df)
n_params = len([c for c in exp_df.columns if c not in ["Run", "Response"]])
n_filled = exp_df["Response"].notna().sum() if "Response" in exp_df.columns else 0
return f"""### πŸ“Š Design Summary
| Metric | Value |
|--------|-------|
| Total Runs | {n_runs} |
| Parameters | {n_params} |
| Responses Filled | {n_filled} / {n_runs} |
| Completion | {n_filled/n_runs*100:.0f}% |
"""
def run_bo_pipeline(exp_df, model_type, kernel_name, acq_name, n_suggestions,
xi_val, kappa_val, maximize, n_estimators, alpha_val, n_restarts):
"""Run the full BO pipeline."""
empty_fig = plt.figure(figsize=(6, 4))
empty_plotly = go.Figure()
n_outputs = 16
def make_error(msg):
return (msg, pd.DataFrame(), msg,
empty_fig, empty_fig, empty_fig, empty_fig, empty_fig,
empty_fig, empty_fig, empty_plotly, empty_plotly, empty_plotly, empty_plotly,
empty_plotly, None)
if exp_df is None or len(exp_df) == 0:
return make_error("⚠️ No experiment data. Generate a design first and fill in Response values.")
param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]]
if "Response" not in exp_df.columns:
return make_error("⚠️ No 'Response' column found in experiment table.")
valid_mask = exp_df["Response"].notna()
if valid_mask.sum() < 2:
return make_error("⚠️ Need at least 2 completed experiments (with Response values) to fit a model.")
X_obs = exp_df.loc[valid_mask, param_cols].values.astype(float)
y_obs = exp_df.loc[valid_mask, "Response"].values.astype(float)
param_names = param_cols
n_dims = len(param_names)
bounds = []
for col in param_cols:
all_vals = exp_df[col].dropna().astype(float)
lo, hi = all_vals.min(), all_vals.max()
if lo == hi:
lo -= 1; hi += 1
bounds.append((lo, hi))
try:
model = build_surrogate(model_type, kernel_name, n_dims, int(n_estimators), float(alpha_val), int(n_restarts))
model.fit(X_obs, y_obs)
except Exception as e:
return make_error(f"⚠️ Model fitting failed: {str(e)}")
mu_train, sigma_train = predict_with_uncertainty(model, X_obs, model_type)
ss_res = np.sum((y_obs - mu_train) ** 2)
ss_tot = np.sum((y_obs - y_obs.mean()) ** 2)
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
rmse = np.sqrt(np.mean((y_obs - mu_train) ** 2))
y_best = y_obs.max() if maximize else y_obs.min()
best_idx = np.argmax(y_obs) if maximize else np.argmin(y_obs)
fit_report = f"""### πŸ€– Model Fit Report
| Metric | Value |
|--------|-------|
| Model | {model_type} ({kernel_name if model_type == 'Gaussian Process' else ''}) |
| Training Points | {len(X_obs)} |
| RΒ² Score | {r2:.4f} |
| RMSE | {rmse:.4f} |
| Best Observed | {y_best:.4f} (Run #{int(exp_df.loc[valid_mask].iloc[best_idx]['Run']) if 'Run' in exp_df.columns else best_idx+1}) |
| Objective | {'Maximize ↑' if maximize else 'Minimize ↓'} |
"""
if model_type == "Gaussian Process":
try:
fit_report += f"| Kernel (fitted) | `{model.kernel_}` |\n"
fit_report += f"| Log Marginal Likelihood | {model.log_marginal_likelihood_value_:.2f} |\n"
except:
pass
try:
X_sug, acq_sug, mu_sug, sigma_sug = suggest_next_experiments(
model, model_type, acq_name, bounds, int(n_suggestions),
y_best, float(xi_val), float(kappa_val), maximize
)
sug_df = pd.DataFrame(X_sug, columns=param_names)
sug_df.insert(0, "Suggestion #", range(1, len(sug_df) + 1))
sug_df["Predicted Mean"] = mu_sug.round(4)
sug_df["Predicted Οƒ"] = sigma_sug.round(4)
sug_df["Acquisition Value"] = acq_sug.round(6)
for col in param_names:
sug_df[col] = sug_df[col].round(4)
except Exception as e:
sug_df = pd.DataFrame({"Error": [str(e)]})
# Generate all plots with error handling
def safe_plot(fn, *args, plotly=False, **kwargs):
try:
return fn(*args, **kwargs)
except Exception:
return go.Figure() if plotly else plt.figure()
fig_surface = safe_plot(plot_gp_surface_2d, model, model_type, param_names, bounds, X_obs, y_obs)
fig_1d = safe_plot(plot_gp_1d_slices, model, model_type, param_names, bounds, X_obs, y_obs)
fig_acq = safe_plot(plot_acquisition_landscape, model, model_type, acq_name, param_names, bounds,
X_obs, y_obs, y_best, float(xi_val), float(kappa_val), maximize)
fig_conv = safe_plot(plot_convergence, y_obs, maximize)
fig_pva = safe_plot(plot_predicted_vs_actual, model, model_type, X_obs, y_obs)
fig_imp = safe_plot(plot_feature_importance, model, model_type, param_names)
fig_resid = safe_plot(plot_residuals, model, model_type, X_obs, y_obs)
fig_3d = safe_plot(plot_3d_surface, model, model_type, param_names, bounds, X_obs, y_obs, plotly=True)
fig_parallel = safe_plot(plot_parallel_coordinates, X_obs, y_obs, param_names, plotly=True)
fig_unc = safe_plot(plot_uncertainty_heatmap, model, model_type, param_names, bounds, X_obs, plotly=True)
fig_corr = safe_plot(plot_correlation_matrix, X_obs, y_obs, param_names, plotly=True)
fig_scatter = safe_plot(plot_scatter_matrix, X_obs, y_obs, param_names, plotly=True)
model_state_val = {
"model": model, "model_type": model_type, "param_names": param_names,
"bounds": bounds, "X_obs": X_obs, "y_obs": y_obs, "y_best": y_best,
}
return (fit_report, sug_df, compute_doe_stats(exp_df),
fig_surface, fig_1d, fig_acq, fig_conv, fig_pva,
fig_imp, fig_resid, fig_3d, fig_parallel, fig_unc, fig_corr,
fig_scatter, model_state_val)
def merge_suggestions(exp_df, sug_df):
if sug_df is None or len(sug_df) == 0 or exp_df is None:
return exp_df
if "Error" in sug_df.columns:
return exp_df
param_cols = [c for c in exp_df.columns if c not in ["Run", "Response"]]
sug_params = [c for c in sug_df.columns if c in param_cols]
if not sug_params:
return exp_df
new_rows = []
start_run = int(exp_df["Run"].max()) + 1 if "Run" in exp_df.columns else len(exp_df) + 1
for i, (_, row) in enumerate(sug_df.iterrows()):
new_row = {"Run": start_run + i}
for col in param_cols:
new_row[col] = row[col] if col in sug_df.columns else np.nan
new_row["Response"] = np.nan
for col in exp_df.columns:
if col not in new_row:
new_row[col] = np.nan
new_rows.append(new_row)
return pd.concat([exp_df, pd.DataFrame(new_rows)], ignore_index=True)
# ═══════════════════════════════════════════════════════════════════
# BUILD THE DASHBOARD
# ═══════════════════════════════════════════════════════════════════
with gr.Blocks(title="DoE & Bayesian Optimization Dashboard", css=CUSTOM_CSS) as demo:
model_state = gr.State(value=None)
gr.HTML("""
<div class="main-header">
<h1>πŸ”¬ Design of Experiments & Bayesian Optimization</h1>
<p>Plan experiments, fit surrogate models, and get AI-guided suggestions for your next experiments</p>
</div>
""")
with gr.Tabs():
# ── TAB 1: DESIGN OF EXPERIMENTS ──
with gr.Tab("πŸ“ Design of Experiments", id="doe_tab"):
gr.HTML('<div class="info-box">πŸ’‘ <b>Define your factors and levels below.</b> Levels can be comma-separated (e.g. <code>10, 20, 30</code>) or range notation (<code>10:30:3</code> = 3 points from 10 to 30).</div>')
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("### βš™οΈ Factor Definition")
factor_table = gr.Dataframe(
value=create_default_factor_table(),
headers=["Factor Name", "Levels (comma-sep or lo:hi:n)"],
interactive=True, label="Define Factors & Levels",
)
with gr.Row():
add_factor_btn = gr.Button("βž• Add Factor", size="sm")
remove_factor_btn = gr.Button("βž– Remove Last", size="sm")
with gr.Column(scale=1):
gr.Markdown("### 🎯 Design Type")
design_type = gr.Dropdown(
choices=["Full Factorial", "Fractional Factorial (Β½)", "Fractional Factorial (ΒΌ)",
"Latin Hypercube (LHS)", "Sobol Sequence", "Central Composite (CCD)", "Box-Behnken"],
value="Full Factorial", label="Design Method",
)
n_space_filling = gr.Slider(minimum=4, maximum=200, value=20, step=1,
label="Number of Runs (for LHS/Sobol)", visible=False)
generate_btn = gr.Button("πŸš€ Generate Experiment Design", variant="primary", size="lg")
doe_stats = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("### πŸ“‹ Experiment Table")
gr.HTML('<div class="info-box">πŸ“ Edit cells directly. Fill in the <b>Response</b> column with your experimental results. You can add rows or columns as needed.</div>')
experiment_table = gr.Dataframe(value=pd.DataFrame(), interactive=True,
label="Experiments (edit Response values here)")
with gr.Row():
add_row_btn = gr.Button("βž• Add Row", size="sm")
new_col_name = gr.Textbox(label="New Column Name", placeholder="e.g. Yield", scale=2)
add_col_btn = gr.Button("βž• Add Column", size="sm")
with gr.Row():
download_btn = gr.Button("πŸ“₯ Export as CSV", size="sm")
csv_output = gr.File(label="Download", visible=False)
def show_n_runs(dt):
return gr.Slider(visible=dt in ["Latin Hypercube (LHS)", "Sobol Sequence"])
design_type.change(show_n_runs, inputs=design_type, outputs=n_space_filling)
add_factor_btn.click(add_factor_row, inputs=factor_table, outputs=factor_table)
remove_factor_btn.click(remove_last_factor, inputs=factor_table, outputs=factor_table)
def gen_and_stats(ft, dt, n):
df = generate_design(ft, dt, n)
return df, compute_doe_stats(df)
generate_btn.click(gen_and_stats, inputs=[factor_table, design_type, n_space_filling],
outputs=[experiment_table, doe_stats])
add_row_btn.click(add_experiment_row, inputs=experiment_table, outputs=experiment_table)
add_col_btn.click(add_column_to_experiments, inputs=[experiment_table, new_col_name], outputs=experiment_table)
def export_csv(df):
if df is None or len(df) == 0:
return gr.File(visible=False)
path = "/tmp/experiment_design.csv"
df.to_csv(path, index=False)
return gr.File(value=path, visible=True)
download_btn.click(export_csv, inputs=experiment_table, outputs=csv_output)
# ── TAB 2: BAYESIAN OPTIMIZATION ──
with gr.Tab("πŸ€– Bayesian Optimization", id="bo_tab"):
gr.HTML('<div class="info-box">🧠 <b>Configure your surrogate model and acquisition function</b>, then click "Run" to fit the model and get suggestions for your next experiments.</div>')
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### πŸ”§ Surrogate Model")
model_type = gr.Dropdown(
choices=["Gaussian Process", "Random Forest", "Extra Trees", "Gradient Boosting"],
value="Gaussian Process", label="Model Type",
)
with gr.Group() as gp_options:
kernel_type = gr.Dropdown(
choices=["MatΓ©rn 5/2", "MatΓ©rn 3/2", "RBF", "Rational Quadratic", "RBF + MatΓ©rn"],
value="MatΓ©rn 5/2", label="GP Kernel",
)
alpha_val = gr.Number(value=1e-6, label="Alpha (noise regularization)", precision=8)
n_restarts = gr.Slider(1, 20, value=5, step=1, label="Optimizer Restarts")
with gr.Group(visible=False) as tree_options:
n_estimators = gr.Slider(10, 500, value=100, step=10, label="Number of Trees")
gr.Markdown("---")
gr.Markdown("### 🎯 Acquisition Function")
acq_function = gr.Dropdown(
choices=["Expected Improvement (EI)", "Probability of Improvement (PI)",
"Upper Confidence Bound (UCB)", "Thompson Sampling"],
value="Expected Improvement (EI)", label="Acquisition Function",
)
with gr.Group() as ei_pi_options:
xi_param = gr.Slider(0.0, 1.0, value=0.01, step=0.001, label="ΞΎ (exploration-exploitation)")
with gr.Group(visible=False) as ucb_options:
kappa_param = gr.Slider(0.1, 10.0, value=2.0, step=0.1, label="ΞΊ (exploration weight)")
gr.Markdown("---")
gr.Markdown("### πŸ“Š Optimization Settings")
maximize_toggle = gr.Checkbox(value=True, label="Maximize (uncheck to minimize)")
n_suggestions = gr.Slider(1, 20, value=5, step=1, label="Number of Suggestions")
run_bo_btn = gr.Button("πŸš€ Run Optimization", variant="primary", size="lg")
with gr.Column(scale=2):
gr.Markdown("### πŸ“ˆ Model Fit Report")
fit_report_md = gr.Markdown("*Run the optimizer to see results...*")
gr.Markdown("### πŸ’‘ Suggested Next Experiments")
suggestion_table = gr.Dataframe(label="Next Experiments to Run", interactive=False)
merge_btn = gr.Button("πŸ“‹ Add Suggestions to Experiment Table", variant="secondary")
merge_status = gr.Markdown("")
doe_stats_bo = gr.Markdown("")
def toggle_model_options(mt):
return gr.Group(visible=mt == "Gaussian Process"), gr.Group(visible=mt != "Gaussian Process")
def toggle_acq_options(acq):
is_ucb = acq == "Upper Confidence Bound (UCB)"
is_ts = acq == "Thompson Sampling"
return gr.Group(visible=not is_ucb and not is_ts), gr.Group(visible=is_ucb)
model_type.change(toggle_model_options, inputs=model_type, outputs=[gp_options, tree_options])
acq_function.change(toggle_acq_options, inputs=acq_function, outputs=[ei_pi_options, ucb_options])
# ── TAB 3: VISUALIZATIONS ──
with gr.Tab("πŸ“Š Visualizations", id="viz_tab"):
gr.HTML('<div class="info-box">πŸ“‰ <b>Toggle visualizations on/off.</b> All plots update when you run the optimizer. Use the checkboxes to show/hide each visualization.</div>')
gr.Markdown("### πŸŽ›οΈ Visualization Controls")
with gr.Row():
show_surface = gr.Checkbox(value=True, label="πŸ—ΊοΈ Surrogate Surface (2D)")
show_1d = gr.Checkbox(value=True, label="πŸ“‰ 1D Parameter Slices")
show_3d = gr.Checkbox(value=True, label="πŸ”οΈ 3D Response Surface")
show_acq = gr.Checkbox(value=True, label="🎯 Acquisition Landscape")
with gr.Row():
show_convergence = gr.Checkbox(value=True, label="πŸ“ˆ Convergence Plot")
show_pva = gr.Checkbox(value=True, label="βš–οΈ Predicted vs Actual")
show_importance = gr.Checkbox(value=True, label="πŸ“Š Feature Importance")
show_residuals = gr.Checkbox(value=True, label="πŸ“ Residual Analysis")
with gr.Row():
show_parallel = gr.Checkbox(value=True, label="πŸ”€ Parallel Coordinates")
show_uncertainty = gr.Checkbox(value=True, label="🌑️ Uncertainty Heatmap")
show_correlation = gr.Checkbox(value=True, label="πŸ”— Correlation Matrix")
show_scatter_mat = gr.Checkbox(value=False, label="πŸ”˜ Scatter Matrix")
gr.Markdown("---")
with gr.Column(visible=True) as surface_col:
gr.Markdown("### πŸ—ΊοΈ Surrogate Model Surface (2D Contour)")
surface_plot = gr.Plot(label="Mean & Uncertainty Surface")
with gr.Column(visible=True) as slices_col:
gr.Markdown("### πŸ“‰ 1D Parameter Effect Slices")
slices_plot = gr.Plot(label="1D GP Slices")
with gr.Column(visible=True) as three_d_col:
gr.Markdown("### πŸ”οΈ 3D Response Surface (Interactive)")
three_d_plot = gr.Plot(label="3D Surface")
with gr.Column(visible=True) as acq_col:
gr.Markdown("### 🎯 Acquisition Function Landscape")
acq_plot = gr.Plot(label="Acquisition Function")
with gr.Column(visible=True) as conv_col:
gr.Markdown("### πŸ“ˆ Optimization Convergence")
convergence_plot = gr.Plot(label="Convergence")
with gr.Column(visible=True) as pva_col:
gr.Markdown("### βš–οΈ Predicted vs Actual (Parity Plot)")
pva_plot = gr.Plot(label="Predicted vs Actual")
with gr.Column(visible=True) as imp_col:
gr.Markdown("### πŸ“Š Feature Importance")
importance_plot = gr.Plot(label="Feature Importance")
with gr.Column(visible=True) as resid_col:
gr.Markdown("### πŸ“ Residual Analysis")
residuals_plot = gr.Plot(label="Residuals")
with gr.Column(visible=True) as parallel_col:
gr.Markdown("### πŸ”€ Parallel Coordinates")
parallel_plot = gr.Plot(label="Parallel Coordinates")
with gr.Column(visible=True) as unc_col:
gr.Markdown("### 🌑️ Prediction Uncertainty Map")
unc_plot = gr.Plot(label="Uncertainty Heatmap")
with gr.Column(visible=True) as corr_col:
gr.Markdown("### πŸ”— Correlation Matrix")
corr_plot = gr.Plot(label="Correlation Matrix")
with gr.Column(visible=False) as scatter_mat_col:
gr.Markdown("### πŸ”˜ Scatter Matrix (Pair Plot)")
scatter_mat_plot = gr.Plot(label="Scatter Matrix")
show_surface.change(lambda v: gr.Column(visible=v), inputs=show_surface, outputs=surface_col)
show_1d.change(lambda v: gr.Column(visible=v), inputs=show_1d, outputs=slices_col)
show_3d.change(lambda v: gr.Column(visible=v), inputs=show_3d, outputs=three_d_col)
show_acq.change(lambda v: gr.Column(visible=v), inputs=show_acq, outputs=acq_col)
show_convergence.change(lambda v: gr.Column(visible=v), inputs=show_convergence, outputs=conv_col)
show_pva.change(lambda v: gr.Column(visible=v), inputs=show_pva, outputs=pva_col)
show_importance.change(lambda v: gr.Column(visible=v), inputs=show_importance, outputs=imp_col)
show_residuals.change(lambda v: gr.Column(visible=v), inputs=show_residuals, outputs=resid_col)
show_parallel.change(lambda v: gr.Column(visible=v), inputs=show_parallel, outputs=parallel_col)
show_uncertainty.change(lambda v: gr.Column(visible=v), inputs=show_uncertainty, outputs=unc_col)
show_correlation.change(lambda v: gr.Column(visible=v), inputs=show_correlation, outputs=corr_col)
show_scatter_mat.change(lambda v: gr.Column(visible=v), inputs=show_scatter_mat, outputs=scatter_mat_col)
# ── TAB 4: GUIDE ──
with gr.Tab("πŸ“– Guide", id="guide_tab"):
gr.Markdown("""
## πŸ“– User Guide
### πŸ”„ Workflow
1. **Define Factors** β€” In the DoE tab, set up your experimental parameters and their levels
2. **Generate Design** β€” Choose a design type and generate the experiment matrix
3. **Run Experiments** β€” Perform your physical/simulated experiments and fill in the Response column
4. **Fit Surrogate** β€” Go to the BO tab, configure your model, and click Run
5. **Get Suggestions** β€” The optimizer will suggest the most promising next experiments
6. **Iterate** β€” Add suggestions to your table, run those experiments, and repeat
### πŸ“ Design Types
| Design | Best For | Notes |
|--------|----------|-------|
| **Full Factorial** | Few factors, few levels | Tests every combination. Grows exponentially! |
| **Fractional Factorial** | Many factors, screening | Random subset of full factorial |
| **Latin Hypercube (LHS)** | Space-filling, BO initialization | Even coverage of parameter space |
| **Sobol Sequence** | High-dimensional, quasi-random | Low-discrepancy, n should be power of 2 |
| **Central Composite (CCD)** | Response surface methodology | Factorial + axial + center points |
| **Box-Behnken** | 3+ factors, fewer runs than CCD | No extreme corner points |
### πŸ€– Surrogate Models
| Model | Strengths | Weaknesses |
|-------|-----------|------------|
| **Gaussian Process** | Principled uncertainty, smooth | Scales O(nΒ³), max ~500 points |
| **Random Forest** | Handles categoricals, robust | Step-wise predictions, no gradients |
| **Extra Trees** | Faster than RF, good uncertainty | Similar to RF |
| **Gradient Boosting** | Strong predictive power | Weaker uncertainty estimates |
### 🎯 Acquisition Functions
| Function | Behavior | Parameters |
|----------|----------|------------|
| **Expected Improvement (EI)** | Balanced exploration/exploitation | ΞΎ: higher = more exploration |
| **Probability of Improvement (PI)** | Greedy, exploits known good regions | ΞΎ: trade-off parameter |
| **Upper Confidence Bound (UCB)** | Direct exploration control | ΞΊ: higher = more exploration |
| **Thompson Sampling** | Stochastic, good for batch | None (randomized) |
### πŸ”‘ GP Kernels
| Kernel | Best For |
|--------|----------|
| **MatΓ©rn 5/2** | Most physical/chemical processes (default) |
| **MatΓ©rn 3/2** | Rougher functions |
| **RBF** | Very smooth functions |
| **Rational Quadratic** | Multi-scale patterns |
| **RBF + MatΓ©rn** | Complex functions with mixed smoothness |
### πŸ“Š Visualization Guide
- **Surrogate Surface**: 2D contour of model predictions + uncertainty
- **1D Slices**: Effect of each parameter while others fixed at center
- **3D Surface**: Interactive 3D view of response surface (Plotly)
- **Acquisition Landscape**: Where the optimizer "wants" to sample next
- **Convergence**: Best-so-far trajectory over experiments
- **Predicted vs Actual**: Model accuracy check (parity plot)
- **Feature Importance**: Which parameters matter most
- **Residuals**: Diagnostic plots for model assumptions
- **Parallel Coordinates**: Multi-dimensional experiment overview
- **Uncertainty Heatmap**: Where the model is least certain
- **Correlation Matrix**: Parameter-response correlations
- **Scatter Matrix**: All pairwise relationships
""")
# ── WIRING ──
run_bo_btn.click(
fn=run_bo_pipeline,
inputs=[experiment_table, model_type, kernel_type, acq_function,
n_suggestions, xi_param, kappa_param, maximize_toggle,
n_estimators, alpha_val, n_restarts],
outputs=[fit_report_md, suggestion_table, doe_stats_bo,
surface_plot, slices_plot, acq_plot, convergence_plot, pva_plot,
importance_plot, residuals_plot, three_d_plot, parallel_plot,
unc_plot, corr_plot, scatter_mat_plot, model_state],
)
def do_merge(exp_df, sug_df):
merged = merge_suggestions(exp_df, sug_df)
return merged, "βœ… Suggestions added to experiment table! Switch to DoE tab to see them."
merge_btn.click(do_merge, inputs=[experiment_table, suggestion_table],
outputs=[experiment_table, merge_status])
if __name__ == "__main__":
demo.launch()