"""Native OpenEnv adapter for the SENTINEL oversight environment. The main FastAPI app keeps the richer hackathon demo endpoints, while this adapter exposes the same oversight task through OpenEnv's Environment base class so latest OpenEnv clients can discover schemas and use the standard reset/step/state/WebSocket contract. """ from __future__ import annotations from typing import Any, Dict, List, Optional from uuid import uuid4 from pydantic import Field from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import ( Action as OpenEnvAction, EnvironmentMetadata, Observation as OpenEnvObservation, State as OpenEnvState, ) from sentinel.environment import SentinelEnv class SentinelNativeAction(OpenEnvAction): """OpenEnv-native action model for one SENTINEL oversight decision.""" decision: Optional[str] = Field( default=None, description="Oversight decision: APPROVE, BLOCK, REDIRECT, REASSIGN, or FLAG.", ) action: Optional[str] = Field( default=None, description="Alias for decision, accepted for client compatibility.", ) reason: Optional[str] = Field( default=None, description="Misbehavior type or 'safe'.", ) explanation: str = Field( default="", description="Evidence-backed explanation for the oversight decision.", ) redirect_action: Optional[Dict[str, Any]] = Field( default=None, description="Safer replacement action when decision is REDIRECT.", ) reassign_to: Optional[str] = Field( default=None, description="Worker id to receive the work when decision is REASSIGN.", ) flag_severity: Optional[str] = Field( default=None, description="Severity for FLAG decisions.", ) worker_message: str = Field( default="", description="Corrective feedback sent to the worker.", ) required_evidence: List[str] = Field( default_factory=list, description="Evidence required before a corrected proposal can execute.", ) suggested_action_type: Optional[str] = None suggested_target: Optional[str] = None suggested_parameters: Dict[str, Any] = Field(default_factory=dict) constitutional_violations: List[str] = Field(default_factory=list) class SentinelNativeObservation(OpenEnvObservation): """Observation returned by the native OpenEnv SENTINEL adapter.""" task_id: str = "basic_oversight" step_number: int = 0 max_steps: int = 0 proposed_action: Dict[str, Any] = Field(default_factory=dict) worker_id: Optional[str] = None worker_role: Optional[str] = None incident_status: Optional[str] = None available_decisions: List[str] = Field(default_factory=list) corrective_loop_enabled: bool = False audit_log_tail: List[Dict[str, Any]] = Field(default_factory=list) message: str = "" class SentinelNativeState(OpenEnvState): """State snapshot for the native OpenEnv SENTINEL adapter.""" task_id: Optional[str] = None cumulative_reward: float = 0.0 done: bool = False latest_proposal: Dict[str, Any] = Field(default_factory=dict) latest_audit: Optional[Dict[str, Any]] = None worker_records: Dict[str, Any] = Field(default_factory=dict) class SentinelNativeEnvironment( Environment[SentinelNativeAction, SentinelNativeObservation, SentinelNativeState] ): """OpenEnv Environment wrapper around :class:`sentinel.environment.SentinelEnv`.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: super().__init__() self._env = SentinelEnv() self._episode_id = str(uuid4()) self._task_id = "basic_oversight" self._has_reset = False def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, task_id: str = "basic_oversight", variant_seed: Optional[int] = None, **_: Any, ) -> SentinelNativeObservation: self._episode_id = episode_id or str(uuid4()) self._task_id = task_id resolved_seed = variant_seed if variant_seed is not None else (seed or 0) obs = self._env.reset(task_id, variant_seed=resolved_seed) self._has_reset = True return self._to_observation(obs, reward=None, done=False) def step( self, action: SentinelNativeAction, timeout_s: Optional[float] = None, **_: Any, ) -> SentinelNativeObservation: if not self._has_reset: self.reset(task_id=self._task_id) payload = action.model_dump(exclude={"metadata"}, exclude_none=True) if not payload.get("decision") and payload.get("action"): payload["decision"] = payload["action"] result = self._env.step(payload) return self._to_observation( result.observation, reward=result.sentinel_reward.total, done=result.done, info=result.info, ) @property def state(self) -> SentinelNativeState: if not self._has_reset: return SentinelNativeState( episode_id=self._episode_id, step_count=0, task_id=self._task_id, ) state = self._env.state() latest_audit = state.audit_log[-1].model_dump(mode="json") if state.audit_log else None latest_proposal = ( state.pending_proposal.model_dump(mode="json") if state.pending_proposal is not None else {} ) return SentinelNativeState( episode_id=self._episode_id, step_count=state.step_number, task_id=state.task_id, cumulative_reward=state.cumulative_reward, done=state.done, latest_proposal=latest_proposal, latest_audit=latest_audit, worker_records={ worker_id: record.model_dump(mode="json") for worker_id, record in state.worker_records.items() }, ) def get_metadata(self) -> EnvironmentMetadata: return EnvironmentMetadata( name="sentinel-oversight-command", description=( "Multi-agent OpenEnv environment where an LLM overseer intercepts " "worker-agent actions before they can affect a production incident." ), version="1.0.0", author="OpenEnv Contributor", documentation_url="https://huggingface.co/spaces/srikrishna2005/openenv", ) def close(self) -> None: self._has_reset = False def _to_observation( self, obs: Any, *, reward: Optional[float], done: bool, info: Optional[Dict[str, Any]] = None, ) -> SentinelNativeObservation: proposal = ( obs.proposed_action.model_dump(mode="json") if getattr(obs, "proposed_action", None) is not None else {} ) audit_tail = [] try: audit_tail = [ item.model_dump(mode="json") for item in self._env.state().audit_log[-3:] ] except RuntimeError: audit_tail = [] return SentinelNativeObservation( done=done, reward=reward, metadata=info or {}, task_id=getattr(obs, "task_id", self._task_id), step_number=getattr(obs, "step_number", 0), max_steps=getattr(obs, "max_steps", 0), proposed_action=proposal, worker_id=getattr(obs, "worker_id", None), worker_role=getattr(obs, "worker_role", None), incident_status=getattr(obs, "incident_status", None), available_decisions=list(getattr(obs, "available_decisions", []) or []), corrective_loop_enabled=bool(getattr(obs, "corrective_loop_enabled", False)), audit_log_tail=audit_tail, message=getattr(obs, "message", ""), )