core-identity-env / client.py
hirann
Add Core Identity environment for OpenEnv
1f9fc8c
"""Core Identity Environment Client.
Provides the async EnvClient for connecting to a CoreIdentityEnvironment server.
Supports both async (recommended) and synchronous usage via .sync() wrapper.
Example (async):
>>> import asyncio
>>> from core_identity_env import CoreIdentityEnv, CoreIdentityAction, VerificationResult
>>>
>>> async def main():
... async with CoreIdentityEnv(base_url="http://localhost:8000") as env:
... result = await env.reset()
... obs = result.observation
... print(obs.document)
...
... action = CoreIdentityAction(
... verification=VerificationResult(
... verified=True,
... confidence=0.95,
... issues=[],
... ),
... submit=True,
... )
... result = await env.step(action)
... print(result.reward)
...
>>> asyncio.run(main())
Example (from Hugging Face Space):
>>> env = await CoreIdentityEnv.from_env("openenv/core-identity-env")
>>> try:
... result = await env.reset()
... finally:
... await env.close()
"""
from typing import Any, Dict
try:
from openenv.core.env_client import EnvClient
from openenv.core.client_types import StepResult
except ImportError:
from openenv.core.env_client import EnvClient
from openenv.core.client_types import StepResult
from core_identity_env.models import CoreIdentityObservation, CoreIdentityAction
class CoreIdentityEnv(EnvClient[CoreIdentityAction, CoreIdentityObservation, Dict[str, Any]]):
    """Async client for the Core Identity Environment.

    Connects to a running CoreIdentityEnvironment server via WebSocket and
    provides step/reset/state methods for interacting with the environment.

    Inherits from EnvClient:
        - Async by default — use ``async with`` and ``await``
        - Sync wrapper — call ``.sync()`` for synchronous usage
        - ``from_env(repo_id)`` — connect to a Hugging Face Space
        - ``from_docker_image(image)`` — spin up a local Docker container
    """

    def _step_payload(self, action: CoreIdentityAction) -> Dict[str, Any]:
        """Serialise a CoreIdentityAction into the JSON payload for the server.

        Args:
            action: The action to send.

        Returns:
            The result of ``action.model_dump()``.
        """
        return action.model_dump()

    def _parse_result(self, payload: Dict[str, Any]) -> "StepResult[CoreIdentityObservation]":
        """Deserialise a server JSON response into a typed StepResult.

        Two response layouts are accepted: fields nested under a
        ``"metadata"`` dict, or a flat payload carrying ``"observation"``,
        ``"reward"`` and ``"done"`` at the top level. Top-level ``"reward"``
        and ``"done"`` always take precedence over nested ones.

        Args:
            payload: Decoded JSON response from the server.

        Returns:
            A StepResult holding the parsed observation, reward, done flag
            and info dict.
        """
        # Use the nested "metadata" dict when present; otherwise search the
        # payload itself. A non-dict value (e.g. ``"metadata": null``) is
        # treated like a missing key instead of crashing on ``.get``.
        envelope = payload.get("metadata", payload)
        if not isinstance(envelope, dict):
            envelope = payload

        obs_data = envelope.get("observation", envelope)
        done = payload.get("done", envelope.get("done", False))

        # Resolve the reward lazily. Passing ``nested.get("value", ...)`` as
        # the default argument of ``payload.get`` evaluates it eagerly, which
        # raised AttributeError whenever the nested "reward" was a plain
        # number or None (always the case for a flat payload).
        if "reward" in payload:
            reward_value = payload["reward"]
        else:
            nested_reward = envelope.get("reward")
            if isinstance(nested_reward, dict):
                reward_value = nested_reward.get("value", 0.0)
            elif nested_reward is not None:
                reward_value = nested_reward
            else:
                reward_value = 0.0

        try:
            observation = CoreIdentityObservation.model_validate(obs_data)
        except Exception:
            # Deliberate best-effort: if the server's observation does not
            # validate, build one from whichever known fields are present
            # rather than failing the whole step.
            fields = obs_data if isinstance(obs_data, dict) else {}
            observation = CoreIdentityObservation(
                task_id=fields.get("task_id", "unknown"),
                task_type=fields.get("task_type", "document_verification"),
                task_name=fields.get("task_name", "unknown"),
                task_description=fields.get("task_description", ""),
                difficulty=fields.get("difficulty", "medium"),
                challenge_data=fields.get("challenge_data", {}),
            )

        # NOTE(review): confirm that the installed StepResult accepts an
        # ``info`` keyword; its definition is not visible from this file.
        return StepResult(
            observation=observation,
            reward=reward_value,
            done=done,
            info=envelope.get("info", {}),
        )

    def _parse_state(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return the server's state payload unchanged."""
        return payload