Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Container Yard Environment Implementation. | |
| Simulates a port container yard where containers arrive sequentially with different | |
| retrieval priorities (1-3). The objective is to place containers into stacks to | |
| minimize rehandles during retrieval operations. | |
| """ | |
import random
from pathlib import Path
from typing import List, Optional, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State
| try: | |
| from ..models import ContainerYardAction, ContainerYardObservation, Container | |
| except ImportError: | |
| from models import ContainerYardAction, ContainerYardObservation, Container | |
class ContainerYardEnvironment(Environment):
    """
    Container Yard environment for the hackathon challenge.

    Containers arrive with priorities 1-3 (1=earliest retrieval, 3=latest).
    Each stack can hold up to max_stack_height containers.
    Agents must place containers to minimize rehandles during retrieval:
    a rehandle is required whenever a container ends up stacked on top of
    one that must be retrieved earlier (i.e. one with a smaller priority).
    """

    # A single server process may host several independent sessions.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, task_name: str = "medium"):
        """
        Initialize the Container Yard environment.

        Args:
            task_name: "easy" (5 containers, all priority 1),
                       "medium" (10 containers, priority 1-2),
                       "hard" (15 containers, priority 1-3)
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task_name = task_name
        self._setup_task(task_name)

        # Per-episode environment state; fully re-initialized in reset().
        self.containers: List[Container] = []
        self.stacks: List[List[int]] = [[] for _ in range(self.num_stacks)]
        self.current_container_idx = 0
        self.rehandles = 0
        self.placement_history: List[Tuple[int, int]] = []  # (container_id, stack_idx)

    def _setup_task(self, task_name: str) -> None:
        """Configure environment parameters based on task difficulty.

        Unknown task names fall back to the "medium" configuration.
        """
        tasks = {
            "easy": {"num_containers": 5, "num_stacks": 5, "max_height": 5,
                     "priorities": [1, 1, 1, 1, 1]},
            "medium": {"num_containers": 10, "num_stacks": 8, "max_height": 4,
                       "priorities": [1, 1, 1, 1, 2, 2, 2, 1, 2, 1]},
            "hard": {"num_containers": 15, "num_stacks": 10, "max_height": 3,
                     "priorities": [1, 2, 1, 3, 2, 1, 3, 2, 1, 2, 3, 1, 2, 3, 1]},
        }
        config = tasks.get(task_name, tasks["medium"])
        self.num_containers = config["num_containers"]
        self.num_stacks = config["num_stacks"]
        self.max_stack_height = config["max_height"]
        self.priorities = config["priorities"]

    def get_metadata(self) -> EnvironmentMetadata:
        """Return environment metadata, including README content for the web UI."""
        readme_content = None
        # README.md is expected one directory above this module, if present.
        readme_path = Path(__file__).resolve().parents[1] / "README.md"
        if readme_path.exists():
            readme_content = readme_path.read_text(encoding="utf-8")
        return EnvironmentMetadata(
            name="Container Yard",
            description=(
                "Port container yard simulation where agents place arriving containers "
                "to minimize retrieval rehandles."
            ),
            readme_content=readme_content,
            version="0.1.0",
            author="Draken1606",
        )

    def reset(self) -> ContainerYardObservation:
        """
        Reset the environment for a new episode.

        Returns:
            Initial observation
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Initialize containers with priorities, then shuffle arrival order.
        self.containers = [
            Container(i, self.priorities[i]) for i in range(self.num_containers)
        ]
        random.shuffle(self.containers)
        self.stacks = [[] for _ in range(self.num_stacks)]
        self.current_container_idx = 0
        self.rehandles = 0
        self.placement_history = []
        return self._get_observation(action_error=None)

    def step(self, action: ContainerYardAction) -> ContainerYardObservation:
        """
        Execute one step: place current container in specified stack.

        Invalid actions (out-of-range stack index, full stack) do not
        consume the current container; they return an observation whose
        action_error field describes the problem.

        Args:
            action: ContainerYardAction with stack_index

        Returns:
            ContainerYardObservation with updated yard state
        """
        self._state.step_count += 1
        error = None
        stack_idx = action.stack_index

        # Validate action before mutating any yard state.
        if stack_idx < 0 or stack_idx >= self.num_stacks:
            error = f"Invalid stack index {stack_idx}. Valid range: 0-{self.num_stacks-1}"
            return self._get_observation(action_error=error)
        if len(self.stacks[stack_idx]) >= self.max_stack_height:
            error = f"Stack {stack_idx} is full (height={len(self.stacks[stack_idx])})"
            return self._get_observation(action_error=error)

        # Place container on top of the chosen stack.
        container = self.containers[self.current_container_idx]
        self.stacks[stack_idx].append(container.container_id)
        self.placement_history.append((container.container_id, stack_idx))

        # Count rehandles this placement will eventually cause at retrieval time.
        rehandles_caused = self._count_rehandles_from_placement(stack_idx, container)
        self.rehandles += rehandles_caused

        self.current_container_idx += 1
        done = (self.current_container_idx >= self.num_containers)

        reward = self._compute_reward(container, stack_idx, rehandles_caused)
        obs = self._get_observation(action_error=error)
        obs.reward = reward
        obs.done = done
        return obs

    def _count_rehandles_from_placement(self, stack_idx: int, container: Container) -> int:
        """
        Count how many rehandles the newly placed container will cause.

        The newly placed container sits on top of the stack. It must be
        moved out of the way (one rehandle) for every container below it
        that has an EARLIER retrieval — a strictly smaller priority value —
        because those containers are pulled out while this one still sits
        above them.

        BUG FIX: the comparison was previously inverted (counting
        containers below with a LARGER priority), which penalized the
        correct arrangement (early-retrieval containers on top) and
        rewarded the blocking one.
        """
        stack = self.stacks[stack_idx]
        if len(stack) < 2:
            return 0
        newly_placed_priority = container.retrieval_priority
        # Build the id -> container map once instead of an O(n) scan per slot.
        by_id = {c.container_id: c for c in self.containers}
        rehandles = 0
        for below_id in stack[:-1]:  # every container under the new one
            if by_id[below_id].retrieval_priority < newly_placed_priority:
                rehandles += 1
        return rehandles

    def _compute_reward(self, container: Container, stack_idx: int, rehandles_caused: int) -> float:
        """Compute reward for placing a container.

        Reward shaping:
          +0.1 base for any successful placement
          -0.5 per rehandle caused by this placement
          +0.3 bonus when the placement causes no rehandles
          +0.2 bonus for landing directly on a same-priority container
        """
        reward = 0.0
        reward += 0.1
        reward -= rehandles_caused * 0.5
        if rehandles_caused == 0:
            reward += 0.3
        # Same-priority bonus: compare only with the container directly below.
        stack = self.stacks[stack_idx]
        if len(stack) > 1:
            below_id = stack[-2]
            below_container = next(c for c in self.containers if c.container_id == below_id)
            if below_container.retrieval_priority == container.retrieval_priority:
                reward += 0.2
        return reward

    def _get_observation(self, action_error: Optional[str] = None) -> ContainerYardObservation:
        """Build observation from current state.

        current_container_id / current_container_priority hold sentinel
        values (-1, 0) once every container has been placed.
        """
        current_id = -1
        current_priority = 0
        if self.current_container_idx < len(self.containers):
            current = self.containers[self.current_container_idx]
            current_id = current.container_id
            # BUG FIX: previously reported retrieval_priority + 1, shifting
            # the observed priority out of the documented 1-3 range.
            current_priority = current.retrieval_priority
        # Copy each stack so callers cannot mutate internal state via the observation.
        stacks_data = [list(stack) for stack in self.stacks]
        return ContainerYardObservation(
            stacks=stacks_data,
            containers_placed=self.current_container_idx,
            total_containers=self.num_containers,
            current_container_id=current_id,
            current_container_priority=current_priority,
            rehandles_so_far=self.rehandles,
            num_stacks=self.num_stacks,
            max_stack_height=self.max_stack_height,
            action_error=action_error,
            done=(self.current_container_idx >= self.num_containers),
            reward=0.0,
        )

    def state(self) -> State:
        """Get current environment state."""
        return self._state