Spaces:
Running
Running
File size: 8,069 Bytes
"""Native OpenEnv adapter for the SENTINEL oversight environment.

The main FastAPI app keeps the richer hackathon demo endpoints, while this
adapter exposes the same oversight task through OpenEnv's Environment base
class so the latest OpenEnv clients can discover schemas and use the standard
reset/step/state/WebSocket contract.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from uuid import uuid4
from pydantic import Field
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import (
Action as OpenEnvAction,
EnvironmentMetadata,
Observation as OpenEnvObservation,
State as OpenEnvState,
)
from sentinel.environment import SentinelEnv
class SentinelNativeAction(OpenEnvAction):
    """OpenEnv-native action model for one SENTINEL oversight decision."""

    # NOTE: the class docstring above is kept verbatim on purpose; pydantic
    # surfaces it as the model description in the generated JSON schema.

    # --- The decision itself -------------------------------------------
    # ``decision`` and ``action`` carry the same information; the adapter's
    # ``step`` copies ``action`` into ``decision`` when only the alias is set.
    decision: Optional[str] = Field(
        None,
        description="Oversight decision: APPROVE, BLOCK, REDIRECT, REASSIGN, or FLAG.",
    )
    action: Optional[str] = Field(
        None,
        description="Alias for decision, accepted for client compatibility.",
    )
    reason: Optional[str] = Field(
        None,
        description="Misbehavior type or 'safe'.",
    )
    explanation: str = Field(
        "",
        description="Evidence-backed explanation for the oversight decision.",
    )

    # --- Decision-specific arguments -----------------------------------
    redirect_action: Optional[Dict[str, Any]] = Field(
        None,
        description="Safer replacement action when decision is REDIRECT.",
    )
    reassign_to: Optional[str] = Field(
        None,
        description="Worker id to receive the work when decision is REASSIGN.",
    )
    flag_severity: Optional[str] = Field(
        None,
        description="Severity for FLAG decisions.",
    )

    # --- Corrective feedback for the worker ----------------------------
    worker_message: str = Field(
        "",
        description="Corrective feedback sent to the worker.",
    )
    required_evidence: List[str] = Field(
        default_factory=list,
        description="Evidence required before a corrected proposal can execute.",
    )

    # --- Undescribed optional hints ------------------------------------
    # These fields carry no schema description in this file; they are passed
    # through unchanged in the payload handed to the wrapped environment.
    # NOTE(review): presumably they describe a suggested corrected proposal
    # and any violated policy rules -- confirm against SentinelEnv.step.
    suggested_action_type: Optional[str] = Field(None)
    suggested_target: Optional[str] = Field(None)
    suggested_parameters: Dict[str, Any] = Field(default_factory=dict)
    constitutional_violations: List[str] = Field(default_factory=list)
class SentinelNativeObservation(OpenEnvObservation):
    """Observation returned by the native OpenEnv SENTINEL adapter."""

    # NOTE: the class docstring above is kept verbatim on purpose; pydantic
    # surfaces it as the model description in the generated JSON schema.
    # Every field has a default, so an observation can be built from a partial
    # source object (the adapter fills these via ``getattr`` with fallbacks).

    # Episode position.
    task_id: str = Field(default="basic_oversight")
    step_number: int = Field(default=0)
    max_steps: int = Field(default=0)

    # The worker proposal awaiting an oversight decision, as a JSON-style dict
    # (empty when the source observation has no proposal).
    proposed_action: Dict[str, Any] = Field(default_factory=dict)
    worker_id: Optional[str] = Field(default=None)
    worker_role: Optional[str] = Field(default=None)

    # Context copied from the wrapped environment's observation.
    incident_status: Optional[str] = Field(default=None)
    available_decisions: List[str] = Field(default_factory=list)
    corrective_loop_enabled: bool = Field(default=False)

    # Most recent audit-log entries (the adapter supplies at most three).
    audit_log_tail: List[Dict[str, Any]] = Field(default_factory=list)
    message: str = Field(default="")
class SentinelNativeState(OpenEnvState):
    """State snapshot for the native OpenEnv SENTINEL adapter."""

    # NOTE: the class docstring above is kept verbatim on purpose; pydantic
    # surfaces it as the model description in the generated JSON schema.
    # ``episode_id`` and ``step_count`` are passed by the adapter as keyword
    # arguments and are not declared here, so they come from ``OpenEnvState``.

    task_id: Optional[str] = Field(default=None)
    cumulative_reward: float = Field(default=0.0)
    done: bool = Field(default=False)

    # JSON-style dump of the pending proposal ({} when there is none).
    latest_proposal: Dict[str, Any] = Field(default_factory=dict)
    # JSON-style dump of the newest audit-log entry (None when the log is empty).
    latest_audit: Optional[Dict[str, Any]] = Field(default=None)
    # Per-worker records keyed by worker id, each dumped to a JSON-style dict.
    worker_records: Dict[str, Any] = Field(default_factory=dict)
class SentinelNativeEnvironment(
    Environment[SentinelNativeAction, SentinelNativeObservation, SentinelNativeState]
):
    """OpenEnv Environment wrapper around :class:`sentinel.environment.SentinelEnv`.

    The wrapper owns one ``SentinelEnv`` instance and translates between the
    OpenEnv action/observation/state models defined in this module and the
    objects that ``SentinelEnv`` accepts and returns.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create the wrapped environment; no episode is started yet."""
        super().__init__()
        self._env = SentinelEnv()
        self._episode_id = str(uuid4())
        self._task_id = "basic_oversight"
        # Tracks whether ``reset`` has completed since construction/``close``.
        self._has_reset = False

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: str = "basic_oversight",
        variant_seed: Optional[int] = None,
        **_: Any,
    ) -> SentinelNativeObservation:
        """Start a new episode and return its first observation.

        Args:
            seed: Fallback seed, used only when ``variant_seed`` is ``None``.
                ``None`` (or ``0``) resolves to ``0``.
            episode_id: Identifier to report for this episode; a fresh UUID4
                string is generated when omitted or empty.
            task_id: Task to load in the wrapped environment.
            variant_seed: Seed forwarded to ``SentinelEnv.reset``; takes
                precedence over ``seed`` whenever it is not ``None``.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The initial observation, with ``reward=None`` and ``done=False``.
        """
        resolved_seed = variant_seed if variant_seed is not None else (seed or 0)
        # Reset the wrapped environment *before* touching our bookkeeping: if it
        # raises (for example on an unknown task id), ``_task_id`` must keep its
        # last good value because ``step`` reuses it for its lazy auto-reset.
        obs = self._env.reset(task_id, variant_seed=resolved_seed)
        self._episode_id = episode_id or str(uuid4())
        self._task_id = task_id
        self._has_reset = True
        return self._to_observation(obs, reward=None, done=False)

    def step(
        self,
        action: SentinelNativeAction,
        timeout_s: Optional[float] = None,
        **_: Any,
    ) -> SentinelNativeObservation:
        """Apply one oversight decision and return the resulting observation.

        If no episode is active, one is started first with the most recently
        used task id.

        Args:
            action: The oversight decision to apply.
            timeout_s: Accepted for interface compatibility; not used here.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The next observation, carrying the step's total reward, the done
            flag and the step's ``info`` dict as ``metadata``.
        """
        if not self._has_reset:
            self.reset(task_id=self._task_id)
        # Unset (None) fields are dropped so the wrapped environment only sees
        # what the client actually provided; ``metadata`` is never forwarded.
        payload = action.model_dump(exclude={"metadata"}, exclude_none=True)
        # Honour the ``action`` alias when ``decision`` itself is missing/empty.
        if not payload.get("decision") and payload.get("action"):
            payload["decision"] = payload["action"]
        result = self._env.step(payload)
        return self._to_observation(
            result.observation,
            reward=result.sentinel_reward.total,
            done=result.done,
            info=result.info,
        )

    @property
    def state(self) -> SentinelNativeState:
        """Return a snapshot of the current episode.

        Before the first ``reset`` (or after ``close``) this is a minimal
        snapshot with ``step_count=0`` that does not query the wrapped
        environment.
        """
        if not self._has_reset:
            return SentinelNativeState(
                episode_id=self._episode_id,
                step_count=0,
                task_id=self._task_id,
            )
        state = self._env.state()
        latest_audit = state.audit_log[-1].model_dump(mode="json") if state.audit_log else None
        latest_proposal = (
            state.pending_proposal.model_dump(mode="json")
            if state.pending_proposal is not None
            else {}
        )
        return SentinelNativeState(
            episode_id=self._episode_id,
            step_count=state.step_number,
            task_id=state.task_id,
            cumulative_reward=state.cumulative_reward,
            done=state.done,
            latest_proposal=latest_proposal,
            latest_audit=latest_audit,
            worker_records={
                worker_id: record.model_dump(mode="json")
                for worker_id, record in state.worker_records.items()
            },
        )

    def get_metadata(self) -> EnvironmentMetadata:
        """Return the static descriptive metadata for this environment."""
        return EnvironmentMetadata(
            name="sentinel-oversight-command",
            description=(
                "Multi-agent OpenEnv environment where an LLM overseer intercepts "
                "worker-agent actions before they can affect a production incident."
            ),
            version="1.0.0",
            author="OpenEnv Contributor",
            documentation_url="https://huggingface.co/spaces/srikrishna2005/openenv",
        )

    def close(self) -> None:
        """Mark the episode as inactive; the next ``step`` will auto-reset."""
        self._has_reset = False

    def _to_observation(
        self,
        obs: Any,
        *,
        reward: Optional[float],
        done: bool,
        info: Optional[Dict[str, Any]] = None,
    ) -> SentinelNativeObservation:
        """Convert a wrapped-environment observation into the OpenEnv model.

        Args:
            obs: Observation object from ``SentinelEnv``; attributes are read
                with ``getattr`` so missing ones fall back to defaults.
            reward: Reward to attach (``None`` for the reset observation).
            done: Whether the episode has finished.
            info: Optional step info, exposed as ``metadata`` ({} when falsy).

        Returns:
            A populated ``SentinelNativeObservation``.
        """
        proposal = (
            obs.proposed_action.model_dump(mode="json")
            if getattr(obs, "proposed_action", None) is not None
            else {}
        )
        try:
            audit_tail = [
                item.model_dump(mode="json")
                for item in self._env.state().audit_log[-3:]
            ]
        except RuntimeError:
            # Best effort: an unavailable state yields an empty tail.
            # NOTE(review): presumably SentinelEnv.state() raises RuntimeError
            # when no episode is active -- confirm.
            audit_tail = []

        # ``task_id`` and ``message`` are plain ``str`` fields on the model, so
        # a present-but-None attribute would fail validation; fall back exactly
        # as if the attribute were missing.
        task_id = getattr(obs, "task_id", None)
        if task_id is None:
            task_id = self._task_id
        message = getattr(obs, "message", None) or ""

        return SentinelNativeObservation(
            done=done,
            reward=reward,
            metadata=info or {},
            task_id=task_id,
            step_number=getattr(obs, "step_number", 0),
            max_steps=getattr(obs, "max_steps", 0),
            proposed_action=proposal,
            worker_id=getattr(obs, "worker_id", None),
            worker_role=getattr(obs, "worker_role", None),
            incident_status=getattr(obs, "incident_status", None),
            available_decisions=list(getattr(obs, "available_decisions", []) or []),
            corrective_loop_enabled=bool(getattr(obs, "corrective_loop_enabled", False)),
            audit_log_tail=audit_tail,
            message=message,
        )
|