| |
| |
|
|
| """ |
| RANSEnv — OpenEnv client for the RANS spacecraft navigation environment. |
| |
| Usage (synchronous):: |
| |
| from rans_env import RANSEnv, SpacecraftAction |
| |
| with RANSEnv(base_url="http://localhost:8000").sync() as env: |
| result = env.reset() |
| n = len(result.observation.thruster_masks) |
| result = env.step(SpacecraftAction(thrusters=[1, 0, 0, 0, 0, 0, 0, 0])) |
| print(result.reward, result.done) |
| |
| Usage (async):: |
| |
| import asyncio |
| from rans_env import RANSEnv, SpacecraftAction |
| |
| async def main(): |
| async with RANSEnv(base_url="http://localhost:8000") as env: |
| result = await env.reset() |
| result = await env.step(SpacecraftAction(thrusters=[0.0] * 8)) |
| print(result.reward, result.done) |
| |
| asyncio.run(main()) |
| |
| Docker:: |
| |
| env = RANSEnv.from_docker_image("rans-env:latest", env={"RANS_TASK": "GoToPose"}) |
| |
| HuggingFace Spaces:: |
| |
| env = RANSEnv.from_env("dpang/rans-env") |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Any, Dict |
|
|
| try: |
| from openenv.core.env_client import EnvClient, StepResult |
| _OPENENV_AVAILABLE = True |
| except ImportError: |
| EnvClient = object |
| StepResult = None |
| _OPENENV_AVAILABLE = False |
|
|
| from rans_env.models import SpacecraftAction, SpacecraftObservation, SpacecraftState |
|
|
|
|
| class RANSEnv(EnvClient): |
| """ |
| Client for the RANS spacecraft navigation OpenEnv environment. |
| |
| Implements the three ``EnvClient`` abstract methods that handle |
| JSON serialisation of actions and deserialisation of observations. |
| |
| Parameters |
| ---------- |
| base_url: |
| HTTP/WebSocket URL of the running server, |
| e.g. ``"http://localhost:8000"`` or ``"ws://localhost:8000"``. |
| """ |
|
|
| |
| |
| |
|
|
| def _step_payload(self, action: SpacecraftAction) -> Dict[str, Any]: |
| """Serialise SpacecraftAction → JSON dict for the WebSocket message.""" |
| return {"thrusters": action.thrusters} |
|
|
| def _parse_result(self, payload: Dict[str, Any]) -> "StepResult[SpacecraftObservation]": |
| """ |
| Deserialise the server response into a typed StepResult. |
| |
| The server sends:: |
| |
| { |
| "observation": { "state_obs": [...], "thruster_transforms": [...], |
| "thruster_masks": [...], "mass": 10.0, "inertia": 0.5, |
| "task": "GoToPosition", "reward": 0.42, "done": false, |
| "info": {...} }, |
| "reward": 0.42, |
| "done": false |
| } |
| """ |
| obs_dict = payload.get("observation", payload) |
| observation = SpacecraftObservation( |
| state_obs=obs_dict.get("state_obs", []), |
| thruster_transforms=obs_dict.get("thruster_transforms", []), |
| thruster_masks=obs_dict.get("thruster_masks", []), |
| mass=obs_dict.get("mass", 10.0), |
| inertia=obs_dict.get("inertia", 0.5), |
| task=obs_dict.get("task", "GoToPosition"), |
| reward=float(obs_dict.get("reward") or 0.0), |
| done=bool(obs_dict.get("done", False)), |
| info=obs_dict.get("info", {}), |
| ) |
| return StepResult( |
| observation=observation, |
| reward=payload.get("reward") or observation.reward, |
| done=payload.get("done", observation.done), |
| ) |
|
|
| def _parse_state(self, payload: Dict[str, Any]) -> SpacecraftState: |
| """Deserialise the /state response into a SpacecraftState.""" |
| return SpacecraftState( |
| episode_id=payload.get("episode_id", ""), |
| step_count=payload.get("step_count", 0), |
| task=payload.get("task", "GoToPosition"), |
| x=payload.get("x", 0.0), |
| y=payload.get("y", 0.0), |
| heading_rad=payload.get("heading_rad", 0.0), |
| vx=payload.get("vx", 0.0), |
| vy=payload.get("vy", 0.0), |
| angular_velocity_rads=payload.get("angular_velocity_rads", 0.0), |
| total_reward=payload.get("total_reward", 0.0), |
| goal_reached=payload.get("goal_reached", False), |
| ) |
|
|