ToolUseEnv / tool_use_env /client.py
Clove25's picture
Upload 53 files
18feac5 verified
# # Copyright (c) Meta Platforms, Inc. and affiliates.
# # All rights reserved.
# #
# # This source code is licensed under the BSD-style license found in the
# # LICENSE file in the root directory of this source tree.
# """Tool Use Env Environment Client."""
# from typing import Dict
# from openenv.core import EnvClient
# from openenv.core.client_types import StepResult
# from openenv.core.env_server.types import State
# from .models import ToolUseAction, ToolUseObservation
# class ToolUseEnv(
# EnvClient[ToolUseAction, ToolUseObservation, State]
# ):
# """
# Client for the Tool Use Env Environment.
# This client maintains a persistent WebSocket connection to the environment server,
# enabling efficient multi-step interactions with lower latency.
# Each client instance has its own dedicated environment session on the server.
# Example:
# >>> # Connect to a running server
# >>> with ToolUseEnv(base_url="http://localhost:8000") as client:
# ... result = client.reset()
# ... print(result.observation.echoed_message)
# ...
# ... result = client.step(ToolUseAction(message="Hello!"))
# ... print(result.observation.echoed_message)
# Example with Docker:
# >>> # Automatically start container and connect
# >>> client = ToolUseEnv.from_docker_image("tool_use_env-env:latest")
# >>> try:
# ... result = client.reset()
# ... result = client.step(ToolUseAction(message="Test"))
# ... finally:
# ... client.close()
# """
# def _step_payload(self, action: ToolUseAction) -> Dict:
# """
# Convert ToolUseAction to JSON payload for step message.
# Args:
# action: ToolUseAction instance
# Returns:
# Dictionary representation suitable for JSON encoding
# """
# return {
# "message": action.message,
# }
# def _parse_result(self, payload: Dict) -> StepResult[ToolUseObservation]:
# """
# Parse server response into StepResult[ToolUseObservation].
# Args:
# payload: JSON response data from server
# Returns:
# StepResult with ToolUseObservation
# """
# obs_data = payload.get("observation", {})
# observation = ToolUseObservation(
# echoed_message=obs_data.get("echoed_message", ""),
# message_length=obs_data.get("message_length", 0),
# done=payload.get("done", False),
# reward=payload.get("reward"),
# metadata=obs_data.get("metadata", {}),
# )
# return StepResult(
# observation=observation,
# reward=payload.get("reward"),
# done=payload.get("done", False),
# )
# def _parse_state(self, payload: Dict) -> State:
# """
# Parse server response into State object.
# Args:
# payload: JSON response from state request
# Returns:
# State object with episode_id and step_count
# """
# return State(
# episode_id=payload.get("episode_id"),
# step_count=payload.get("step_count", 0),
# )
from openenv.core.env_client import EnvClient
from openenv.core.client_types import StepResult
from tool_use_env.models import ToolUseAction, ToolUseObservation, ToolUseState
class ToolUseEnv(EnvClient[ToolUseAction, ToolUseObservation, ToolUseState]):
def _step_payload(self, action: ToolUseAction) -> dict:
return {
"action_type": action.action_type,
"artifact_id": action.artifact_id,
"query": action.query,
"message": action.message,
"resolution_code": action.resolution_code,
}
def _parse_result(self, payload: dict) -> StepResult:
obs_data = payload.get("observation", {})
observation = ToolUseObservation(
done=payload.get("done", False),
reward=payload.get("reward"),
task_id=obs_data.get("task_id", ""),
difficulty=obs_data.get("difficulty", "easy"),
objective=obs_data.get("objective", ""),
customer_message=obs_data.get("customer_message", ""),
workspace_summary=obs_data.get("workspace_summary", ""),
available_actions=obs_data.get("available_actions", []),
available_resolution_codes=obs_data.get("available_resolution_codes", []),
collected_evidence=obs_data.get("collected_evidence", []),
last_tool_result=obs_data.get("last_tool_result"),
last_action_error=obs_data.get("last_action_error"),
remaining_steps=obs_data.get("remaining_steps", 0),
current_score=obs_data.get("current_score", 0.0),
metadata=obs_data.get("metadata", {}),
)
return StepResult(
observation=observation,
reward=payload.get("reward"),
done=payload.get("done", False),
)
def _parse_state(self, payload: dict) -> ToolUseState:
return ToolUseState(
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
task_id=payload.get("task_id", ""),
task_name=payload.get("task_name", ""),
difficulty=payload.get("difficulty", ""),
objective=payload.get("objective", ""),
cumulative_reward=payload.get("cumulative_reward", 0.0),
final_score=payload.get("final_score", 0.0),
drafted_reply=payload.get("drafted_reply"),
resolution_code=payload.get("resolution_code"),
expected_resolution_code=payload.get("expected_resolution_code", ""),
required_evidence=payload.get("required_evidence", []),
collected_evidence=payload.get("collected_evidence", []),
action_history=payload.get("action_history", []),
repeat_action_count=payload.get("repeat_action_count", 0),
last_action_error=payload.get("last_action_error"),
known_artifacts=payload.get("known_artifacts", {}),
known_policies=payload.get("known_policies", {}),
)