Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """Code Review Environment Client.""" | |
| from typing import Dict | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from .models import ( | |
| CodeReviewAction, | |
| CodeReviewObservation, | |
| CodeReviewReward, | |
| CodeReviewPullRequest, | |
| ) | |
| class CodeReviewEnv(EnvClient[CodeReviewAction, CodeReviewObservation, State]): | |
| """ | |
| Client for the Code Review Environment. | |
| This client maintains a persistent WebSocket connection to the environment server, | |
| enabling efficient multi-step interactions with lower latency. | |
| Each client instance has its own dedicated environment session on the server. | |
| Example: | |
| >>> # Connect to a running server | |
| >>> with CodeReviewEnv(base_url="http://localhost:8000") as client: | |
| ... result = client.reset() | |
| ... print(result.observation.echoed_message) | |
| ... | |
| ... result = client.step(CodeReviewAction(message="Hello!")) | |
| ... print(result.observation.echoed_message) | |
| Example with Docker: | |
| >>> # Automatically start container and connect | |
| >>> client = CodeReviewEnv.from_docker_image("code_review-env:latest") | |
| >>> try: | |
| ... result = client.reset() | |
| ... result = client.step(CodeReviewAction(message="Test")) | |
| ... finally: | |
| ... client.close() | |
| """ | |
| def _step_payload(self, action: CodeReviewAction) -> Dict: | |
| # print("Action == ", action) | |
| # Handle dict input | |
| if isinstance(action, dict): | |
| act = { | |
| "action_type": action.get("action_type"), | |
| "comment": action.get("comment"), | |
| "suggested_code": action.get("suggested_code"), | |
| "decision": action.get("decision"), | |
| } | |
| else: | |
| act = { | |
| "action_type": action.action_type, | |
| "comment": action.comment, | |
| "suggested_code": action.suggested_code, | |
| "decision": action.decision, | |
| } | |
| # print("Act == ", act) | |
| return act | |
| def _parse_result(self, payload: Dict) -> StepResult[CodeReviewObservation]: | |
| """ | |
| Parse server response into StepResult[CodeReviewObservation]. | |
| Args: | |
| payload: JSON response data from server | |
| Returns: | |
| StepResult with CodeReviewObservation | |
| """ | |
| """ | |
| return CodeReviewObservation( | |
| #echoed_message="Code Review environment ready!", | |
| pr=self.pr, | |
| previous_comments=self.history, | |
| step_count=self.step_count, | |
| max_steps=self.max_steps, | |
| reward=0.0, | |
| done=False, | |
| ) | |
| """ | |
| # print("Payload ====== ", payload) | |
| obs_data = payload.get("observation") or {} | |
| if "observation" in obs_data: # nested case | |
| obs_data = obs_data["observation"] | |
| if not obs_data or "pr" not in obs_data: | |
| raise ValueError(f"Invalid observation payload: {payload}") | |
| pr_data = obs_data["pr"] | |
| observation = CodeReviewObservation( | |
| pr=CodeReviewPullRequest(**pr_data), | |
| previous_comments=obs_data.get("previous_comments") or [], | |
| step_count=obs_data.get("step_count", 0), | |
| max_steps=obs_data.get("max_steps", 3), | |
| ) | |
| # Handle reward (reset vs step) | |
| reward_data = payload.get("reward") | |
| reward = None | |
| if reward_data is not None: | |
| try: | |
| reward = float(reward_data) | |
| except Exception: | |
| reward = None | |
| return StepResult( | |
| observation=observation, | |
| reward=reward, | |
| done=payload.get("done", False), | |
| ) | |
| def _parse_state(self, payload: Dict) -> State: | |
| """ | |
| Parse server response into State object. | |
| Args: | |
| payload: JSON response from state request | |
| Returns: | |
| State object with episode_id and step_count | |
| """ | |
| return State( | |
| episode_id=payload.get("episode_id"), | |
| step_count=payload.get("step_count", 0), | |
| ) | |