Spaces:
Sleeping
Sleeping
| """Abstract base class and shared types for physical systems. | |
| A physical system is responsible for three things and three things only: | |
| 1. Exposing **stable metadata** (id, tier, state-variable names, NL hint). | |
| 2. Generating a **noisy trajectory** for one episode given an RNG seed. | |
| 3. Reporting its **ground-truth equation** as a canonical SymPy expression | |
| for logging and verification. | |
| Reward computation, simulation of the *agent's* hypotheses, residual analysis, | |
| and natural-language mismatch summarisation all live in :mod:`physix.verifier`. | |
| Systems do not import anything from the verifier; the dependency runs in one | |
| direction. | |
| """ | |
| from __future__ import annotations | |
| from abc import ABC, ABCMeta, abstractmethod | |
| from enum import Enum | |
| import numpy as np | |
| from pydantic import BaseModel, ConfigDict, Field | |
| from pydantic._internal._model_construction import ModelMetaclass | |
| from scipy.integrate import odeint | |
class SystemTier(str, Enum):
    """Difficulty tier a physical system belongs to in the curriculum.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value. The third tier is kept out of training so that results
    on it can back a generalisation claim ("converges on systems it never
    trained on").
    """

    TIER_1 = "tier1"
    TIER_2 = "tier2"
    # Held-out tier: excluded from training.
    TIER_3 = "tier3"
class TrajectoryData(BaseModel):
    """One episode's sampled trajectory together with its starting state.

    ``initial_conditions`` travels with the samples so that the verifier can
    re-simulate the agent's hypothesis from the *same* starting point, which
    keeps the resulting residuals directly comparable.
    """

    model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)

    timestamps: np.ndarray
    states: dict[str, np.ndarray]
    initial_conditions: dict[str, float]
    state_variables: tuple[str, ...]

    def to_observation_samples(self, decimals: int = 5) -> list[dict[str, float]]:
        """Return one JSON-friendly dict per timestep for the agent.

        Each dict holds the time under ``"t"`` followed by one entry per name
        in ``state_variables``; every value is converted to ``float`` and
        rounded to ``decimals`` places.
        """
        return [
            {
                "t": round(float(instant), decimals),
                **{
                    name: round(float(self.states[name][row]), decimals)
                    for name in self.state_variables
                },
            }
            for row, instant in enumerate(self.timestamps)
        ]

    def stats(self) -> dict[str, float]:
        """Summarise the trajectory for the agent's stats panel.

        Reports the total duration, the number of samples, the spacing between
        the first two timestamps (``0.0`` when there are fewer than two), and
        the min / max / standard deviation of every state variable.
        """
        times = self.timestamps
        span = float(times[-1] - times[0])
        count = len(times)
        step = float(times[1] - times[0]) if count > 1 else 0.0

        summary: dict[str, float] = {
            "duration": span,
            "n_timesteps": float(count),
            "dt": step,
        }
        for name in self.state_variables:
            series = self.states[name]
            summary.update(
                {
                    f"{name}_min": float(np.min(series)),
                    f"{name}_max": float(np.max(series)),
                    f"{name}_std": float(np.std(series)),
                }
            )
        return summary
class _AbstractModelMeta(ModelMetaclass, ABCMeta):
    """Combined metaclass for classes that are pydantic models and ABCs at once.

    It lets :class:`PhysicalSystem` behave as a genuine abstract base class:
    instantiating the base, or a subclass that leaves an abstract method
    unimplemented, raises ``TypeError``.
    """
class PhysicalSystem(BaseModel, ABC, metaclass=_AbstractModelMeta):
    """Abstract physical system.

    Concrete subclasses must:

    - Override :attr:`system_id`, :attr:`tier`, :attr:`state_variables`, and
      :attr:`hint_template`.
    - Implement :meth:`sample_parameters` to draw random episode parameters.
    - Implement :meth:`sample_initial_conditions` to draw random initial state.
    - Implement :meth:`rhs` returning the time derivatives.
    - Implement :meth:`ground_truth_equation` returning a canonical SymPy
      string of the system's equation of motion.

    The four methods above are declared with ``@abstractmethod``, so
    instantiating this base class (or a subclass that omits one of them)
    raises ``TypeError`` instead of silently returning ``None`` from the
    missing hook.

    The base class implements :meth:`simulate` once for all subclasses by
    delegating to ``scipy.integrate.odeint`` against :meth:`rhs`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    system_id: str = ""
    tier: SystemTier = SystemTier.TIER_1
    state_variables: tuple[str, ...] = ()
    hint_template: str = ""
    duration: float = 10.0
    n_timesteps: int = 100

    #: Gaussian noise applied to each observed sample, expressed as a fraction
    #: of the per-variable *standard deviation* of the clean trajectory. Using
    #: std (rather than range) avoids the pathology where a system with a
    #: large total excursion (e.g. free fall) produces overwhelming noise.
    noise_std: float = 0.02

    # Populated by :meth:`simulate` with the values drawn for the latest episode.
    parameters: dict[str, float] = Field(default_factory=dict)
    initial_conditions: dict[str, float] = Field(default_factory=dict)

    # ------------------------------------------------------------------ API

    @abstractmethod
    def sample_parameters(self, rng: np.random.Generator) -> dict[str, float]:
        """Draw a fresh set of physical parameters for one episode."""

    @abstractmethod
    def sample_initial_conditions(self, rng: np.random.Generator) -> dict[str, float]:
        """Draw fresh initial conditions for one episode.

        The returned mapping must contain an entry for every name in
        :attr:`state_variables`; :meth:`simulate` looks each one up by name.
        """

    @abstractmethod
    def rhs(
        self,
        t: float,
        state: np.ndarray,
        params: dict[str, float],
    ) -> np.ndarray:
        """Time derivatives at time ``t``.

        ``state`` is laid out in the order of :attr:`state_variables`. The
        return value must be a 1-D array of the same length.
        """

    @abstractmethod
    def ground_truth_equation(self) -> str:
        """Canonical SymPy-grammar string of the equation of motion.

        Used for logging only. Never surfaced to the agent during training
        or inference.
        """

    def hint(self, parameters: dict[str, float]) -> str:
        """Render the natural-language hint for the agent.

        Default behaviour formats :attr:`hint_template` against
        ``parameters``. Subclasses may override for more flexibility.

        Formatting is best-effort: if the template references a name that is
        missing from ``parameters``, uses a positional field, or is malformed
        (for example an unbalanced ``{``), the raw template is returned
        unformatted rather than raising.
        """
        try:
            return self.hint_template.format(**parameters)
        except (KeyError, IndexError, ValueError):
            # ValueError covers malformed templates such as a stray "{".
            return self.hint_template

    # -------------------------------------------------------------- behaviour

    def simulate(self, rng: np.random.Generator) -> TrajectoryData:
        """Generate one noisy trajectory for an episode.

        Draws parameters and initial conditions from ``rng``, integrates
        :meth:`rhs` over ``n_timesteps`` evenly spaced points on
        ``[0, duration]``, and adds zero-mean Gaussian noise to each state
        variable. As a side effect the drawn values are stored on
        :attr:`parameters` and :attr:`initial_conditions`.
        """
        params = self.sample_parameters(rng)
        ic = self.sample_initial_conditions(rng)

        timestamps = np.linspace(0.0, self.duration, self.n_timesteps)
        initial_state = np.array([ic[var] for var in self.state_variables], dtype=float)

        # scipy.integrate.odeint expects f(state, t, *args); we wrap to put
        # `t` second and pass params via closure.
        def _rhs_wrapper(state: np.ndarray, t: float) -> np.ndarray:
            return self.rhs(t, state, params)

        clean = odeint(_rhs_wrapper, initial_state, timestamps, full_output=False)

        states: dict[str, np.ndarray] = {}
        for col, var in enumerate(self.state_variables):
            clean_col = clean[:, col]
            # Floor the scale so a constant trajectory still gets a valid,
            # strictly positive noise width.
            scale = max(float(np.std(clean_col)), 1e-6)
            noise = rng.normal(0.0, self.noise_std * scale, size=clean_col.shape)
            states[var] = clean_col + noise

        # Cache for downstream access (parameters surfaced to the env).
        self.parameters = params
        self.initial_conditions = ic

        return TrajectoryData(
            timestamps=timestamps,
            states=states,
            initial_conditions=ic,
            state_variables=self.state_variables,
        )