# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """Container Yard Environment Client.""" from typing import Dict from openenv.core import EnvClient from openenv.core.client_types import StepResult from openenv.core.env_server.types import State from .models import ContainerYardAction, ContainerYardObservation class ContainerYardEnv( EnvClient[ContainerYardAction, ContainerYardObservation, State] ): """ Client for the Container Yard Environment. This client maintains a persistent WebSocket connection to the environment server, enabling efficient multi-step interactions with lower latency. Each client instance has its own dedicated environment session on the server. Example: >>> # Connect to a running server >>> with ContainerYardEnv(base_url="http://localhost:8000") as client: ... result = client.reset() ... print(result.observation.echoed_message) ... ... result = client.step(ContainerYardAction(message="Hello!")) ... print(result.observation.echoed_message) Example with Docker: >>> # Automatically start container and connect >>> client = ContainerYardEnv.from_docker_image("Container_Yard-env:latest") >>> try: ... result = client.reset() ... result = client.step(ContainerYardAction(message="Test")) ... finally: ... client.close() """ def _step_payload(self, action: ContainerYardAction) -> Dict: """ Convert ContainerYardAction to JSON payload for step message. Args: action: ContainerYardAction instance Returns: Dictionary representation suitable for JSON encoding """ return { "stack_index": action.stack_index, } def _parse_result(self, payload: Dict) -> StepResult[ContainerYardObservation]: """ Parse server response into StepResult[ContainerYardObservation]. Args: payload: JSON response data from server Returns: StepResult with ContainerYardObservation """ obs_data = payload.get("observation", {}) observation = ContainerYardObservation( stacks=obs_data.get("stacks", []), containers_placed=obs_data.get("containers_placed", 0), total_containers=obs_data.get("total_containers", 0), current_container_id=obs_data.get("current_container_id", -1), current_container_priority=obs_data.get("current_container_priority", 0), rehandles_so_far=obs_data.get("rehandles_so_far", 0), num_stacks=obs_data.get("num_stacks", 10), max_stack_height=obs_data.get("max_stack_height", 5), action_error=obs_data.get("action_error"), done=payload.get("done", False), reward=payload.get("reward", 0.0), ) return StepResult( observation=observation, reward=payload.get("reward", 0.0), done=payload.get("done", False), ) def _parse_state(self, payload: Dict) -> State: """ Parse server response into State object. Args: payload: JSON response from state request Returns: State object with episode_id and step_count """ return State( episode_id=payload.get("episode_id"), step_count=payload.get("step_count", 0), )