"""Core Identity Environment Client.

Provides the async EnvClient for connecting to a CoreIdentityEnvironment server.
Supports both async (recommended) and synchronous usage via .sync() wrapper.

Example (async):
    >>> import asyncio
    >>> from core_identity_env import CoreIdentityEnv, CoreIdentityAction, VerificationResult
    >>>
    >>> async def main():
    ...     async with CoreIdentityEnv(base_url="http://localhost:8000") as env:
    ...         result = await env.reset()
    ...         obs = result.observation
    ...         print(obs.document)
    ...
    ...         action = CoreIdentityAction(
    ...             verification=VerificationResult(
    ...                 verified=True,
    ...                 confidence=0.95,
    ...                 issues=[],
    ...             ),
    ...             submit=True,
    ...         )
    ...         result = await env.step(action)
    ...         print(result.reward)
    ...
    >>> asyncio.run(main())

Example (from Hugging Face Space):
    >>> env = await CoreIdentityEnv.from_env("openenv/core-identity-env")
    >>> try:
    ...     result = await env.reset()
    ... finally:
    ...     await env.close()
"""

import logging
from typing import Any, Dict

from openenv.core.env_client import EnvClient
from openenv.core.client_types import StepResult

from core_identity_env.models import CoreIdentityObservation, CoreIdentityAction

logger = logging.getLogger(__name__)


def _unwrap_reward(raw: Any) -> Any:
    """Return the scalar reward from either a bare value or a ``{"value": ...}`` dict.

    Args:
        raw: The object found under a ``"reward"`` key in the server payload.

    Returns:
        ``raw["value"]`` (defaulting to ``0.0`` when the key is absent) if
        ``raw`` is a dict; otherwise ``raw`` unchanged, including ``None``.
    """
    if isinstance(raw, dict):
        return raw.get("value", 0.0)
    return raw


class CoreIdentityEnv(EnvClient[CoreIdentityAction, CoreIdentityObservation, Dict[str, Any]]):
    """Async client for the Core Identity Environment.

    Connects to a running CoreIdentityEnvironment server via WebSocket and
    provides step/reset/state methods for interacting with the environment.

    Inherits from EnvClient:
    - Async by default — use ``async with`` and ``await``
    - Sync wrapper — call ``.sync()`` for synchronous usage
    - ``from_env(repo_id)`` — connect to a Hugging Face Space
    - ``from_docker_image(image)`` — spin up a local Docker container
    """

    def _step_payload(self, action: CoreIdentityAction) -> Dict[str, Any]:
        """Serialise CoreIdentityAction to JSON payload for the server."""
        return action.model_dump()

    def _parse_result(self, payload: Dict[str, Any]) -> "StepResult[CoreIdentityObservation]":
        """Deserialise server JSON response to a typed StepResult.

        Two payload layouts are accepted: one where the step data is nested
        under a ``"metadata"`` dict, and a flat one where ``"observation"``,
        ``"reward"``, ``"done"`` and ``"info"`` sit at the top level.

        Args:
            payload: Decoded JSON object received from the server.

        Returns:
            A ``StepResult`` holding the parsed observation, the reward, the
            ``done`` flag and the ``info`` dict (empty when absent).
        """
        # A missing or null "metadata" entry means the payload itself is the
        # container for observation/reward/done/info.
        metadata = payload.get("metadata")
        if not isinstance(metadata, dict):
            metadata = payload

        obs_data = metadata.get("observation", metadata)

        # Top-level values win over nested ones. The reward may arrive as a
        # bare number or as a {"value": ...} dict; the nested fallback is only
        # consulted when the top-level key is absent, so a flat payload with a
        # numeric reward no longer trips over ``.get`` on a float.
        if "reward" in payload:
            reward_value = _unwrap_reward(payload["reward"])
        else:
            reward_value = _unwrap_reward(metadata.get("reward", {}))
        done = payload.get("done", metadata.get("done", False))

        try:
            observation = CoreIdentityObservation.model_validate(obs_data)
        except Exception:
            # Deliberate best-effort: build an observation from whatever
            # fields are present instead of failing the whole step, but leave
            # a trace so malformed server responses are not invisible.
            logger.warning(
                "Observation validation failed; falling back to default fields",
                exc_info=True,
            )
            fields = obs_data if isinstance(obs_data, dict) else {}
            observation = CoreIdentityObservation(
                task_id=fields.get("task_id", "unknown"),
                task_type=fields.get("task_type", "document_verification"),
                task_name=fields.get("task_name", "unknown"),
                task_description=fields.get("task_description", ""),
                difficulty=fields.get("difficulty", "medium"),
                challenge_data=fields.get("challenge_data", {}),
            )

        # NOTE(review): assumes StepResult accepts an ``info`` keyword —
        # confirm against the installed openenv version.
        return StepResult(
            observation=observation,
            reward=reward_value,
            done=done,
            info=metadata.get("info", {}),
        )

    def _parse_state(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return raw state dict from server."""
        return payload