| """ |
| TPD/TPR Simulator and Dataset Generation for Heterogeneous Catalysis. |
| |
| Implements 6 temperature-programmed desorption/reaction mechanisms: |
| 1. FirstOrder - first-order desorption (Polanyi-Wigner, n=1) |
| 2. SecondOrder - second-order/recombinative desorption (n=2) |
| 3. LH_Surface - Langmuir-Hinshelwood bimolecular surface reaction |
| 4. MvK - Mars-van Krevelen lattice oxygen mechanism |
| 5. FirstOrderCovDep - first-order with coverage-dependent activation energy |
| 6. DiffLimited - diffusion-limited desorption from porous materials |
| |
| Each mechanism is solved as an ODE with a linear temperature ramp T = T0 + beta*t. |
| Supports multi-heating-rate generation (analogous to multi-scan-rate CVs in EC). |
| |
| Data format mirrors the electrochemistry pipeline for compatibility. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import numpy as np |
| from tqdm import tqdm |
| from multiprocessing import Pool, cpu_count |
|
|
| import scipy.integrate |
|
|
|
|
| |
| |
| |
|
|
# Canonical ordering of the supported mechanisms; the position in this list
# is the integer class label used throughout the dataset.
TPD_MECHANISM_LIST = [
    'FirstOrder', 'SecondOrder', 'LH_Surface', 'MvK',
    'FirstOrderCovDep', 'DiffLimited',
]
# Mechanism name -> integer id (stable because the list order above is fixed).
TPD_MECHANISM_TO_ID = {m: i for i, m in enumerate(TPD_MECHANISM_LIST)}
|
|
# Per-mechanism kinetic parameter descriptors: 'names' lists the free
# parameters (in the order the corresponding run_tpd_* function takes them,
# energies in Kelvin since E = Ea/R, prefactors as log10), 'dim' is their count.
TPD_MECHANISM_PARAMS = {
    'FirstOrder': {
        'names': ['Ed', 'log10(nu)', 'theta_0'],
        'dim': 3,
    },
    'SecondOrder': {
        'names': ['Ed', 'log10(nu)', 'theta_0'],
        'dim': 3,
    },
    'LH_Surface': {
        'names': ['Ea', 'log10(nu)', 'theta_A0', 'theta_B0'],
        'dim': 4,
    },
    'MvK': {
        'names': ['Ea_red', 'Ea_reox', 'log10(nu_red)', 'theta_O0'],
        'dim': 4,
    },
    'FirstOrderCovDep': {
        'names': ['Ed0', 'alpha_cov', 'log10(nu)', 'theta_0'],
        'dim': 4,
    },
    'DiffLimited': {
        'names': ['Ed', 'log10(nu)', 'log10(D0)', 'E_diff', 'theta_0'],
        'dim': 5,
    },
}
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
# NOTE(review): T_REF is not referenced anywhere in this file's visible code;
# it may be consumed by a downstream module — confirm before removing.
T_REF = 1.0
# Default number of sample points in each simulated TPD trace.
N_POINTS_DEFAULT = 500
|
|
|
|
| |
| |
| |
|
|
def run_tpd_first_order(Ed, nu, theta_0, beta, T_start, T_end,
                        n_points=N_POINTS_DEFAULT):
    """
    Simulate first-order desorption under a linear temperature ramp.

    Kinetics (Polanyi-Wigner, n=1):
        d(theta)/dt = -nu * theta * exp(-Ed / T(t)),  T(t) = T_start + beta * t

    Parameters
    ----------
    Ed : float
        Dimensionless desorption energy (= Ea / R, in Kelvin).
    nu : float
        Pre-exponential factor (s^-1).
    theta_0 : float
        Initial fractional surface coverage [0, 1].
    beta : float
        Heating rate (K/s).
    T_start, T_end : float
        Temperature ramp range (K).
    n_points : int
        Number of output points.

    Returns
    -------
    dict with 'temperature', 'rate', 'time', 'coverage', 'params'
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def ode(t, y):
        # Clamp T to >= 1 K to keep exp(-Ed/T) well-behaved.
        T = max(T_start + beta * t, 1.0)
        return [-nu * y[0] * np.exp(-Ed / T)]

    solution = scipy.integrate.solve_ivp(
        ode, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )

    coverage = solution.y[0]
    temps = T_start + beta * solution.t
    # Reconstruct the desorption rate from the (non-negative) coverage trace.
    rate = nu * np.maximum(coverage, 0) * np.exp(-Ed / np.maximum(temps, 1.0))

    return {
        'temperature': temps.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': solution.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'FirstOrder',
            'Ed': float(Ed),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
|
|
|
|
def run_tpd_second_order(Ed, nu, theta_0, beta, T_start, T_end,
                         n_points=N_POINTS_DEFAULT):
    """
    Simulate second-order (recombinative) desorption on a linear ramp:

        d(theta)/dt = -nu * theta^2 * exp(-Ed / T(t))

    The peak shifts to lower T as theta_0 increases — the key diagnostic
    distinguishing n=2 from n=1 kinetics.
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def ode(t, y):
        # Clamp T to >= 1 K to keep exp(-Ed/T) well-behaved.
        T = max(T_start + beta * t, 1.0)
        return [-nu * y[0] ** 2 * np.exp(-Ed / T)]

    solution = scipy.integrate.solve_ivp(
        ode, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )

    coverage = solution.y[0]
    temps = T_start + beta * solution.t
    rate = nu * np.maximum(coverage, 0) ** 2 * np.exp(-Ed / np.maximum(temps, 1.0))

    return {
        'temperature': temps.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': solution.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'SecondOrder',
            'Ed': float(Ed),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
|
|
|
|
def run_tpd_lh_surface(Ea, nu, theta_A0, theta_B0, beta, T_start, T_end,
                       n_points=N_POINTS_DEFAULT):
    """
    Simulate a Langmuir-Hinshelwood bimolecular surface reaction,
    A(ads) + B(ads) -> products, on a linear temperature ramp:

        d(theta_A)/dt = d(theta_B)/dt = -nu * theta_A * theta_B * exp(-Ea / T)
        rate = nu * theta_A * theta_B * exp(-Ea / T)
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def ode(t, y):
        # Both adsorbates are consumed at the same bimolecular rate.
        T = max(T_start + beta * t, 1.0)
        r = -nu * y[0] * y[1] * np.exp(-Ea / T)
        return [r, r]

    solution = scipy.integrate.solve_ivp(
        ode, [0, duration], [theta_A0, theta_B0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )

    cov_A = solution.y[0]
    cov_B = solution.y[1]
    temps = T_start + beta * solution.t
    rate = nu * np.maximum(cov_A, 0) * np.maximum(cov_B, 0) * \
        np.exp(-Ea / np.maximum(temps, 1.0))

    return {
        'temperature': temps.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': solution.t.astype(np.float32),
        'coverage': np.stack([cov_A, cov_B], axis=-1).astype(np.float32),
        'params': {
            'mechanism': 'LH_Surface',
            'Ea': float(Ea),
            'nu': float(nu),
            'theta_A0': float(theta_A0),
            'theta_B0': float(theta_B0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
|
|
|
|
def run_tpd_mvk(Ea_red, Ea_reox, nu_red, theta_O0, beta, T_start, T_end,
                n_points=N_POINTS_DEFAULT):
    """
    Simulate the Mars-van Krevelen lattice-oxygen mechanism.

    Two competing processes act on the lattice-oxygen coverage theta_O:
        reduction:    r_red  = nu_red  * theta_O       * exp(-Ea_red / T)
        reoxidation:  r_reox = nu_reox * (1 - theta_O) * exp(-Ea_reox / T)
        d(theta_O)/dt = r_reox - r_red
    The observable rate is r_red (consumption of lattice oxygen).

    nu_reox is tied to nu_red (nu_reox = 0.1 * nu_red) to reduce the
    parameter count while keeping the two-process competition that
    creates the distinctive MvK peak shapes.
    """
    nu_reox = nu_red * 0.1
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def ode(t, y):
        T = max(T_start + beta * t, 1.0)
        theta_O = y[0]
        r_red = nu_red * theta_O * np.exp(-Ea_red / T)
        r_reox = nu_reox * (1.0 - theta_O) * np.exp(-Ea_reox / T)
        return [r_reox - r_red]

    solution = scipy.integrate.solve_ivp(
        ode, [0, duration], [theta_O0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )

    theta_O = solution.y[0]
    temps = T_start + beta * solution.t
    rate = nu_red * np.maximum(theta_O, 0) * \
        np.exp(-Ea_red / np.maximum(temps, 1.0))

    return {
        'temperature': temps.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': solution.t.astype(np.float32),
        'coverage': theta_O.astype(np.float32),
        'params': {
            'mechanism': 'MvK',
            'Ea_red': float(Ea_red),
            'Ea_reox': float(Ea_reox),
            'nu_red': float(nu_red),
            'nu_reox': float(nu_reox),
            'theta_O0': float(theta_O0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
|
|
|
|
def run_tpd_first_order_covdep(Ed0, alpha_cov, nu, theta_0, beta, T_start, T_end,
                               n_points=N_POINTS_DEFAULT):
    """
    Simulate first-order desorption with coverage-dependent activation energy:

        Ed(theta) = Ed0 + alpha_cov * theta
        d(theta)/dt = -nu * theta * exp(-Ed(theta) / T(t))

    Repulsive lateral interactions (alpha_cov > 0) sharpen the peak and
    shift it to lower T as coverage decreases — a signature that standard
    Redhead analysis misinterprets as a change in Ed.

    Parameters
    ----------
    Ed0 : float
        Zero-coverage desorption energy (K, = Ea0/R).
    alpha_cov : float
        Coverage-dependence coefficient (K). Positive = repulsive interactions.
    nu : float
        Pre-exponential factor (s^-1).
    theta_0 : float
        Initial fractional surface coverage [0, 1].
    beta : float
        Heating rate (K/s).
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def ode(t, y):
        # Effective barrier rises with the instantaneous coverage.
        T = max(T_start + beta * t, 1.0)
        theta = y[0]
        return [-nu * theta * np.exp(-(Ed0 + alpha_cov * theta) / T)]

    solution = scipy.integrate.solve_ivp(
        ode, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )

    coverage = solution.y[0]
    temps = T_start + beta * solution.t
    barrier = Ed0 + alpha_cov * np.maximum(coverage, 0)
    rate = nu * np.maximum(coverage, 0) * np.exp(-barrier / np.maximum(temps, 1.0))

    return {
        'temperature': temps.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': solution.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'FirstOrderCovDep',
            'Ed0': float(Ed0),
            'alpha_cov': float(alpha_cov),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
|
|
|
|
def run_tpd_diff_limited(Ed, nu, D0, E_diff, theta_0, beta, T_start, T_end,
                         n_points=N_POINTS_DEFAULT, n_shells=20):
    """
    Diffusion-limited desorption from a porous/layered material.

    Models a 1D spherical particle with n_shells concentric shells.
    Surface desorption follows first-order kinetics; replenishment of the
    surface layer is limited by intra-particle diffusion with an
    Arrhenius-type diffusivity D(T) = D0 * exp(-E_diff / T).

    This produces characteristic broadened, asymmetric peaks with long
    high-temperature tails that traditional Redhead/Kissinger methods
    cannot fit — the apparent activation energy depends on particle size
    and diffusivity.

    Parameters
    ----------
    Ed : float
        Surface desorption energy (K, = Ea/R).
    nu : float
        Pre-exponential factor for desorption (s^-1).
    D0 : float
        Diffusion pre-exponential (s^-1, dimensionless Fourier units).
    E_diff : float
        Diffusion activation energy (K, = Ea_diff/R).
    theta_0 : float
        Initial uniform loading in all shells [0, 1].
    beta : float
        Heating rate (K/s).
    n_shells : int
        Number of radial shells for the discretized diffusion.

    Returns
    -------
    dict with 'temperature', 'rate', 'time', 'coverage', 'params'
    """
    t_end = (T_end - T_start) / beta
    t_eval = np.linspace(0, t_end, n_points)

    # Radial discretization of a unit-radius sphere: face i sits at i*dr.
    dr = 1.0 / n_shells
    r_face = dr * np.arange(n_shells + 1)

    # Volume of each spherical shell between consecutive faces.
    vol = (4.0 / 3.0) * np.pi * (r_face[1:] ** 3 - r_face[:-1] ** 3)

    # Interior face areas (faces 1 .. n_shells-1). Hoisted out of the RHS:
    # they are temperature-independent and the stiff solver evaluates the
    # RHS thousands of times.
    area_interior = 4.0 * np.pi * r_face[1:-1] ** 2

    y0 = np.full(n_shells, theta_0)

    def rhs(t, y):
        T = T_start + beta * t
        if T < 1.0:
            T = 1.0

        # Arrhenius diffusivity; flux across each interior face is driven
        # by the concentration difference between adjacent shells.
        # Vectorized over all faces (replaces a per-call Python loop).
        D = D0 * np.exp(-E_diff / T)
        flux = D * area_interior * (y[:-1] - y[1:]) / dr

        dydt = np.zeros(n_shells)
        dydt[:-1] -= flux / vol[:-1]
        dydt[1:] += flux / vol[1:]

        # First-order desorption removes material from the outermost
        # (surface) shell only.
        k_des = nu * np.exp(-Ed / T)
        dydt[-1] -= k_des * y[-1]

        return dydt

    sol = scipy.integrate.solve_ivp(
        rhs, [0, t_end], y0, t_eval=t_eval,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50,
    )

    theta_all = sol.y
    temperature = T_start + beta * sol.t

    # Observable desorption rate comes from the surface shell alone.
    theta_surf = np.maximum(theta_all[-1], 0)
    rate = nu * theta_surf * np.exp(-Ed / np.maximum(temperature, 1.0))

    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': theta_surf.astype(np.float32),
        'params': {
            'mechanism': 'DiffLimited',
            'Ed': float(Ed),
            'nu': float(nu),
            'D0': float(D0),
            'E_diff': float(E_diff),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
            'n_shells': n_shells,
        },
    }
|
|
|
|
| |
| |
| |
|
|
def _run_single_tpd(params):
    """Run a single TPD simulation, dispatching to the correct mechanism."""
    mech = params['mechanism']
    # Ramp settings shared by every mechanism (trailing positional args).
    ramp = (params['beta'], params['T_start'], params['T_end'],
            params.get('n_points', N_POINTS_DEFAULT))

    if mech == 'FirstOrder':
        return run_tpd_first_order(
            params['Ed'], params['nu'], params['theta_0'], *ramp)
    if mech == 'SecondOrder':
        return run_tpd_second_order(
            params['Ed'], params['nu'], params['theta_0'], *ramp)
    if mech == 'LH_Surface':
        return run_tpd_lh_surface(
            params['Ea'], params['nu'],
            params['theta_A0'], params['theta_B0'], *ramp)
    if mech == 'MvK':
        return run_tpd_mvk(
            params['Ea_red'], params['Ea_reox'],
            params['nu_red'], params['theta_O0'], *ramp)
    if mech == 'FirstOrderCovDep':
        return run_tpd_first_order_covdep(
            params['Ed0'], params['alpha_cov'],
            params['nu'], params['theta_0'], *ramp)
    if mech == 'DiffLimited':
        return run_tpd_diff_limited(
            params['Ed'], params['nu'], params['D0'],
            params['E_diff'], params['theta_0'], *ramp)
    raise ValueError(f"Unknown TPD mechanism: {mech}")
|
|
|
|
| |
| |
| |
|
|
| def _sample_common_tpd_params(rng): |
| """Sample common TPD experiment parameters.""" |
| T_start = rng.uniform(300, 400) |
| T_end = rng.uniform(900, 1200) |
| return T_start, T_end |
|
|
|
|
| def _estimate_T_peak(Ed, log10_nu, beta): |
| """Redhead estimate of peak temperature for first-order TPD. |
| Solves Ed/T_peak^2 ≈ (nu/beta) * exp(-Ed/T_peak) iteratively. |
| Good enough for rejection sampling.""" |
| |
| |
| ln_nu = log10_nu * np.log(10) |
| T_est = Ed / (ln_nu - np.log(max(Ed / max(beta, 0.01), 1.0))) |
| T_est = np.clip(T_est, 100, 2000) |
| |
| for _ in range(3): |
| exp_term = np.exp(-Ed / max(T_est, 1.0)) |
| f = Ed / (T_est ** 2) - (10 ** log10_nu / max(beta, 0.01)) * exp_term |
| df = -2 * Ed / (T_est ** 3) - (10 ** log10_nu / max(beta, 0.01)) * exp_term * Ed / (T_est ** 2) |
| if abs(df) > 1e-30: |
| T_est = T_est - f / df |
| T_est = np.clip(T_est, 100, 2000) |
| return T_est |
|
|
|
|
def _sample_Ed_nu_beta(rng, T_start, T_end, max_attempts=50):
    """Sample (Ed, nu, beta) by rejection so the estimated peak temperature
    lands inside the measurement window [T_start+margin, T_end-margin]."""
    window = T_end - T_start
    T_lo = T_start + 0.15 * window
    T_hi = T_end - 0.10 * window

    for _ in range(max_attempts):
        log10_nu = rng.uniform(12, 16)
        beta = 10 ** rng.uniform(-0.5, 1.5)

        # Invert the Redhead relation at a target peak temperature to get Ed.
        T_peak_target = rng.uniform(T_lo, T_hi)
        ln_nu = log10_nu * np.log(10)
        Ed = T_peak_target * (ln_nu + np.log(T_peak_target) - np.log(beta) - 3.64)

        # Reject physically implausible energies.
        if not 3000 <= Ed <= 50000:
            continue

        if T_lo <= _estimate_T_peak(Ed, log10_nu, beta) <= T_hi:
            return Ed, 10 ** log10_nu, beta

    # Fallback after exhausting attempts: fixed prefactor, Redhead-consistent Ed.
    beta = 10 ** rng.uniform(0.0, 1.0)
    T_peak_target = rng.uniform(T_lo, T_hi)
    log10_nu = 13.0
    Ed = T_peak_target * (13.0 * np.log(10) + np.log(T_peak_target) - np.log(beta) - 3.64)
    return Ed, 10 ** log10_nu, beta
|
|
|
|
def sample_first_order_params(rng):
    """Sample parameters for first-order desorption."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    params = {
        'mechanism': 'FirstOrder',
        'Ed': float(Ed),
        'nu': float(nu),
        'theta_0': float(rng.uniform(0.1, 1.0)),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
    return params
|
|
|
|
def sample_second_order_params(rng):
    """Sample parameters for second-order desorption."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    params = {
        'mechanism': 'SecondOrder',
        'Ed': float(Ed),
        'nu': float(nu),
        'theta_0': float(rng.uniform(0.1, 1.0)),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
    return params
|
|
|
|
def sample_lh_surface_params(rng):
    """Sample parameters for the LH bimolecular surface reaction."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ea, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    # Independent initial coverages for the two co-adsorbed species.
    theta_A0 = rng.uniform(0.1, 1.0)
    theta_B0 = rng.uniform(0.1, 1.0)
    return {
        'mechanism': 'LH_Surface',
        'Ea': float(Ea),
        'nu': float(nu),
        'theta_A0': float(theta_A0),
        'theta_B0': float(theta_B0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
|
|
|
|
def sample_mvk_params(rng):
    """Sample parameters for the Mars-van Krevelen mechanism."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ea_red, nu_red, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    # Draw an independent reoxidation barrier; its nu and beta are discarded.
    Ea_reox = _sample_Ed_nu_beta(rng, T_start, T_end)[0]
    theta_O0 = rng.uniform(0.5, 1.0)
    return {
        'mechanism': 'MvK',
        'Ea_red': float(Ea_red),
        'Ea_reox': float(Ea_reox),
        'nu_red': float(nu_red),
        'theta_O0': float(theta_O0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
|
|
|
|
def sample_first_order_covdep_params(rng):
    """Sample parameters for coverage-dependent first-order desorption.

    Ed(theta) = Ed0 + alpha_cov * theta, with alpha_cov > 0 modelling
    repulsive lateral interactions (common for CO on metals). Ed0 is drawn
    via the Redhead constraint at the *initial* coverage so the peak lands
    in the measurement window; alpha_cov then shifts the effective energy
    by up to ~30% of Ed0.
    """
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed0, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    alpha_cov = Ed0 * rng.uniform(0.05, 0.35)
    theta_0 = rng.uniform(0.3, 1.0)
    return {
        'mechanism': 'FirstOrderCovDep',
        'Ed0': float(Ed0),
        'alpha_cov': float(alpha_cov),
        'nu': float(nu),
        'theta_0': float(theta_0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
|
|
|
|
def sample_diff_limited_params(rng):
    """Sample parameters for diffusion-limited desorption.

    The key physics: when E_diff is comparable to Ed, diffusion becomes
    the rate-limiting step and the TPD peak broadens with a long tail;
    when E_diff << Ed, diffusion is fast and the curve approaches plain
    first-order. The ranges below span both regimes.
    """
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)

    # Log-uniform diffusion prefactor over four decades.
    D0 = 10 ** rng.uniform(2.0, 6.0)

    # Diffusion barrier as a fraction of the desorption barrier.
    E_diff = rng.uniform(0.3, 0.9) * Ed

    theta_0 = rng.uniform(0.3, 1.0)
    return {
        'mechanism': 'DiffLimited',
        'Ed': float(Ed),
        'nu': float(nu),
        'D0': float(D0),
        'E_diff': float(E_diff),
        'theta_0': float(theta_0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
|
|
|
|
def sample_tpd_params(rng, mechanism=None):
    """Sample TPD parameters, optionally for a specific mechanism."""
    dispatch = {
        'FirstOrder': sample_first_order_params,
        'SecondOrder': sample_second_order_params,
        'LH_Surface': sample_lh_surface_params,
        'MvK': sample_mvk_params,
        'FirstOrderCovDep': sample_first_order_covdep_params,
        'DiffLimited': sample_diff_limited_params,
    }
    # Draw a mechanism uniformly when none is requested.
    chosen = mechanism if mechanism is not None else rng.choice(TPD_MECHANISM_LIST)
    return dispatch[chosen](rng)
|
|
|
|
| |
| |
| |
|
|
| def _add_noise(signal, rng, noise_range=(0.001, 0.02)): |
| """Add Gaussian noise to a signal. Returns (noisy_signal, sigma_noise). |
| Clamps result to >= 0 since desorption/reaction rates are non-negative.""" |
| sigma_noise = rng.uniform(*noise_range) |
| peak = np.max(np.abs(signal)) + 1e-20 |
| noise = sigma_noise * peak * rng.standard_normal(signal.shape) |
| noisy = signal + noise.astype(signal.dtype) |
| np.maximum(noisy, 0, out=noisy) |
| return noisy, float(sigma_noise) |
|
|
|
|
| |
| |
| |
|
|
| def _sample_heating_rates(rng, n_rates, log_beta_range=(-0.5, 1.5)): |
| """Sample log-spaced heating rates spanning the given range.""" |
| lo, hi = log_beta_range |
| if n_rates == 1: |
| return np.array([10 ** rng.uniform(lo, hi)]) |
| anchors = np.linspace(lo, hi, n_rates) |
| jitter = (hi - lo) / (n_rates - 1) * 0.3 |
| log_betas = np.array([rng.uniform(a - jitter, a + jitter) for a in anchors]) |
| log_betas = np.clip(log_betas, lo, hi) |
| log_betas.sort() |
| return 10 ** log_betas |
|
|
|
|
| |
| |
| |
|
|
def _save_single_rate_sample(idx, outdir, rng, params, mechanism_id, add_noise):
    """Simulate one heating rate, optionally add noise, save .npz, return meta."""
    result = _run_single_tpd(params)
    rate = result['rate'].copy()
    sigma_noise = 0.0
    if add_noise:
        rate, sigma_noise = _add_noise(rate, rng)

    save_params = dict(params)
    save_params['sigma_noise'] = sigma_noise

    np.savez_compressed(
        os.path.join(outdir, f"sample_{idx:06d}.npz"),
        temperature=result['temperature'],
        rate=rate,
        time=result['time'],
        params=save_params,
        mechanism_id=np.int32(mechanism_id),
    )
    return {
        'idx': idx, 'success': True,
        'mechanism': params['mechanism'],
        'mechanism_id': int(mechanism_id),
        'n_time': len(result['time']),
        'beta': float(params['beta']),
        'n_heating_rates': 1,
        'sigma_noise': sigma_noise,
    }


def _save_multi_rate_sample(idx, outdir, rng, params, mechanism_id,
                            n_heating_rates, add_noise):
    """Simulate the same kinetics at several heating rates and save the
    padded per-rate curves together in one .npz; return meta."""
    heating_rates = _sample_heating_rates(rng, n_heating_rates)

    temperatures, rates, times = [], [], []
    for beta in heating_rates:
        p = dict(params)
        p['beta'] = float(beta)
        result = _run_single_tpd(p)
        temp = result['temperature'].copy()
        rate = result['rate'].copy()
        if add_noise:
            rate, _ = _add_noise(rate, rng)
        temperatures.append(temp)
        rates.append(rate)
        times.append(result['time'].copy())

    # Pad all curves to a common length (the solver may return fewer points
    # for some rates); 'lengths' records each curve's valid prefix.
    max_t = max(len(t) for t in temperatures)
    n_hr = len(heating_rates)
    temp_arr = np.zeros((n_hr, max_t), dtype=np.float32)
    rate_arr = np.zeros((n_hr, max_t), dtype=np.float32)
    time_arr = np.zeros((n_hr, max_t), dtype=np.float32)
    lengths = np.zeros(n_hr, dtype=np.int32)
    for i in range(n_hr):
        t_len = len(temperatures[i])
        temp_arr[i, :t_len] = temperatures[i]
        rate_arr[i, :t_len] = rates[i]
        time_arr[i, :t_len] = times[i]
        lengths[i] = t_len

    save_params = dict(params)
    # 'beta' is per-curve in this format; the 'heating_rates' array is
    # authoritative, so drop the scalar to avoid ambiguity.
    save_params.pop('beta', None)

    np.savez_compressed(
        os.path.join(outdir, f"sample_{idx:06d}.npz"),
        temperature=temp_arr,
        rate=rate_arr,
        time=time_arr,
        heating_rates=heating_rates.astype(np.float32),
        lengths=lengths,
        params=save_params,
        mechanism_id=np.int32(mechanism_id),
        n_heating_rates=np.int32(n_heating_rates),
    )
    return {
        'idx': idx, 'success': True,
        'mechanism': params['mechanism'],
        'mechanism_id': int(mechanism_id),
        'n_time_max': int(max_t),
        'heating_rates': [float(b) for b in heating_rates],
        'n_heating_rates': n_heating_rates,
    }


def generate_sample_single(idx, outdir, seed, mechanism=None,
                           n_heating_rates=1, add_noise=True):
    """
    Generate and save a single TPD sample (single or multi-heating-rate).

    When n_heating_rates > 1, the same kinetic parameters are simulated at
    multiple heating rates and saved together.

    Returns a metadata dict; on any failure it carries success=False and
    the error string instead of raising.
    """
    # Per-sample seed keeps generation reproducible and order-independent
    # across workers.
    rng = np.random.default_rng(seed + idx)

    try:
        params = sample_tpd_params(rng, mechanism=mechanism)
        mechanism_id = TPD_MECHANISM_TO_ID[params['mechanism']]

        if n_heating_rates <= 1:
            return _save_single_rate_sample(idx, outdir, rng, params,
                                            mechanism_id, add_noise)
        return _save_multi_rate_sample(idx, outdir, rng, params, mechanism_id,
                                       n_heating_rates, add_noise)

    except Exception as e:
        # Dataset generation is best-effort: record the failure so one bad
        # parameter draw cannot kill a long batch job.
        return {
            'idx': idx,
            'success': False,
            'error': str(e),
        }
|
|
|
|
def _worker_generate(args):
    """Worker function for multiprocessing (must be at module level).

    ``args`` is the positional argument tuple for generate_sample_single:
    (idx, outdir, seed, mechanism, n_heating_rates, add_noise).
    """
    return generate_sample_single(*args)
|
|
|
|
def generate_dataset(
    n_samples=1000,
    outdir="data_tpd/raw",
    seed=42,
    n_workers=None,
    mechanism=None,
    multi_mechanism=False,
    n_per_mechanism=None,
    n_heating_rates=1,
    add_noise=True,
):
    """Generate a dataset of TPD simulations.

    Parameters
    ----------
    n_samples : int
        Number of samples to generate. When ``multi_mechanism`` is set and
        ``n_per_mechanism`` is None, this is used as the per-mechanism count.
    outdir : str
        Output directory for the per-sample .npz files and metadata.json.
    seed : int
        Base RNG seed; each sample uses seed + idx.
    n_workers : int or None
        Worker processes; defaults to cpu_count() - 1 (min 1).
    mechanism : str or None
        Fix all samples to one mechanism (ignored if ``multi_mechanism``).
    multi_mechanism : bool
        Generate a balanced dataset covering every mechanism.
    n_per_mechanism : int or None
        Samples per mechanism in multi-mechanism mode.
    n_heating_rates : int
        Heating-rate curves per sample.
    add_noise : bool
        Whether to add synthetic measurement noise.

    Returns
    -------
    list of per-sample metadata dicts (sorted by sample index).
    """
    os.makedirs(outdir, exist_ok=True)

    # Leave one core free for the parent process by default.
    if n_workers is None:
        n_workers = max(1, cpu_count() - 1)

    if multi_mechanism:
        if n_per_mechanism is None:
            n_per_mechanism = n_samples
        total = n_per_mechanism * len(TPD_MECHANISM_LIST)
        print(f"Generating multi-mechanism TPD dataset: "
              f"{n_per_mechanism} per mechanism x {len(TPD_MECHANISM_LIST)} = {total}")
        # Contiguous index blocks per mechanism: mechanism k owns indices
        # [k * n_per_mechanism, (k+1) * n_per_mechanism).
        args_list = []
        for mech_idx, mech in enumerate(TPD_MECHANISM_LIST):
            offset = mech_idx * n_per_mechanism
            for i in range(n_per_mechanism):
                args_list.append((offset + i, outdir, seed, mech,
                                  n_heating_rates, add_noise))
        n_samples = total
    else:
        args_list = [(i, outdir, seed, mechanism, n_heating_rates, add_noise)
                     for i in range(n_samples)]

    # Never spin up more workers than there are samples.
    n_workers = min(n_workers, n_samples)

    print(f"Generating {n_samples} TPD samples...")
    print(f"Output directory: {outdir}")
    print(f"Heating rates per sample: {n_heating_rates}")
    print(f"Using {n_workers} worker(s)")

    metadata = []

    if n_workers == 1:
        # Sequential path: simpler tracebacks, no pickling overhead.
        for args in tqdm(args_list, desc="Generating samples"):
            meta = _worker_generate(args)
            metadata.append(meta)
            if not meta['success']:
                print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown')}")
    else:
        try:
            with Pool(processes=n_workers) as pool:
                # imap_unordered: results arrive as workers finish; order is
                # restored by the sort below.
                for meta in tqdm(
                    pool.imap_unordered(_worker_generate, args_list, chunksize=max(1, n_samples // (n_workers * 4))),
                    total=n_samples,
                    desc="Generating samples",
                ):
                    metadata.append(meta)
                    if not meta['success']:
                        tqdm.write(f"Sample {meta['idx']} failed: "
                                   f"{meta.get('error', 'Unknown')}")
        except (PermissionError, OSError) as e:
            # Some sandboxed/restricted environments forbid fork/semaphores;
            # fall back to sequential generation rather than aborting.
            print(f"\nWarning: Multiprocessing failed ({e}). "
                  "Falling back to sequential...")
            metadata = []
            for args in tqdm(args_list, desc="Generating samples (sequential)"):
                meta = _worker_generate(args)
                metadata.append(meta)
                if not meta['success']:
                    print(f"\nSample {meta['idx']} failed: "
                          f"{meta.get('error', 'Unknown')}")

    # Restore deterministic ordering regardless of worker completion order.
    metadata = sorted(metadata, key=lambda x: x['idx'])

    n_success = sum(1 for m in metadata if m['success'])
    mech_counts = {}
    for m in metadata:
        if m['success']:
            mech = m.get('mechanism', 'Unknown')
            mech_counts[mech] = mech_counts.get(mech, 0) + 1

    summary = {
        'n_samples': n_samples,
        'n_success': n_success,
        'n_heating_rates': n_heating_rates,
        'add_noise': add_noise,
        'seed': seed,
        'n_workers': n_workers,
        'multi_mechanism': multi_mechanism,
        'mechanism_counts': mech_counts,
        'samples': metadata,
    }

    with open(os.path.join(outdir, "metadata.json"), "w") as f:
        json.dump(summary, f, indent=2)

    print(f"\nGeneration complete: {n_success}/{n_samples} successful")
    print(f"Mechanism counts: {mech_counts}")
    print(f"Metadata saved to {os.path.join(outdir, 'metadata.json')}")

    return metadata
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate TPD dataset for catalysis mechanism identification"
    )
    parser.add_argument("--n_samples", type=int, default=1000)
    parser.add_argument("--outdir", type=str, default="data_tpd/raw")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--n_workers", type=int, default=None)
    parser.add_argument("--mechanism", type=str, default=None,
                        choices=TPD_MECHANISM_LIST)
    parser.add_argument("--multi_mechanism", action="store_true")
    parser.add_argument("--n_per_mechanism", type=int, default=None)
    parser.add_argument("--n_heating_rates", type=int, default=1)
    parser.add_argument("--no_noise", action="store_true")
    parser.add_argument("--test", action="store_true",
                        help="Run a single test simulation and plot")

    args = parser.parse_args()

    if args.test:
        # Smoke test: run every mechanism once and print summary statistics.
        print(f"Running test TPD simulations for all {len(TPD_MECHANISM_LIST)} mechanisms...\n")
        rng = np.random.default_rng(42)

        for mech in TPD_MECHANISM_LIST:
            params = sample_tpd_params(rng, mechanism=mech)
            result = _run_single_tpd(params)
            peak_rate = np.max(result['rate'])
            peak_T = result['temperature'][np.argmax(result['rate'])]
            print(f"{mech}:")
            print(f"  T range: [{result['temperature'][0]:.0f}, "
                  f"{result['temperature'][-1]:.0f}] K")
            print(f"  Peak rate: {peak_rate:.4e} at T = {peak_T:.0f} K")
            print(f"  Time steps: {len(result['time'])}")
            print(f"  Params: {params}")
            print()

        # Plot is optional: matplotlib may not be installed on worker nodes.
        try:
            import matplotlib.pyplot as plt

            # Fix: a 2x2 grid only has 4 axes, so zip() silently dropped two
            # of the six mechanisms — use a 2x3 grid to show all of them.
            fig, axes = plt.subplots(2, 3, figsize=(16, 10))
            for ax, mech in zip(axes.flat, TPD_MECHANISM_LIST):
                params = sample_tpd_params(rng, mechanism=mech)
                # Overlay several heating rates to show the peak shift.
                for beta in [0.5, 2.0, 10.0]:
                    p = dict(params)
                    p['beta'] = beta
                    result = _run_single_tpd(p)
                    ax.plot(result['temperature'], result['rate'],
                            label=f'beta={beta:.1f} K/s')
                ax.set_xlabel('Temperature (K)')
                ax.set_ylabel('Desorption/Reaction Rate')
                ax.set_title(mech)
                ax.legend(fontsize=8)

            plt.tight_layout()
            plt.savefig('test_tpd_simulation.png', dpi=150)
            print("Plot saved to test_tpd_simulation.png")
        except ImportError:
            print("matplotlib not available, skipping plot")
    else:
        generate_dataset(
            n_samples=args.n_samples,
            outdir=args.outdir,
            seed=args.seed,
            n_workers=args.n_workers,
            mechanism=args.mechanism,
            multi_mechanism=args.multi_mechanism,
            n_per_mechanism=args.n_per_mechanism,
            n_heating_rates=args.n_heating_rates,
            add_noise=not args.no_noise,
        )
|
|