Spaces:
Sleeping
Sleeping
| """Forward-simulate the agent's parsed hypothesis. | |
| Responsibility: | |
| - Given a :class:`ParsedEquation`, the system's state-variable layout, the | |
| agent's parameter dictionary, and a set of initial conditions and | |
| timestamps, run ``scipy.integrate.odeint`` to produce a predicted | |
| trajectory in the same shape as the system's observed data. | |
| What this module does not do: parse text (see :mod:`physix.verifier.parser`), | |
| score the trajectory (see :mod:`physix.verifier.metrics`), or compose rewards | |
| (see :mod:`physix.verifier.reward`). | |
| """ | |
| from __future__ import annotations | |
| from collections.abc import Iterable | |
| from typing import Callable | |
| import numpy as np | |
| import sympy as sp | |
| from scipy.integrate import odeint | |
| from physix.verifier.parser import ParsedEquation | |
| class SimulationError(RuntimeError): | |
| """Raised when the proposed system cannot be integrated numerically. | |
| Common causes: NaN/Inf produced by ``odeint``, mass-matrix singularity, | |
| or stiff dynamics that exhaust the integrator's step budget. | |
| """ | |
| def simulate_hypothesis( | |
| parsed: ParsedEquation, | |
| state_variables: Iterable[str], | |
| parameters: dict[str, float], | |
| initial_conditions: dict[str, float], | |
| timestamps: np.ndarray, | |
| ) -> dict[str, np.ndarray]: | |
| """Integrate the agent's hypothesis forward in time. | |
| Args: | |
| parsed: Parsed equations from :func:`physix.verifier.parser.parse_equation`. | |
| state_variables: Ordering of state variables in the underlying | |
| physical system. Used to derive the integration state vector. | |
| parameters: Numerical parameter substitutions supplied by the agent. | |
| initial_conditions: Initial state values keyed by variable name. | |
| timestamps: 1-D array of times at which to record state. | |
| Returns: | |
| A dict mapping each state variable to its predicted trajectory | |
| (1-D array of the same length as ``timestamps``). | |
| """ | |
| state_vars = tuple(state_variables) | |
| int_layout = _build_integration_layout(parsed, state_vars) | |
| try: | |
| rhs_callable = _compile_rhs(parsed, int_layout, parameters) | |
| except Exception as exc: # noqa: BLE001 — surfaced as SimulationError below | |
| raise SimulationError(f"failed to compile RHS: {exc}") from exc | |
| initial_state = _build_initial_state(int_layout, initial_conditions) | |
| try: | |
| result = odeint( | |
| rhs_callable, | |
| initial_state, | |
| timestamps, | |
| full_output=False, | |
| mxstep=2000, | |
| rtol=1e-6, | |
| atol=1e-9, | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| # ``odeint`` propagates whatever the user-supplied RHS callable | |
| # raises. The model can emit equations that lambdify into Python | |
| # code which then trips ``TypeError`` (e.g. ``np.sqrt`` on a | |
| # SymPy ``Add``), ``ZeroDivisionError``, ``OverflowError``, etc. | |
| # Any of those should be surfaced to the env as a clean | |
| # ``SimulationError`` (= ``r_match=0`` for the turn) rather than | |
| # crashing the route with a 500. | |
| raise SimulationError(f"odeint failed: {exc}") from exc | |
| if not np.all(np.isfinite(result)): | |
| raise SimulationError("Predicted trajectory contains NaN or Inf values.") | |
| return _project_back_to_state_vars(result, int_layout, state_vars) | |
| _IntegrationLayout = tuple[tuple[str, ...], dict[str, int]] | |
| def _build_integration_layout( | |
| parsed: ParsedEquation, | |
| state_vars: tuple[str, ...], | |
| ) -> _IntegrationLayout: | |
| """Return ``(integration_vars, index)`` for the integration state. | |
| For a system with second-order ODEs we need to integrate both ``var`` and | |
| ``dvar/dt``. We prefix-derive the names: e.g. ``y -> [y, vy]``, with the | |
| convention that ``v<name>`` is the first time derivative of ``<name>`` | |
| (matching the conventions used in :mod:`physix.systems.tier1`). | |
| """ | |
| integration_vars: list[str] = [] | |
| eq_by_var = {eq.var: eq for eq in parsed.equations} | |
| for var in state_vars: | |
| # Skip variables that are themselves first derivatives of another | |
| # state variable (e.g. ``vy`` paired with ``y``); those will be added | |
| # as part of the higher-order companion. | |
| if _is_velocity_of(var, state_vars): | |
| continue | |
| eq = eq_by_var.get(var) | |
| if eq is not None and eq.order == 2: | |
| integration_vars.append(var) | |
| integration_vars.append(_velocity_name(var)) | |
| continue | |
| if eq is not None and eq.order == 1: | |
| integration_vars.append(var) | |
| continue | |
| # No matching equation — accept anyway, treating as zero-derivative. | |
| integration_vars.append(var) | |
| # Now also append any equations whose var was not in state_vars (e.g. | |
| # the agent named a derivative directly). We log these but do not crash; | |
| # ``rhs_callable`` will simply not see them. | |
| declared = set(integration_vars) | |
| for eq in parsed.equations: | |
| if eq.var in declared: | |
| continue | |
| if eq.order == 2: | |
| integration_vars.append(eq.var) | |
| integration_vars.append(_velocity_name(eq.var)) | |
| else: | |
| integration_vars.append(eq.var) | |
| index = {name: i for i, name in enumerate(integration_vars)} | |
| return tuple(integration_vars), index | |
| def _is_velocity_of(var: str, state_vars: tuple[str, ...]) -> bool: | |
| """True if ``var`` looks like the first derivative of another state var.""" | |
| if var.startswith("v"): | |
| return var[1:] in state_vars | |
| if var.startswith("d"): | |
| return var[1:] in state_vars | |
| return False | |
| def _velocity_name(var: str) -> str: | |
| """Convention for naming a first time derivative.""" | |
| if var.startswith("theta"): | |
| return "d" + var | |
| return "v" + var | |
| def _compile_rhs( | |
| parsed: ParsedEquation, | |
| layout: _IntegrationLayout, | |
| parameters: dict[str, float], | |
| ) -> Callable[[np.ndarray, float], np.ndarray]: | |
| """Build a Python callable f(state, t) -> dstate/dt for ``odeint``.""" | |
| integration_vars, index = layout | |
| eq_by_var = {eq.var: eq for eq in parsed.equations} | |
| # Lambdify each equation's RHS once. Symbols are bound to integration | |
| # variables (state) and to scalar parameters (substituted). | |
| state_symbols = sp.symbols(" ".join(integration_vars)) | |
| if not isinstance(state_symbols, tuple): | |
| state_symbols = (state_symbols,) | |
| rhs_lambdas: dict[str, Callable[..., float]] = {} | |
| for var, eq in eq_by_var.items(): | |
| rhs = eq.rhs.subs({sp.Symbol(k): v for k, v in parameters.items()}) | |
| rhs_lambdas[var] = sp.lambdify(state_symbols, rhs, modules="numpy") | |
| def _rhs(state: np.ndarray, t: float) -> np.ndarray: | |
| derivs = np.zeros_like(state) | |
| for var, eq in eq_by_var.items(): | |
| if eq.order == 1: | |
| idx = index[var] | |
| derivs[idx] = float(rhs_lambdas[var](*state)) | |
| else: # order == 2 | |
| pos_idx = index[var] | |
| vel_name = _velocity_name(var) | |
| vel_idx = index[vel_name] | |
| derivs[pos_idx] = state[vel_idx] | |
| derivs[vel_idx] = float(rhs_lambdas[var](*state)) | |
| return derivs | |
| return _rhs | |
| def _build_initial_state( | |
| layout: _IntegrationLayout, | |
| initial_conditions: dict[str, float], | |
| ) -> np.ndarray: | |
| """Project the IC dict into the integration variable order.""" | |
| integration_vars, _ = layout | |
| state = np.zeros(len(integration_vars), dtype=float) | |
| for i, var in enumerate(integration_vars): | |
| state[i] = float(initial_conditions.get(var, 0.0)) | |
| return state | |
| def _project_back_to_state_vars( | |
| result: np.ndarray, | |
| layout: _IntegrationLayout, | |
| state_vars: tuple[str, ...], | |
| ) -> dict[str, np.ndarray]: | |
| """Slice the integration output into per-system-variable trajectories.""" | |
| _, index = layout | |
| out: dict[str, np.ndarray] = {} | |
| for var in state_vars: | |
| if var in index: | |
| out[var] = result[:, index[var]] | |
| else: | |
| # The agent never modelled this variable; predict the first | |
| # entry constant. This is rare and kept defensive. | |
| out[var] = np.full(result.shape[0], float(result[0, 0])) | |
| return out | |