""" Dataset Generation Script for DiffEC Flow Matching Model This script generates simulated electrochemical datasets using DiffEC simulators. Each sample contains: - Input: current (flux) and potential profiles over time - Output: 1D temporal concentration distribution for oxidized/reduced species The data format is designed for training flow matching models that learn to generate concentration profiles from electrochemical measurements. Supports multiprocessing for fast parallel dataset generation. """ import os import sys import json import argparse import numpy as np from tqdm import tqdm from multiprocessing import Pool, cpu_count from functools import partial # Pure NumPy/SciPy backend — no JAX dependency for data generation import warnings import scipy.linalg warnings.filterwarnings("ignore", message=".*[Ii]ll.conditioned.*") # ============================================================================= # Simulation Hyperparameters (from DiffEC) # ============================================================================= CYCLES = 1 DELTA_X = 2e-6 # Initial space step DELTA_THETA = 5e-2 # Potential step EXPANDING_GRID_FACTOR = 1.05 SIMULATION_SPACE_MULTIPLE = 6.0 # ============================================================================= # Grid and Coefficient Functions (adapted from DiffEC) # ============================================================================= def calc_n(Xi, deltaX, maxX, expanding_grid_factor): """Calculate number of grid points.""" current_X = Xi n = 0 dX = deltaX while current_X < maxX: current_X += dX dX = dX * expanding_grid_factor n += 1 return n + 1 def gen_grid(Xi, deltaX, maxX, expanding_grid_factor): """Generate expanding spatial grid.""" n = calc_n(Xi, deltaX, maxX, expanding_grid_factor) X_grid = np.zeros(n) X_grid[0] = Xi dX = deltaX for i in range(1, n): X_grid[i] = X_grid[i-1] + dX dX = dX * expanding_grid_factor return X_grid, n def ini_conc(n, C_A_bulk, C_B_bulk): """Initialize concentration arrays.""" conc = np.zeros(2 * n) conc[:n] = C_A_bulk conc[n:] = C_B_bulk conc_d = conc.copy() return conc, conc_d def ini_coeff(n): """Initialize coefficient arrays.""" A_matrix = np.zeros((2*n, 2*n)) aA = np.zeros(n) bA = np.zeros(n) cA = np.zeros(n) aB = np.zeros(n) bB = np.zeros(n) cB = np.zeros(n) return A_matrix, aA, bA, cA, aB, bB, cB def calc_abc_linear(n, X_grid, deltaT, a, b, c, D): """Calculate coefficients for linear diffusion.""" for i in range(1, n-1): deltaX_m = X_grid[i] - X_grid[i-1] deltaX_p = X_grid[i+1] - X_grid[i] a[i] = D * (-2.0 * deltaT) / (deltaX_m * (deltaX_m + deltaX_p)) c[i] = D * (-2.0 * deltaT) / (deltaX_p * (deltaX_m + deltaX_p)) b[i] = 1.0 - a[i] - c[i] return a, b, c def update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics, **kw): """Update RHS vector.""" conc_d[:] = conc[:] if kinetics == 'Nernst': conc_d[n-1] = 1.0 / (1.0 + np.exp(-Theta)) conc_d[n] = 0.0 elif kinetics in ('BV', 'MHC'): conc_d[n-1] = 0.0 conc_d[n] = 0.0 elif kinetics == 'Ads': conc_d[n-1] = 0.0 conc_d[n] = 0.0 conc_d[0] = C_A_bulk conc_d[2*n-1] = C_B_bulk return conc_d # ============================================================================= # Marcus-Hush-Chidsey (MHC) rate constants # ============================================================================= def _mhc_integrand_red(Theta_effective, reorg_e, hermgauss_degree=50): """Gauss-Hermite quadrature for MHC reduction rate integral.""" pts, wts = np.polynomial.hermite.hermgauss(hermgauss_degree) y = 2 * np.sqrt(reorg_e) / ( 1.0 + np.exp(-(reorg_e * (pts * 2.0 / np.sqrt(reorg_e) - 1.0) - Theta_effective)) ) return np.sum(wts * y) def _mhc_integrand_ox(Theta_effective, reorg_e, hermgauss_degree=50): """Gauss-Hermite quadrature for MHC oxidation rate integral.""" pts, wts = np.polynomial.hermite.hermgauss(hermgauss_degree) y = -2.0 * np.sqrt(reorg_e) / ( 1.0 + np.exp(-reorg_e * (pts * 2.0 / np.sqrt(reorg_e) - 1.0) - Theta_effective) ) return np.sum(wts * y) def calc_mhc_rates(Theta, K0, reorg_e): """Compute MHC reduction and oxidation rate constants.""" I_red = _mhc_integrand_red(Theta, reorg_e) I_red0 = _mhc_integrand_red(0.0, reorg_e) I_ox = _mhc_integrand_ox(Theta, reorg_e) I_ox0 = _mhc_integrand_ox(0.0, reorg_e) K_red = K0 * I_red / I_red0 K_ox = K0 * I_ox / I_ox0 return K_red, K_ox def calc_matrix(A_matrix, X_grid, Theta, kinetics, n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta, **kw): """Build the coefficient matrix for all supported kinetics.""" # Interior points for species A (reversed row indices) rows_A = np.arange(n-2, 0, -1) A_matrix[rows_A, rows_A - 1] = cA[1:n-1] A_matrix[rows_A, rows_A] = bA[1:n-1] A_matrix[rows_A, rows_A + 1] = aA[1:n-1] # Interior points for species B rows_B = np.arange(n+1, 2*n-1) A_matrix[rows_B, rows_B - 1] = aB[1:n-1] A_matrix[rows_B, rows_B] = bB[1:n-1] A_matrix[rows_B, rows_B + 1] = cB[1:n-1] X0 = X_grid[1] - X_grid[0] if kinetics == 'Nernst': A_matrix[n-1, n-2] = 0.0 A_matrix[n-1, n-1] = 1.0 A_matrix[n-1, n] = 0.0 A_matrix[n, n-2] = -dA A_matrix[n, n-1] = dA A_matrix[n, n] = dB A_matrix[n, n+1] = -dB elif kinetics == 'BV': K_red = K0 * np.exp(-alpha * Theta) K_ox = K0 * np.exp(beta * Theta) A_matrix[n-1, n-2] = -1.0 A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red A_matrix[n-1, n] = -X0/dA * K_ox A_matrix[n, n-1] = -X0/dB * K_red A_matrix[n, n] = 1.0 + X0/dB * K_ox A_matrix[n, n+1] = -1.0 elif kinetics == 'MHC': reorg_e = kw.get('reorg_e', 20.0) K_red, K_ox = calc_mhc_rates(Theta, K0, reorg_e) A_matrix[n-1, n-2] = -1.0 A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red A_matrix[n-1, n] = -X0/dA * K_ox A_matrix[n, n-1] = -X0/dB * K_red A_matrix[n, n] = 1.0 + X0/dB * K_ox A_matrix[n, n+1] = -1.0 elif kinetics == 'Ads': K_red = K0 * np.exp(-alpha * Theta) K_ox = K0 * np.exp(beta * Theta) Gamma_sat = kw.get('Gamma_sat', 1.0) A_matrix[n-1, n-2] = -1.0 A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red * Gamma_sat A_matrix[n-1, n] = -X0/dA * K_ox * Gamma_sat A_matrix[n, n-1] = -X0/dB * K_red * Gamma_sat A_matrix[n, n] = 1.0 + X0/dB * K_ox * Gamma_sat A_matrix[n, n+1] = -1.0 # Far boundary conditions A_matrix[0, 0] = 1.0 A_matrix[0, 1] = 0.0 A_matrix[2*n-1, 2*n-1] = 1.0 A_matrix[2*n-1, 2*n-2] = 0.0 return A_matrix def calc_flux(conc, n, dA, X_grid): """Calculate current flux at electrode surface.""" return -dA * (conc[n-2] - conc[n-1]) / (X_grid[1] - X_grid[0]) # ============================================================================= # Main Simulation Function with Concentration History # ============================================================================= def run_cv_simulation( sigma=1.0, K0=1.0, alpha=0.5, beta=None, kinetics='BV', C_A_bulk=1.0, C_B_bulk=0.0, dA=1.0, dB=1.0, theta_i=20.0, theta_v=-20.0, cycles=1, n_spatial_out=64, verbose=False, **extra_kinetics_params, ): """ Run CV simulation and return full concentration history. Parameters ---------- sigma : float Dimensionless scan rate K0 : float Dimensionless electrochemical rate constant alpha : float Cathodic transfer coefficient beta : float, optional Anodic transfer coefficient (default: 1 - alpha) kinetics : str 'BV' for Butler-Volmer or 'Nernst' for reversible C_A_bulk : float Dimensionless bulk concentration of oxidized species C_B_bulk : float Dimensionless bulk concentration of reduced species dA, dB : float Dimensionless diffusion coefficients theta_i, theta_v : float Dimensionless start and vertex potentials cycles : int Number of CV cycles n_spatial_out : int Number of spatial grid points in output (will resample) verbose : bool Show progress bar Returns ------- dict with keys: - 'potential': (T,) array of potentials - 'flux': (T,) array of current flux - 'time': (T,) array of dimensionless time - 'c_ox': (T, X) concentration of oxidized species - 'c_red': (T, X) concentration of reduced species - 'x_grid': (X,) spatial grid (original) - 'x_grid_out': (n_spatial_out,) resampled spatial grid - 'params': dict of simulation parameters """ if beta is None: beta = 1.0 - alpha deltaT = DELTA_THETA / sigma maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 Esteps = np.arange(nTimeSteps) E = np.where( Esteps < nTimeSteps / 2.0, theta_i - DELTA_THETA * Esteps, theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) ) E = np.tile(E, cycles) total_steps = len(E) Xi = 0.0 maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) fluxes = np.zeros(total_steps) conc_A_history = [] conc_B_history = [] # For Nernst, the matrix is constant (no Theta-dependent terms), # so we factorize once and reuse via lu_solve every timestep. lu_factor = None if kinetics == 'Nernst': A_mat = calc_matrix( A_matrix, X_grid, 0.0, kinetics, n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta, **extra_kinetics_params ) lu_factor = scipy.linalg.lu_factor(A_mat) iterator = range(total_steps) if verbose: iterator = tqdm(iterator, desc="Simulating CV") for idx in iterator: Theta = E[idx] conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics) if lu_factor is not None: conc = scipy.linalg.lu_solve(lu_factor, conc_d) else: A_mat = calc_matrix( A_matrix, X_grid, Theta, kinetics, n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta, **extra_kinetics_params ) conc = scipy.linalg.solve(A_mat, conc_d) flux = calc_flux(conc, n, dA, X_grid) fluxes[idx] = flux conc_A_history.append(conc[:n].copy()) conc_B_history.append(conc[n:].copy()) conc_A_history = np.stack(conc_A_history, axis=0) conc_B_history = np.stack(conc_B_history, axis=0) x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) c_ox_out = np.zeros((total_steps, n_spatial_out)) c_red_out = np.zeros((total_steps, n_spatial_out)) for t in range(total_steps): c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) time_array = np.arange(total_steps) * float(deltaT) params_out = { 'experiment_type': 'CV', 'sigma': float(sigma), 'K0': float(K0), 'alpha': float(alpha), 'beta': float(beta), 'kinetics': kinetics, 'C_A_bulk': float(C_A_bulk), 'C_B_bulk': float(C_B_bulk), 'dA': float(dA), 'dB': float(dB), 'theta_i': float(theta_i), 'theta_v': float(theta_v), 'cycles': cycles, } for k, v in extra_kinetics_params.items(): params_out[k] = float(v) if isinstance(v, (int, float, np.floating)) else v return { 'potential': E, 'flux': fluxes, 'time': time_array, 'c_ox': c_ox_out, 'c_red': c_red_out, 'x_grid': X_grid, 'x_grid_out': x_grid_out, 'params': params_out, } # ============================================================================= # Surface-Confined (Laviron) CV Simulation # ============================================================================= def run_ads_cv_simulation( sigma=1.0, K0=1.0, alpha=0.5, beta=None, Gamma_sat=1.0, theta_i=20.0, theta_v=-20.0, cycles=1, n_spatial_out=64, verbose=False, **extra_params, ): """ Surface-confined (Laviron) CV: no diffusion, ODE for surface coverage. The redox species is adsorbed on the electrode surface. The fractional coverage of the oxidized form, f_ox, evolves as: df_ox/dTheta = -(1/sigma) * [k_red * f_ox - k_ox * (1 - f_ox)] where k_red = K0*exp(-alpha*Theta), k_ox = K0*exp(beta*Theta). Dimensionless current: i = Gamma_sat * df_ox/dTheta * sigma (= Gamma_sat * [k_red*f_ox - k_ox*(1-f_ox)]) Returns the same dict format as run_cv_simulation for compatibility. Concentration arrays represent uniform surface coverage (no spatial variation). """ if beta is None: beta = 1.0 - alpha nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 Esteps = np.arange(nTimeSteps) E = np.where( Esteps < nTimeSteps / 2.0, theta_i - DELTA_THETA * Esteps, theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) ) E = np.tile(E, cycles) total_steps = len(E) deltaT = DELTA_THETA / sigma # Initial condition: fully oxidized surface f_ox = 1.0 fluxes = np.zeros(total_steps) f_ox_history = np.zeros(total_steps) for idx in range(total_steps): Theta = E[idx] k_red = K0 * np.exp(-alpha * Theta) k_ox = K0 * np.exp(beta * Theta) # Analytical solution for this timestep (exact for constant Theta). # df_ox/dt = -(k_red + k_ox)*f_ox + k_ox # f_ox(t+dt) = f_eq + (f_ox - f_eq) * exp(-(k_red+k_ox)*dt) k_total = k_red + k_ox f_eq = k_ox / k_total f_ox_new = f_eq + (f_ox - f_eq) * np.exp(-k_total * deltaT) f_ox_new = np.clip(f_ox_new, 0.0, 1.0) # Current = -Gamma_sat * (change in coverage) / deltaT # Negative sign: reduction (f_ox decreasing) should give negative flux, # consistent with diffusion-based mechanisms where reduction = negative current. fluxes[idx] = -Gamma_sat * (f_ox - f_ox_new) / deltaT f_ox_history[idx] = f_ox f_ox = f_ox_new time_array = np.arange(total_steps) * float(deltaT) # Build concentration arrays compatible with diffusion-based output. # c_ox = f_ox (surface coverage of oxidized), c_red = 1 - f_ox # Replicated across a dummy spatial grid for format compatibility. x_grid_out = np.linspace(0.0, 1.0, n_spatial_out) c_ox_out = np.outer(f_ox_history, np.ones(n_spatial_out)) c_red_out = np.outer(1.0 - f_ox_history, np.ones(n_spatial_out)) params_out = { 'experiment_type': 'CV', 'sigma': float(sigma), 'K0': float(K0), 'alpha': float(alpha), 'beta': float(beta), 'kinetics': 'Ads', 'Gamma_sat': float(Gamma_sat), 'theta_i': float(theta_i), 'theta_v': float(theta_v), 'cycles': cycles, 'C_A_bulk': 0.0, 'C_B_bulk': 0.0, 'dA': 1e-10, 'dB': 1e-10, } return { 'potential': E, 'flux': fluxes, 'time': time_array, 'c_ox': c_ox_out.astype(np.float64), 'c_red': c_red_out.astype(np.float64), 'x_grid': x_grid_out, 'x_grid_out': x_grid_out, 'params': params_out, } # ============================================================================= # EC Mechanism Simulation (Electrochemical-Chemical) # ============================================================================= def run_ec_cv_simulation( sigma=1.0, K0=1.0, alpha=0.5, beta=None, kinetics='EC', C_A_bulk=1.0, C_B_bulk=0.0, dA=1.0, dB=1.0, theta_i=20.0, theta_v=-20.0, cycles=1, n_spatial_out=64, verbose=False, kc=1.0, **extra_params, ): """ EC mechanism CV: A + e- -> B (BV kinetics), then B -> Y (first-order chemical). The follow-up chemical reaction consumes B in the bulk with rate constant kc (dimensionless, scaled by scan rate). This modifies the diffusion equation for species B to include a homogeneous reaction sink term. The PDE for B becomes: dc_B/dt = D_B * d²c_B/dx² - kc * c_B We handle this via operator splitting: after solving the diffusion step (same matrix as BV), we apply the chemical decay analytically: c_B(t+dt) = c_B_diffusion * exp(-kc * dt) Parameters ---------- kc : float Dimensionless first-order chemical rate constant for B -> Y. kc = k_chem / (F*v/RT) where k_chem is the dimensional rate constant and v is the scan rate. Large kc means fast follow-up chemistry. """ if beta is None: beta = 1.0 - alpha deltaT = DELTA_THETA / sigma maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 Esteps = np.arange(nTimeSteps) E = np.where( Esteps < nTimeSteps / 2.0, theta_i - DELTA_THETA * Esteps, theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) ) E = np.tile(E, cycles) total_steps = len(E) Xi = 0.0 maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) fluxes = np.zeros(total_steps) conc_A_history = [] conc_B_history = [] decay_factor = np.exp(-kc * deltaT) iterator = range(total_steps) if verbose: iterator = tqdm(iterator, desc="Simulating EC CV") for idx in iterator: Theta = E[idx] conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, 'BV') A_mat = calc_matrix( A_matrix, X_grid, Theta, 'BV', n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta ) conc = scipy.linalg.solve(A_mat, conc_d) # Chemical step: B -> Y (first-order decay in bulk, not at boundaries) conc[n:2*n-1] *= decay_factor # Keep far boundary at C_B_bulk conc[2*n-1] = C_B_bulk flux = calc_flux(conc, n, dA, X_grid) fluxes[idx] = flux conc_A_history.append(conc[:n].copy()) conc_B_history.append(conc[n:].copy()) conc_A_history = np.stack(conc_A_history, axis=0) conc_B_history = np.stack(conc_B_history, axis=0) x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) c_ox_out = np.zeros((total_steps, n_spatial_out)) c_red_out = np.zeros((total_steps, n_spatial_out)) for t in range(total_steps): c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) time_array = np.arange(total_steps) * float(deltaT) params_out = { 'experiment_type': 'CV', 'sigma': float(sigma), 'K0': float(K0), 'alpha': float(alpha), 'beta': float(beta), 'kinetics': 'EC', 'C_A_bulk': float(C_A_bulk), 'C_B_bulk': float(C_B_bulk), 'dA': float(dA), 'dB': float(dB), 'theta_i': float(theta_i), 'theta_v': float(theta_v), 'cycles': cycles, 'kc': float(kc), } return { 'potential': E, 'flux': fluxes, 'time': time_array, 'c_ox': c_ox_out, 'c_red': c_red_out, 'x_grid': X_grid, 'x_grid_out': x_grid_out, 'params': params_out, } # ============================================================================= # Langmuir-Hinshelwood Adsorption + ET Simulation # ============================================================================= def run_lh_cv_simulation( sigma=1.0, K0=1.0, alpha=0.5, beta=None, kinetics='LH', C_A_bulk=1.0, C_B_bulk=0.0, dA=1.0, dB=1.0, theta_i=20.0, theta_v=-20.0, cycles=1, n_spatial_out=64, verbose=False, KA_eq=10.0, KB_eq=1.0, **extra_params, ): """ Langmuir-Hinshelwood CV: diffusion + fast adsorption equilibrium + surface ET. Reaction scheme: A_sol <-> A_ads (fast Langmuir equilibrium, constant K_A) B_sol <-> B_ads (fast Langmuir equilibrium, constant K_B) A_ads + e- <-> B_ads (BV kinetics with K0, alpha) Under the pre-equilibrium approximation, surface coverages are: theta_A = K_A * c_A(0,t) / (1 + K_A*c_A(0,t) + K_B*c_B(0,t)) theta_B = K_B * c_B(0,t) / (1 + K_A*c_A(0,t) + K_B*c_B(0,t)) The effective boundary condition becomes coverage-dependent BV kinetics. The flux at the surface is: j = K0 * [exp(-alpha*Theta) * theta_A - exp(beta*Theta) * theta_B] This is implemented by modifying the boundary condition at each timestep based on the current surface concentrations. Parameters ---------- KA_eq : float Dimensionless adsorption equilibrium constant for oxidized species A. KB_eq : float Dimensionless adsorption equilibrium constant for reduced species B. """ if beta is None: beta = 1.0 - alpha deltaT = DELTA_THETA / sigma maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 Esteps = np.arange(nTimeSteps) E = np.where( Esteps < nTimeSteps / 2.0, theta_i - DELTA_THETA * Esteps, theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) ) E = np.tile(E, cycles) total_steps = len(E) Xi = 0.0 maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) fluxes = np.zeros(total_steps) conc_A_history = [] conc_B_history = [] X0 = X_grid[1] - X_grid[0] iterator = range(total_steps) if verbose: iterator = tqdm(iterator, desc="Simulating LH CV") for idx in iterator: Theta = E[idx] conc_d[:] = conc[:] conc_d[n-1] = 0.0 conc_d[n] = 0.0 conc_d[0] = C_A_bulk conc_d[2*n-1] = C_B_bulk # Compute Langmuir coverage fractions from current surface concentrations cA_surf = max(conc[n-1], 0.0) cB_surf = max(conc[n], 0.0) denom = 1.0 + KA_eq * cA_surf + KB_eq * cB_surf theta_A = KA_eq * cA_surf / denom theta_B = KB_eq * cB_surf / denom K_red = K0 * np.exp(-alpha * Theta) K_ox = K0 * np.exp(beta * Theta) # Effective rate constants scaled by coverage sensitivity # The flux is: j = K_red * theta_A - K_ox * theta_B # We linearize around current concentrations for the matrix: # theta_A ≈ KA/(1+KA*cA+KB*cB) * cA (leading term) # For the boundary condition, we use effective rates: K_red_eff = K_red * KA_eq / denom K_ox_eff = K_ox * KB_eq / denom # Build matrix with LH-modified boundary conditions # Interior points rows_A = np.arange(n-2, 0, -1) A_matrix[rows_A, rows_A - 1] = cA[1:n-1] A_matrix[rows_A, rows_A] = bA[1:n-1] A_matrix[rows_A, rows_A + 1] = aA[1:n-1] rows_B = np.arange(n+1, 2*n-1) A_matrix[rows_B, rows_B - 1] = aB[1:n-1] A_matrix[rows_B, rows_B] = bB[1:n-1] A_matrix[rows_B, rows_B + 1] = cB[1:n-1] # Boundary: species A at electrode (row n-1) A_matrix[n-1, n-2] = -1.0 A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red_eff A_matrix[n-1, n] = -X0/dA * K_ox_eff # Boundary: species B at electrode (row n) A_matrix[n, n-1] = -X0/dB * K_red_eff A_matrix[n, n] = 1.0 + X0/dB * K_ox_eff A_matrix[n, n+1] = -1.0 # Far boundaries A_matrix[0, 0] = 1.0 A_matrix[0, 1] = 0.0 A_matrix[2*n-1, 2*n-1] = 1.0 A_matrix[2*n-1, 2*n-2] = 0.0 conc = scipy.linalg.solve(A_matrix, conc_d) flux = calc_flux(conc, n, dA, X_grid) fluxes[idx] = flux conc_A_history.append(conc[:n].copy()) conc_B_history.append(conc[n:].copy()) conc_A_history = np.stack(conc_A_history, axis=0) conc_B_history = np.stack(conc_B_history, axis=0) x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) c_ox_out = np.zeros((total_steps, n_spatial_out)) c_red_out = np.zeros((total_steps, n_spatial_out)) for t in range(total_steps): c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) time_array = np.arange(total_steps) * float(deltaT) params_out = { 'experiment_type': 'CV', 'sigma': float(sigma), 'K0': float(K0), 'alpha': float(alpha), 'beta': float(beta), 'kinetics': 'LH', 'C_A_bulk': float(C_A_bulk), 'C_B_bulk': float(C_B_bulk), 'dA': float(dA), 'dB': float(dB), 'theta_i': float(theta_i), 'theta_v': float(theta_v), 'cycles': cycles, 'KA_eq': float(KA_eq), 'KB_eq': float(KB_eq), } return { 'potential': E, 'flux': fluxes, 'time': time_array, 'c_ox': c_ox_out, 'c_red': c_red_out, 'x_grid': X_grid, 'x_grid_out': x_grid_out, 'params': params_out, } # ============================================================================= # Chronoamperometry Simulation # ============================================================================= def run_ca_simulation( theta_step=-10.0, K0=1.0, alpha=0.5, beta=None, kinetics='BV', C_A_bulk=1.0, C_B_bulk=0.0, dA=1.0, dB=1.0, t_max=10.0, dt=0.01, n_spatial_out=64, verbose=False, ): """ Run chronoamperometry simulation: step potential held constant. Parameters ---------- theta_step : float Dimensionless step potential (held constant for all t > 0) K0 : float Dimensionless electrochemical rate constant alpha : float Cathodic transfer coefficient beta : float, optional Anodic transfer coefficient (default: 1 - alpha) kinetics : str 'BV' for Butler-Volmer C_A_bulk, C_B_bulk : float Dimensionless bulk concentrations dA, dB : float Dimensionless diffusion coefficients t_max : float Dimensionless total experiment time dt : float Dimensionless time step n_spatial_out : int Number of spatial grid points in output verbose : bool Show progress bar Returns ------- dict with same keys as run_cv_simulation, plus experiment_type='CA' """ if beta is None: beta = 1.0 - alpha total_steps = int(t_max / dt) + 1 deltaT = dt E = np.full(total_steps, theta_step) Xi = 0.0 maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(t_max * max(dA, dB)) X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) fluxes = np.zeros(total_steps) conc_A_history = [] conc_B_history = [] iterator = range(total_steps) if verbose: iterator = tqdm(iterator, desc="Simulating CA") for idx in iterator: Theta = E[idx] A_mat = calc_matrix( A_matrix, X_grid, Theta, kinetics, n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta ) conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics) conc = scipy.linalg.solve(A_mat, conc_d) flux = calc_flux(conc, n, dA, X_grid) fluxes[idx] = flux conc_A_history.append(conc[:n].copy()) conc_B_history.append(conc[n:].copy()) conc_A_history = np.stack(conc_A_history, axis=0) conc_B_history = np.stack(conc_B_history, axis=0) x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) c_ox_out = np.zeros((total_steps, n_spatial_out)) c_red_out = np.zeros((total_steps, n_spatial_out)) for t in range(total_steps): c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) time_array = np.arange(total_steps) * float(deltaT) return { 'potential': E, 'flux': fluxes, 'time': time_array, 'c_ox': c_ox_out, 'c_red': c_red_out, 'x_grid': X_grid, 'x_grid_out': x_grid_out, 'params': { 'experiment_type': 'CA', 'theta_step': float(theta_step), 'K0': float(K0), 'alpha': float(alpha), 'beta': float(beta), 'kinetics': kinetics, 'C_A_bulk': float(C_A_bulk), 'C_B_bulk': float(C_B_bulk), 'dA': float(dA), 'dB': float(dB), 't_max': float(t_max), 'dt': float(dt), } } # ============================================================================= # Parameter Sampling # ============================================================================= def sample_ca_params(rng=None): """ Sample random dimensionless parameters for chronoamperometry. CA holds a constant potential and observes the transient current decay. The Cottrell equation predicts j(t) ~ 1/sqrt(t) for diffusion-limited case. """ if rng is None: rng = np.random.default_rng() # Step potential: large negative drives reduction, large positive drives oxidation theta_step = rng.uniform(-20.0, -2.0) # Rate constant — same range as CV log_K0 = rng.uniform(-2, 2) K0 = 10 ** log_K0 # Transfer coefficient alpha = rng.uniform(0.3, 0.7) # Diffusion coefficient ratio d_ratio = 10 ** rng.uniform(-0.3, 0.3) dA = 1.0 dB = dA * d_ratio # Experiment duration and time step log_tmax = rng.uniform(0.5, 2.0) # t_max in [~3, 100] t_max = 10 ** log_tmax dt = t_max / rng.uniform(500, 1500) # 500-1500 time steps return { 'theta_step': theta_step, 'K0': K0, 'alpha': alpha, 'kinetics': 'BV', 'C_A_bulk': 1.0, 'C_B_bulk': 0.0, 'dA': dA, 'dB': dB, 't_max': t_max, 'dt': dt, } def _sample_common_cv_params(rng): """Sample scan rate, diffusion ratio, and potential window (shared across mechanisms).""" log_sigma = rng.uniform(-1, 2) sigma = 10 ** log_sigma d_ratio = 10 ** rng.uniform(-0.3, 0.3) dA = 1.0 dB = dA * d_ratio theta_center = rng.uniform(-5, 5) theta_range = rng.uniform(15, 25) theta_i = theta_center + theta_range / 2 theta_v = theta_center - theta_range / 2 return { 'sigma': sigma, 'C_A_bulk': 1.0, 'C_B_bulk': 0.0, 'dA': dA, 'dB': dB, 'theta_i': theta_i, 'theta_v': theta_v, 'cycles': 1, } def sample_cv_params_nernst(rng=None): """Sample parameters for Nernstian (reversible) CV.""" if rng is None: rng = np.random.default_rng() p = _sample_common_cv_params(rng) p['kinetics'] = 'Nernst' p['K0'] = 1e6 # effectively infinite (not a free parameter) p['alpha'] = 0.5 # not used p['E0_offset'] = 0.0 # dimensionless formal potential offset return p def sample_cv_params_bv(rng=None): """Sample parameters for Butler-Volmer CV.""" if rng is None: rng = np.random.default_rng() p = _sample_common_cv_params(rng) p['kinetics'] = 'BV' p['K0'] = 10 ** rng.uniform(-2, 2) p['alpha'] = rng.uniform(0.3, 0.7) return p def sample_cv_params_mhc(rng=None): """Sample parameters for Marcus-Hush-Chidsey CV.""" if rng is None: rng = np.random.default_rng() p = _sample_common_cv_params(rng) p['kinetics'] = 'MHC' p['K0'] = 10 ** rng.uniform(-2, 2) p['alpha'] = 0.5 # not used by MHC, but kept for interface consistency p['reorg_e'] = 10 ** rng.uniform(0.5, 2.0) # dimensionless reorganization energy ~3-100 return p def sample_cv_params_ads(rng=None): """Sample parameters for surface adsorption (Laviron-type) CV. Surface-confined model: no diffusion, purely ODE for surface coverage. Parameters: K0 (rate constant), alpha (transfer coefficient), Gamma_sat (coverage). """ if rng is None: rng = np.random.default_rng() log_sigma = rng.uniform(-1, 2) sigma = 10 ** log_sigma theta_center = rng.uniform(-5, 5) theta_range = rng.uniform(15, 25) theta_i = theta_center + theta_range / 2 theta_v = theta_center - theta_range / 2 return { 'sigma': sigma, 'theta_i': theta_i, 'theta_v': theta_v, 'cycles': 1, 'kinetics': 'Ads', 'K0': 10 ** rng.uniform(-2, 2), 'alpha': rng.uniform(0.3, 0.7), 'Gamma_sat': 10 ** rng.uniform(-1, 1), } def sample_cv_params_ec(rng=None): """Sample parameters for EC mechanism CV. EC: A + e- -> B (BV kinetics), then B -> Y (first-order chemical). Parameters: K0, alpha, dB (same as BV) + kc (chemical rate constant). """ if rng is None: rng = np.random.default_rng() p = _sample_common_cv_params(rng) p['kinetics'] = 'EC' p['K0'] = 10 ** rng.uniform(-2, 2) p['alpha'] = rng.uniform(0.3, 0.7) # kc: dimensionless chemical rate constant # Small kc (~0.01): slow chemistry, CV looks like BV # Large kc (~100): fast chemistry, irreversible CV with shifted peak p['kc'] = 10 ** rng.uniform(-2, 2) return p def sample_cv_params_lh(rng=None): """Sample parameters for Langmuir-Hinshelwood CV. LH: diffusion + fast Langmuir adsorption equilibrium + surface BV ET. Parameters: K0, alpha, KA_eq, KB_eq, dB. """ if rng is None: rng = np.random.default_rng() p = _sample_common_cv_params(rng) p['kinetics'] = 'LH' p['K0'] = 10 ** rng.uniform(-2, 2) p['alpha'] = rng.uniform(0.3, 0.7) # Adsorption equilibrium constants (dimensionless) # KA_eq, KB_eq ~ 0.1-100: moderate to strong adsorption p['KA_eq'] = 10 ** rng.uniform(-1, 2) p['KB_eq'] = 10 ** rng.uniform(-1, 2) return p MECHANISM_SAMPLERS = { 'Nernst': sample_cv_params_nernst, 'BV': sample_cv_params_bv, 'MHC': sample_cv_params_mhc, 'Ads': sample_cv_params_ads, 'EC': sample_cv_params_ec, 'LH': sample_cv_params_lh, } def sample_cv_params(rng=None, mechanism=None): """ Sample random dimensionless parameters for CV simulation. If mechanism is specified, samples from that mechanism. Otherwise defaults to BV for backward compatibility. """ if rng is None: rng = np.random.default_rng() if mechanism is None: mechanism = 'BV' return MECHANISM_SAMPLERS[mechanism](rng) # ============================================================================= # Dataset Generation # ============================================================================= MECHANISM_LIST = ['Nernst', 'BV', 'MHC', 'Ads', 'EC', 'LH'] MECHANISM_TO_ID = {m: i for i, m in enumerate(MECHANISM_LIST)} def _run_single_cv(params, n_spatial): """Run a single CV simulation, dispatching to the correct simulator.""" kin = params.get('kinetics') if kin == 'Ads': return run_ads_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) elif kin == 'EC': return run_ec_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) elif kin == 'LH': return run_lh_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) else: return run_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) def _sample_scan_rates(rng, n_scan_rates, log_sigma_range=(-1, 2)): """Sample log-spaced scan rates spanning the given range.""" lo, hi = log_sigma_range if n_scan_rates == 1: return np.array([10 ** rng.uniform(lo, hi)]) anchors = np.linspace(lo, hi, n_scan_rates) jitter = (hi - lo) / (n_scan_rates - 1) * 0.3 log_sigmas = np.array([rng.uniform(a - jitter, a + jitter) for a in anchors]) log_sigmas = np.clip(log_sigmas, lo, hi) log_sigmas.sort() return 10 ** log_sigmas def _add_noise(flux, rng, noise_range=(0.001, 0.02)): """Add Gaussian noise to flux signal. Returns (noisy_flux, sigma_noise).""" sigma_noise = rng.uniform(*noise_range) peak = np.max(np.abs(flux)) + 1e-20 noise = sigma_noise * peak * rng.standard_normal(flux.shape) return flux + noise.astype(flux.dtype), float(sigma_noise) def generate_sample_single(idx, outdir, seed, n_spatial=64, experiment_type='CV', mechanism=None, n_scan_rates=1, add_noise=True): """ Generate and save a single sample (single or multi-scan-rate). When n_scan_rates > 1, the same kinetic parameters are simulated at multiple scan rates and saved together. Parameters ---------- n_scan_rates : int Number of scan rates per sample. Use -1 to randomly sample from {1, 2, 3} per sample (drawn from the per-sample RNG for reproducibility). add_noise : bool If True, add Gaussian noise to flux signals (sigma_noise ~ U(0.001, 0.02) as a fraction of peak current). """ rng = np.random.default_rng(seed + idx) if n_scan_rates == -1: n_scan_rates = int(rng.integers(1, 4)) # 1, 2, or 3 if experiment_type == 'mixed': exp = rng.choice(['CV', 'CA']) else: exp = experiment_type try: if exp == 'CV': params = sample_cv_params(rng, mechanism=mechanism) actual_mechanism = params.get('kinetics', 'BV') mechanism_id = MECHANISM_TO_ID.get(actual_mechanism, -1) if n_scan_rates <= 1: # Single-scan-rate path result = _run_single_cv(params, n_spatial) flux = result['flux'].astype(np.float32) sigma_noise = 0.0 if add_noise: flux, sigma_noise = _add_noise(flux, rng) # Convert K0/kc to canonical sigma=1 (same convention as multi-scan) sigma_ref = params['sigma'] kin = actual_mechanism phys_params = dict(result['params']) K0_ref = params.get('K0', 1.0) if kin in ('BV', 'MHC', 'EC', 'LH'): phys_params['K0'] = float(K0_ref * np.sqrt(sigma_ref)) elif kin == 'Ads': phys_params['K0'] = float(K0_ref * sigma_ref) if kin == 'EC': phys_params['kc'] = float(params.get('kc', 1.0) * sigma_ref) phys_params['sigma_noise'] = sigma_noise np.savez_compressed( os.path.join(outdir, f"sample_{idx:06d}.npz"), potential=result['potential'].astype(np.float32), flux=flux, time=result['time'].astype(np.float32), c_ox=result['c_ox'].astype(np.float32), c_red=result['c_red'].astype(np.float32), x_grid=result['x_grid_out'].astype(np.float32), params=phys_params, mechanism_id=np.int32(mechanism_id), ) meta = { 'idx': idx, 'success': True, 'experiment_type': exp, 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), 'K0': float(phys_params['K0']), 'alpha': float(params.get('alpha', 0.5)), 'n_time': len(result['time']), 'sigma': float(params['sigma']), 'n_scan_rates': 1, 'sigma_noise': sigma_noise, } return meta # Multi-scan-rate: simulate the same physical system at N scan rates. # # The sampled params define dimensionless rate constants at the # originally sampled sigma_ref. We convert to a canonical reference # sigma=1 so that saved K0/kc values are sigma-independent and # comparable across samples. # # Scaling relations (dimensionless = physical / sigma^power): # BV/MHC/EC/LH: K0 ~ 1/sqrt(sigma) # Ads: K0 ~ 1/sigma # EC: kc ~ 1/sigma sigmas = _sample_scan_rates(rng, n_scan_rates) sigma_ref = params['sigma'] kin = actual_mechanism # Compute K0 and kc at canonical sigma=1 K0_at_ref = params.get('K0', 1.0) kc_at_ref = params.get('kc', 1.0) if kin in ('BV', 'MHC', 'EC', 'LH'): K0_at_1 = K0_at_ref * np.sqrt(sigma_ref) elif kin == 'Ads': K0_at_1 = K0_at_ref * sigma_ref else: K0_at_1 = K0_at_ref kc_at_1 = kc_at_ref * sigma_ref if kin == 'EC' else kc_at_ref potentials, fluxes, times = [], [], [] for sigma in sigmas: p = dict(params) p['sigma'] = float(sigma) if kin in ('BV', 'MHC', 'EC', 'LH'): p['K0'] = K0_at_1 / np.sqrt(sigma) elif kin == 'Ads': p['K0'] = K0_at_1 / sigma if kin == 'EC': p['kc'] = kc_at_1 / sigma result = _run_single_cv(p, n_spatial) potentials.append(result['potential'].astype(np.float32)) flux_clean = result['flux'].astype(np.float32) if add_noise: flux_clean, _ = _add_noise(flux_clean, rng) fluxes.append(flux_clean) times.append(result['time'].astype(np.float32)) # Save params with K0/kc at canonical sigma=1 so targets are # sigma-independent and comparable across samples. phys_params = dict(params) phys_params['kinetics'] = actual_mechanism phys_params['K0'] = float(K0_at_1) if kin == 'EC': phys_params['kc'] = float(kc_at_1) # Pad CVs to the same length (different sigma -> different T) max_t = max(len(p) for p in potentials) n_s = len(sigmas) pot_arr = np.zeros((n_s, max_t), dtype=np.float32) flux_arr = np.zeros((n_s, max_t), dtype=np.float32) time_arr = np.zeros((n_s, max_t), dtype=np.float32) lengths = np.zeros(n_s, dtype=np.int32) for i in range(n_s): t_len = len(potentials[i]) pot_arr[i, :t_len] = potentials[i] flux_arr[i, :t_len] = fluxes[i] time_arr[i, :t_len] = times[i] lengths[i] = t_len np.savez_compressed( os.path.join(outdir, f"sample_{idx:06d}.npz"), potential=pot_arr, flux=flux_arr, time=time_arr, sigmas=sigmas.astype(np.float32), lengths=lengths, params=phys_params, mechanism_id=np.int32(mechanism_id), n_scan_rates=np.int32(n_scan_rates), ) meta = { 'idx': idx, 'success': True, 'experiment_type': exp, 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), 'K0': float(params.get('K0', 0)), 'alpha': float(params.get('alpha', 0.5)), 'n_time_max': int(max_t), 'sigmas': [float(s) for s in sigmas], 'n_scan_rates': n_scan_rates, } return meta elif exp == 'CA': params = sample_ca_params(rng) result = run_ca_simulation(**params, n_spatial_out=n_spatial, verbose=False) actual_mechanism = params.get('kinetics', 'BV') mechanism_id = MECHANISM_TO_ID.get(actual_mechanism, -1) np.savez_compressed( os.path.join(outdir, f"sample_{idx:06d}.npz"), potential=result['potential'].astype(np.float32), flux=result['flux'].astype(np.float32), time=result['time'].astype(np.float32), c_ox=result['c_ox'].astype(np.float32), c_red=result['c_red'].astype(np.float32), x_grid=result['x_grid_out'].astype(np.float32), params=result['params'], mechanism_id=np.int32(mechanism_id), ) meta = { 'idx': idx, 'success': True, 'experiment_type': exp, 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), 'n_time': len(result['time']), 'theta_step': float(params['theta_step']), 'n_scan_rates': 1, } return meta else: raise ValueError(f"Unknown experiment type: {exp}") except Exception as e: return { 'idx': idx, 'success': False, 'error': str(e), } def _worker_generate(args): """Worker function for multiprocessing (must be at module level).""" import signal def _timeout_handler(signum, frame): raise TimeoutError("Sample generation timed out") if len(args) == 8: idx, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates, add_noise = args elif len(args) == 7: idx, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates = args add_noise = True elif len(args) == 6: idx, outdir, seed, n_spatial, experiment_type, mechanism = args n_scan_rates = 1 add_noise = True else: idx, outdir, seed, n_spatial, experiment_type = args mechanism = None n_scan_rates = 1 add_noise = True # Skip if output file already exists (resume support) outpath = os.path.join(outdir, f"sample_{idx:06d}.npz") if os.path.exists(outpath): return {'idx': idx, 'success': True, 'error': None, 'skipped': True} old_handler = signal.signal(signal.SIGALRM, _timeout_handler) signal.alarm(180) # 3 minute timeout per sample try: result = generate_sample_single(idx, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates, add_noise) except (TimeoutError, Exception) as e: result = {'idx': idx, 'success': False, 'error': f'Timeout or error: {e}'} finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) return result def generate_dataset( n_samples=1000, outdir="data/diffec_cv", n_spatial=64, seed=42, n_workers=None, experiment_type='CV', mechanism=None, multi_mechanism=False, n_per_mechanism=None, n_scan_rates=1, add_noise=True, ): """ Generate a dataset of electrochemical simulations. Parameters ---------- n_samples : int Number of samples to generate (used when mechanism is single or None) outdir : str Output directory n_spatial : int Number of spatial grid points in output seed : int Random seed n_workers : int, optional Number of parallel workers. Default: number of CPU cores. experiment_type : str 'CV' for cyclic voltammetry, 'CA' for chronoamperometry, 'mixed' for random mix of both. mechanism : str or None Specific mechanism to use. None defaults to BV. multi_mechanism : bool If True, generate balanced data across all 4 mechanisms. n_per_mechanism : int or None Samples per mechanism when multi_mechanism=True. Defaults to n_samples. n_scan_rates : int Number of scan rates per sample. 1 = legacy single-CV format. >1 = multi-scan-rate format with shared kinetic parameters. -1 = randomly sample from {1, 2, 3} per sample. """ os.makedirs(outdir, exist_ok=True) if n_workers is None: n_workers = max(1, cpu_count() - 1) if multi_mechanism: if n_per_mechanism is None: n_per_mechanism = n_samples total = n_per_mechanism * len(MECHANISM_LIST) print(f"Generating multi-mechanism dataset: {n_per_mechanism} per mechanism x {len(MECHANISM_LIST)} = {total}") args_list = [] for mech_idx, mech in enumerate(MECHANISM_LIST): offset = mech_idx * n_per_mechanism for i in range(n_per_mechanism): args_list.append((offset + i, outdir, seed, n_spatial, experiment_type, mech, n_scan_rates, add_noise)) n_samples = total else: args_list = [(i, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates, add_noise) for i in range(n_samples)] n_workers = min(n_workers, n_samples) print(f"Generating {n_samples} samples...") print(f"Output directory: {outdir}") print(f"Spatial resolution: {n_spatial} points") print(f"Using {n_workers} worker(s)") metadata = [] if n_workers == 1: # Sequential processing for args in tqdm(args_list, desc="Generating samples"): meta = _worker_generate(args) metadata.append(meta) if not meta['success']: print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") else: from concurrent.futures import ProcessPoolExecutor, as_completed TASK_TIMEOUT = 180 # 3 minutes per sample try: with ProcessPoolExecutor(max_workers=n_workers) as executor: futures = {executor.submit(_worker_generate, a): a[0] for a in args_list} pbar = tqdm(total=n_samples, desc="Generating samples") for future in as_completed(futures): idx = futures[future] try: meta = future.result(timeout=TASK_TIMEOUT) except Exception as e: meta = {'idx': idx, 'success': False, 'error': f'Worker error: {e}'} metadata.append(meta) if not meta['success'] and not meta.get('skipped'): tqdm.write(f"Sample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") pbar.update(1) pbar.close() except (PermissionError, OSError) as e: print(f"\nWarning: Multiprocessing failed ({e}). Falling back to sequential processing...") metadata = [] for args in tqdm(args_list, desc="Generating samples (sequential)"): meta = _worker_generate(args) metadata.append(meta) if not meta['success']: print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") # Sort metadata by index (parallel processing may return out of order) metadata = sorted(metadata, key=lambda x: x['idx']) # Save metadata n_success = sum(1 for m in metadata if m['success']) mech_counts = {} for m in metadata: if m['success']: mech = m.get('mechanism', 'BV') mech_counts[mech] = mech_counts.get(mech, 0) + 1 summary = { 'n_samples': n_samples, 'n_success': n_success, 'n_spatial': n_spatial, 'n_scan_rates': 'random(1-3)' if n_scan_rates == -1 else n_scan_rates, 'add_noise': add_noise, 'seed': seed, 'n_workers': n_workers, 'experiment_type': experiment_type, 'multi_mechanism': multi_mechanism, 'mechanism_counts': mech_counts, 'samples': metadata, } with open(os.path.join(outdir, "metadata.json"), "w") as f: json.dump(summary, f, indent=2) print(f"\nGeneration complete: {n_success}/{n_samples} successful") print(f"Metadata saved to {os.path.join(outdir, 'metadata.json')}") return metadata # ============================================================================= # Data Loading Utilities # ============================================================================= def load_sample(filepath): """Load a single sample from npz file.""" data = np.load(filepath, allow_pickle=True) return { 'potential': data['potential'], 'flux': data['flux'], 'time': data['time'], 'c_ox': data['c_ox'], 'c_red': data['c_red'], 'x_grid': data['x_grid'], 'params': data['params'].item(), } def load_dataset(datadir, max_samples=None): """Load all samples from a directory.""" import glob files = sorted(glob.glob(os.path.join(datadir, "sample_*.npz"))) if max_samples is not None: files = files[:max_samples] samples = [] for f in tqdm(files, desc="Loading samples"): samples.append(load_sample(f)) return samples # ============================================================================= # Main # ============================================================================= if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generate DiffEC CV dataset for flow matching" ) parser.add_argument( "--n_samples", type=int, default=1000, help="Number of samples to generate" ) parser.add_argument( "--outdir", type=str, default="data/diffec_cv", help="Output directory" ) parser.add_argument( "--n_spatial", type=int, default=64, help="Number of spatial grid points in output" ) parser.add_argument( "--seed", type=int, default=42, help="Random seed" ) parser.add_argument( "--n_workers", type=int, default=None, help="Number of parallel workers (default: num_cpus - 1). Set to 1 for sequential." ) parser.add_argument( "--experiment_type", type=str, default="CV", choices=["CV", "CA", "mixed"], help="Experiment type: CV (cyclic voltammetry), CA (chronoamperometry), or mixed" ) parser.add_argument( "--mechanism", type=str, default=None, choices=["Nernst", "BV", "MHC", "Ads", "EC", "LH"], help="Specific mechanism to generate data for" ) parser.add_argument( "--multi_mechanism", action="store_true", help="Generate balanced data across all 4 mechanisms" ) parser.add_argument( "--n_per_mechanism", type=int, default=None, help="Samples per mechanism when using --multi_mechanism" ) parser.add_argument( "--n_scan_rates", type=str, default="1", help="Number of scan rates per sample. 1 = legacy single-CV. " ">1 = multi-scan-rate with shared kinetic parameters. " "'random' = randomly 1-3 per sample." ) parser.add_argument( "--no_noise", action="store_true", help="Disable noise augmentation (generate clean signals)" ) parser.add_argument( "--test", action="store_true", help="Run a single test simulation and plot results" ) args = parser.parse_args() if args.test: # Run a test simulation print("Running test simulation...") params = sample_cv_params(np.random.default_rng(42)) print(f"Parameters: {params}") result = run_cv_simulation(**params, n_spatial_out=64, verbose=True) print(f"\nResults:") print(f" Time steps: {len(result['time'])}") print(f" Spatial points: {result['c_ox'].shape[1]}") print(f" Potential range: [{result['potential'].min():.2f}, {result['potential'].max():.2f}]") print(f" Flux range: [{result['flux'].min():.4f}, {result['flux'].max():.4f}]") # Try to plot if matplotlib available try: import matplotlib.pyplot as plt fig, axes = plt.subplots(2, 2, figsize=(12, 10)) # CV plot ax = axes[0, 0] ax.plot(result['potential'], result['flux']) ax.set_xlabel('Potential (dimensionless)') ax.set_ylabel('Flux (dimensionless)') ax.set_title('Cyclic Voltammogram') # Concentration at electrode surface vs time ax = axes[0, 1] ax.plot(result['time'], result['c_ox'][:, -1], label='Ox (surface)') ax.plot(result['time'], result['c_red'][:, 0], label='Red (surface)') ax.set_xlabel('Time (dimensionless)') ax.set_ylabel('Concentration') ax.set_title('Surface Concentration vs Time') ax.legend() # Concentration heatmap - Ox ax = axes[1, 0] im = ax.imshow( result['c_ox'].T, aspect='auto', origin='lower', extent=[0, result['time'][-1], 0, result['x_grid_out'][-1]] ) plt.colorbar(im, ax=ax, label='Concentration') ax.set_xlabel('Time') ax.set_ylabel('Distance from electrode') ax.set_title('Oxidized Species Concentration') # Concentration heatmap - Red ax = axes[1, 1] im = ax.imshow( result['c_red'].T, aspect='auto', origin='lower', extent=[0, result['time'][-1], 0, result['x_grid_out'][-1]] ) plt.colorbar(im, ax=ax, label='Concentration') ax.set_xlabel('Time') ax.set_ylabel('Distance from electrode') ax.set_title('Reduced Species Concentration') plt.tight_layout() plt.savefig('test_simulation.png', dpi=150) print("\nPlot saved to test_simulation.png") except ImportError: print("\nmatplotlib not available, skipping plot") else: n_scan_rates = -1 if args.n_scan_rates.lower() == 'random' else int(args.n_scan_rates) generate_dataset( n_samples=args.n_samples, outdir=args.outdir, n_spatial=args.n_spatial, seed=args.seed, n_workers=args.n_workers, experiment_type=args.experiment_type, mechanism=args.mechanism, multi_mechanism=args.multi_mechanism, n_per_mechanism=args.n_per_mechanism, n_scan_rates=n_scan_rates, add_noise=not args.no_noise, )