Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import random | |
| from typing import Dict, Optional, Tuple | |
| try: | |
| from openenv import Environment | |
| except ImportError: # local fallback for dev without openenv installed | |
| class Environment: # type: ignore | |
| pass | |
| from .consequences import apply_action | |
| from .events import CANONICAL_EVENTS, Event | |
| from .reward import annotate_decision, component_scores, weighted_total | |
| from .state import EpisodeState | |
| class WorkLifeFirewallEnv(Environment): | |
| metadata = {"name": "work-life-firewall", "version": "1.0.0", "max_steps": 14} | |
| def __init__(self, randomize_order: bool = True, seed: Optional[int] = None): | |
| super().__init__() | |
| self.randomize_order = randomize_order | |
| self.rng = random.Random(seed) | |
| self._state = EpisodeState() | |
| self._events = [] | |
| self._idx = 0 | |
| def reset(self) -> Dict: | |
| self._state = EpisodeState() | |
| self._events = list(CANONICAL_EVENTS) | |
| if self.randomize_order: | |
| opener = self._events[0] | |
| tail = self._events[1:] | |
| self.rng.shuffle(tail) | |
| self._events = [opener] + tail | |
| self._idx = 0 | |
| return self._build_obs(self._events[self._idx]) | |
| def step(self, action: str) -> Tuple[Dict, float, bool, Dict]: | |
| event = self._events[self._idx] | |
| action_id, action_data = self._decode_action(action, event) | |
| self._state = apply_action(self._state, event.id, action_id, action_data) | |
| # Add comm quality annotations into latest decision. | |
| latest = self._state.decisions[-1] | |
| annotate_decision(latest, action) | |
| step_reward = self._step_reward(latest, action_data) | |
| self._idx += 1 | |
| self._state.day = min(4, self._idx // 2) | |
| done = self._idx >= len(self._events) or self._state.energy <= 0.05 | |
| if done: | |
| comps = component_scores(self._state, self._state.decisions) | |
| terminal = weighted_total(comps) | |
| info = {"episode_complete": True, "components": comps} | |
| return self._state.to_observation(), round(step_reward + terminal, 4), True, info | |
| return self._build_obs(self._events[self._idx]), round(step_reward, 4), False, {} | |
| def state(self) -> Dict: | |
| return self._state.to_observation() | |
| def _build_obs(self, event: Event) -> Dict: | |
| return { | |
| "state": self._state.to_observation(), | |
| "event": { | |
| "id": event.id, | |
| "title": event.title, | |
| "source": event.source, | |
| "sender": event.sender, | |
| "body": event.body, | |
| "urgency": event.urgency, | |
| "relationship_stake": event.relationship_stake, | |
| "actions": [a["id"] for a in event.actions], | |
| }, | |
| "instruction": "Write the response or action you would take. End with a specific next step.", | |
| } | |
| def _decode_action(self, free_text: str, event: Event) -> Tuple[str, Dict]: | |
| text = free_text.lower() | |
| decode_map = { | |
| "E1_staging": {"ssh": "fix_directly", "restart": "fix_directly", "delegate": "delegate_oncall", "infra": "escalate_infra"}, | |
| "E2_slack": {"async": "async_boundary", "later": "async_boundary", "call now": "both_calls_now", "ignore": "ignore_slack"}, | |
| "E3_client_email": {"thanks": "acknowledge_timeline", "thursday": "acknowledge_timeline", "sorry": "full_apology", "cc": "cc_manager"}, | |
| "E4_leave": {"approve": "ping_sundar", "today": "ping_sundar", "wait": "wait_hope", "cancel": "cancel_leave"}, | |
| "E5_appraisal": {"block": "block_wednesday", "calendar": "block_wednesday", "tonight": "rush_tonight", "extension": "ask_extension"}, | |
| "E6_oncall": {"can't": "no_clearly_kindly", "cannot": "no_clearly_kindly", "swap": "counter_offer", "yes": "yes_people_pleaser", "sure": "yes_people_pleaser"}, | |
| "E7_standup": {"async": "decline_async", "skip": "decline_async", "attend": "attend_sacrifice", "delegate": "delegate_teammate"}, | |
| } | |
| mapping = decode_map.get(event.id, {}) | |
| selected = event.actions[0] | |
| for keyword, action_id in mapping.items(): | |
| if keyword in text: | |
| selected = next((a for a in event.actions if a["id"] == action_id), event.actions[0]) | |
| break | |
| return selected["id"], selected | |
| def _step_reward(self, decision: Dict, action_data: Dict) -> float: | |
| energy = float(action_data.get("energy_delta", 0.0)) | |
| sprint = float(action_data.get("sprint_delta", 0.0)) | |
| rel_penalty = sum(abs(v) for v in action_data.get("relationship_delta", {}).values()) * 0.08 | |
| comm_bonus = decision.get("message_quality", 0.0) * 0.12 | |
| return energy * 0.5 + sprint * 0.4 - rel_penalty + comm_bonus | |