File size: 3,699 Bytes
1f9fc8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Core Identity Environment Client.

Provides the async EnvClient for connecting to a CoreIdentityEnvironment server.
Supports both async (recommended) and synchronous usage via .sync() wrapper.

Example (async):
    >>> import asyncio
    >>> from core_identity_env import CoreIdentityEnv, CoreIdentityAction, VerificationResult
    >>>
    >>> async def main():
    ...     async with CoreIdentityEnv(base_url="http://localhost:8000") as env:
    ...         result = await env.reset()
    ...         obs = result.observation
    ...         print(obs.document)
    ...
    ...         action = CoreIdentityAction(
    ...             verification=VerificationResult(
    ...                 verified=True,
    ...                 confidence=0.95,
    ...                 issues=[],
    ...             ),
    ...             submit=True,
    ...         )
    ...         result = await env.step(action)
    ...         print(result.reward)
    ...
    >>> asyncio.run(main())

Example (from Hugging Face Space):
    >>> env = await CoreIdentityEnv.from_env("openenv/core-identity-env")
    >>> try:
    ...     result = await env.reset()
    ... finally:
    ...     await env.close()
"""

from typing import Any, Dict

try:
    from openenv.core.env_client import EnvClient
    from openenv.core.client_types import StepResult
except ImportError:
    from openenv.core.env_client import EnvClient
    from openenv.core.client_types import StepResult

from core_identity_env.models import CoreIdentityObservation, CoreIdentityAction


class CoreIdentityEnv(EnvClient[CoreIdentityAction, CoreIdentityObservation, Dict[str, Any]]):
    """Async client for the Core Identity Environment.

    Connects to a running CoreIdentityEnvironment server via WebSocket and
    provides step/reset/state methods for interacting with the environment.

    Inherits from EnvClient:
      - Async by default — use ``async with`` and ``await``
      - Sync wrapper — call ``.sync()`` for synchronous usage
      - ``from_env(repo_id)`` — connect to a Hugging Face Space
      - ``from_docker_image(image)`` — spin up a local Docker container

    This subclass only supplies the three (de)serialisation hooks below.
    """

    def _step_payload(self, action: CoreIdentityAction) -> Dict[str, Any]:
        """Serialise a CoreIdentityAction into the dict payload sent to the server."""
        return action.model_dump()

    def _parse_result(self, payload: Dict[str, Any]) -> "StepResult[CoreIdentityObservation]":
        """Deserialise a server response into a typed StepResult.

        Two response layouts are accepted:

        * nested — the fields live under ``payload["metadata"]``;
        * flat — ``observation`` / ``reward`` / ``done`` / ``info`` sit
          directly on ``payload``.

        The reward may be a bare value (including ``None``) or a dict of the
        form ``{"value": ...}``; a dict is unwrapped to its ``"value"`` entry
        (``0.0`` when that key is missing). A top-level ``"reward"`` key takes
        precedence over one nested in the metadata. When no reward is present
        anywhere, ``0.0`` is used.

        Args:
            payload: Decoded JSON object received from the server.

        Returns:
            A StepResult holding the parsed observation, reward, done flag and
            info dict.
        """
        # Fall back to the flat layout when "metadata" is absent or not a
        # mapping (e.g. an explicit null), so the lookups below never hit a
        # non-dict.
        metadata = payload.get("metadata", payload)
        if not isinstance(metadata, dict):
            metadata = payload

        obs_data = metadata.get("observation", metadata)
        done = payload.get("done", metadata.get("done", False))

        # Pick the reward source first and only then unwrap it. Evaluating
        # ``<reward>.get("value")`` unconditionally would raise AttributeError
        # for flat payloads whose reward is a plain float or None.
        if "reward" in payload:
            raw_reward = payload["reward"]
        else:
            raw_reward = metadata.get("reward", {})
        if isinstance(raw_reward, dict):
            reward_value = raw_reward.get("value", 0.0)
        else:
            reward_value = raw_reward

        try:
            observation = CoreIdentityObservation.model_validate(obs_data)
        except Exception:
            # Deliberate best-effort: if strict validation rejects the data,
            # build an observation from the known fields, substituting
            # placeholder defaults for anything missing.
            observation = CoreIdentityObservation(
                task_id=obs_data.get("task_id", "unknown"),
                task_type=obs_data.get("task_type", "document_verification"),
                task_name=obs_data.get("task_name", "unknown"),
                task_description=obs_data.get("task_description", ""),
                difficulty=obs_data.get("difficulty", "medium"),
                challenge_data=obs_data.get("challenge_data", {}),
            )

        # NOTE(review): confirm the installed openenv StepResult accepts an
        # ``info`` keyword — its definition is not visible from this file.
        return StepResult(
            observation=observation,
            reward=reward_value,
            done=done,
            info=metadata.get("info", {}),
        )

    def _parse_state(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return the server's state payload unchanged (no typed state model)."""
        return payload