File size: 9,182 Bytes
cc75d6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f59c3d3
cc75d6e
 
f59c3d3
cc75d6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f59c3d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc75d6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Container Yard Environment Implementation.

Simulates a port container yard where containers arrive sequentially with different
retrieval priorities (1-3). The objective is to place containers into stacks to
minimize rehandles during retrieval operations.
"""

import random
from pathlib import Path
from typing import List, Optional, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State

try:
    from ..models import ContainerYardAction, ContainerYardObservation, Container
except ImportError:
    from models import ContainerYardAction, ContainerYardObservation, Container


class ContainerYardEnvironment(Environment):
    """
    Container Yard environment for the hackathon challenge.
    
    Containers arrive with priorities 1-3 (1=earliest retrieval, 3=latest).
    Each stack can hold up to max_stack_height containers.
    Agents must place containers to minimize rehandles during retrieval.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, task_name: str = "medium"):
        """
        Create an environment configured for one difficulty preset.

        The yard starts empty and no containers are queued until ``reset()``
        is called.

        Args:
            task_name: "easy" (5 containers, all priority 1),
                      "medium" (10 containers, priority 1-2),
                      "hard" (15 containers, priority 1-3)
        """
        self.task_name = task_name
        self._state = State(step_count=0, episode_id=str(uuid4()))

        # Load the preset before building the yard: the stack count comes from it.
        self._setup_task(task_name)

        # Per-episode bookkeeping
        self.rehandles = 0
        self.current_container_idx = 0
        self.containers: List[Container] = []
        self.placement_history: List[Tuple[int, int]] = []  # (container_id, stack_idx)
        self.stacks: List[List[int]] = [[] for _ in range(self.num_stacks)]
        
    def _setup_task(self, task_name: str):
        """Configure environment parameters based on task difficulty."""
        tasks = {
            "easy": {"num_containers": 5, "num_stacks": 5, "max_height": 5, "priorities": [1, 1, 1, 1, 1]},
            "medium": {"num_containers": 10, "num_stacks": 8, "max_height": 4, 
                      "priorities": [1, 1, 1, 1, 2, 2, 2, 1, 2, 1]},
            "hard": {"num_containers": 15, "num_stacks": 10, "max_height": 3,
                    "priorities": [1, 2, 1, 3, 2, 1, 3, 2, 1, 2, 3, 1, 2, 3, 1]},
        }
        
        config = tasks.get(task_name, tasks["medium"])
        self.num_containers = config["num_containers"]
        self.num_stacks = config["num_stacks"]
        self.max_stack_height = config["max_height"]
        self.priorities = config["priorities"]

    def get_metadata(self) -> EnvironmentMetadata:
        """Describe this environment; the README text is attached when the file exists."""
        # README.md is looked up one directory above the one holding this module.
        readme_file = Path(__file__).resolve().parents[1] / "README.md"
        readme_text = (
            readme_file.read_text(encoding="utf-8") if readme_file.exists() else None
        )

        summary = (
            "Port container yard simulation where agents place arriving containers "
            "to minimize retrieval rehandles."
        )
        return EnvironmentMetadata(
            name="Container Yard",
            description=summary,
            readme_content=readme_text,
            version="0.1.0",
            author="Draken1606",
        )

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
    ) -> ContainerYardObservation:
        """
        Reset the environment for a new episode.

        Rebuilds the container queue from the configured priorities, shuffles
        the arrival order, empties every stack and clears the rehandle counter
        and placement history.

        Args:
            seed: Optional seed for the arrival-order shuffle. When given, the
                shuffle uses a private ``random.Random(seed)`` so the same seed
                always yields the same order and the module-level generator is
                left untouched. When omitted, the module-level generator is
                used.
            episode_id: Optional identifier for the new episode. A fresh UUID
                string is generated when omitted.

        Returns:
            Initial observation
        """
        if episode_id is None:
            episode_id = str(uuid4())
        self._state = State(episode_id=episode_id, step_count=0)

        # Container ids are the positions in the configured priority list.
        self.containers = [
            Container(i, self.priorities[i]) for i in range(self.num_containers)
        ]

        # Shuffle container arrival order (reproducibly when a seed is supplied).
        rng = random.Random(seed) if seed is not None else random
        rng.shuffle(self.containers)

        self.stacks = [[] for _ in range(self.num_stacks)]
        self.current_container_idx = 0
        self.rehandles = 0
        self.placement_history = []

        return self._get_observation(action_error=None)

    def step(self, action: ContainerYardAction) -> ContainerYardObservation:
        """
        Execute one step: place current container in specified stack.

        Every call increments the step counter, including calls that are
        rejected. A rejected call leaves the yard unchanged and returns an
        observation whose ``action_error`` explains the problem. Calls are
        rejected when:

        * there is no container waiting (``reset()`` has not been called yet,
          or every container has already been placed),
        * ``stack_index`` is outside ``0 .. num_stacks - 1``,
        * the chosen stack is already at ``max_stack_height``.

        Args:
            action: ContainerYardAction with stack_index

        Returns:
            ContainerYardObservation with updated yard state; on a successful
            placement its ``reward`` and ``done`` fields are filled in.
        """
        self._state.step_count += 1

        # Without this guard the container lookup below raises IndexError
        # before the first reset() and after the final placement.
        if self.current_container_idx >= len(self.containers):
            return self._get_observation(
                action_error=(
                    "No container to place: the episode is finished "
                    "or reset() has not been called"
                )
            )

        stack_idx = action.stack_index

        # Validate action
        if stack_idx < 0 or stack_idx >= self.num_stacks:
            return self._get_observation(
                action_error=(
                    f"Invalid stack index {stack_idx}. "
                    f"Valid range: 0-{self.num_stacks-1}"
                )
            )

        target_stack = self.stacks[stack_idx]
        if len(target_stack) >= self.max_stack_height:
            return self._get_observation(
                action_error=f"Stack {stack_idx} is full (height={len(target_stack)})"
            )

        # Place container
        container = self.containers[self.current_container_idx]
        target_stack.append(container.container_id)
        self.placement_history.append((container.container_id, stack_idx))

        # The rehandle count and the reward both inspect the stack after the
        # new container has been appended, so they must run after placement.
        rehandles_caused = self._count_rehandles_from_placement(stack_idx, container)
        self.rehandles += rehandles_caused

        self.current_container_idx += 1
        done = self.current_container_idx >= self.num_containers

        reward = self._compute_reward(container, stack_idx, rehandles_caused)

        obs = self._get_observation(action_error=None)
        obs.reward = reward
        obs.done = done

        return obs

    def _count_rehandles_from_placement(self, stack_idx: int, container: Container) -> int:
        """
        Count how many containers in this stack would need to be rehandled
        because this container is placed on top of them.
        
        Rehandle: container X is in stack with container Y below it,
        but X has LOWER priority (earlier retrieval) than Y.
        """
        rehandles = 0
        stack = self.stacks[stack_idx]
        
        if len(stack) < 2:
            return rehandles
        
        # Check all containers below the newly placed one
        newly_placed_priority = container.retrieval_priority
        for i in range(len(stack) - 1):
            below_id = stack[i]
            # Find the container with this ID to get its priority
            below_container = next(c for c in self.containers if c.container_id == below_id)
            if below_container.retrieval_priority > newly_placed_priority:
                rehandles += 1
        
        return rehandles

    def _compute_reward(self, container: Container, stack_idx: int, rehandles_caused: int) -> float:
        """Compute reward for placing a container."""
        reward = 0.0
        
        # Base reward: successful placement
        reward += 0.1
        
        # Penalty for rehandles
        reward -= rehandles_caused * 0.5
        
        # Bonus for efficient placement
        if rehandles_caused == 0:
            reward += 0.3
        
        # Bonus for stacking containers with same priority
        stack = self.stacks[stack_idx]
        if len(stack) > 1:
            below_id = stack[-2]
            below_container = next(c for c in self.containers if c.container_id == below_id)
            if below_container.retrieval_priority == container.retrieval_priority:
                reward += 0.2
        
        return reward

    def _get_observation(self, action_error: Optional[str] = None) -> ContainerYardObservation:
        """
        Build observation from current state.

        Args:
            action_error: Message describing why the last action was rejected,
                or None when there is nothing to report.

        Returns:
            Observation with a copy of the stacks, progress counters, the
            container waiting to be placed, and ``reward`` set to 0.0.
            ``done`` is True once every configured container has been placed.
        """
        # Sentinels reported when no container is waiting to be placed.
        current_id = -1
        current_priority = 0

        if self.current_container_idx < len(self.containers):
            pending = self.containers[self.current_container_idx]
            current_id = pending.container_id
            # Report the priority exactly as stored: the configured values are
            # already 1-3, so adding an offset would present them as 2-4.
            current_priority = pending.retrieval_priority

        # Copy each stack so callers cannot mutate the environment's lists
        # through the observation.
        stacks_data = [list(stack) for stack in self.stacks]

        return ContainerYardObservation(
            stacks=stacks_data,
            containers_placed=self.current_container_idx,
            total_containers=self.num_containers,
            current_container_id=current_id,
            current_container_priority=current_priority,
            rehandles_so_far=self.rehandles,
            num_stacks=self.num_stacks,
            max_stack_height=self.max_stack_height,
            action_error=action_error,
            done=(self.current_container_idx >= self.num_containers),
            reward=0.0,
        )

    @property
    def state(self) -> State:
        """Get current environment state."""
        return self._state