File size: 4,838 Bytes
11b949a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from __future__ import annotations

import random
from typing import Dict, Optional, Tuple

try:
    from openenv import Environment
except ImportError:  # local fallback for dev without openenv installed
    class Environment:  # type: ignore
        pass

from .consequences import apply_action
from .events import CANONICAL_EVENTS, Event
from .reward import annotate_decision, component_scores, weighted_total
from .state import EpisodeState


class WorkLifeFirewallEnv(Environment):
    metadata = {"name": "work-life-firewall", "version": "1.0.0", "max_steps": 14}

    def __init__(self, randomize_order: bool = True, seed: Optional[int] = None):
        super().__init__()
        self.randomize_order = randomize_order
        self.rng = random.Random(seed)
        self._state = EpisodeState()
        self._events = []
        self._idx = 0

    def reset(self) -> Dict:
        self._state = EpisodeState()
        self._events = list(CANONICAL_EVENTS)
        if self.randomize_order:
            opener = self._events[0]
            tail = self._events[1:]
            self.rng.shuffle(tail)
            self._events = [opener] + tail
        self._idx = 0
        return self._build_obs(self._events[self._idx])

    def step(self, action: str) -> Tuple[Dict, float, bool, Dict]:
        event = self._events[self._idx]
        action_id, action_data = self._decode_action(action, event)
        self._state = apply_action(self._state, event.id, action_id, action_data)

        # Add comm quality annotations into latest decision.
        latest = self._state.decisions[-1]
        annotate_decision(latest, action)
        step_reward = self._step_reward(latest, action_data)

        self._idx += 1
        self._state.day = min(4, self._idx // 2)

        done = self._idx >= len(self._events) or self._state.energy <= 0.05
        if done:
            comps = component_scores(self._state, self._state.decisions)
            terminal = weighted_total(comps)
            info = {"episode_complete": True, "components": comps}
            return self._state.to_observation(), round(step_reward + terminal, 4), True, info

        return self._build_obs(self._events[self._idx]), round(step_reward, 4), False, {}

    def state(self) -> Dict:
        return self._state.to_observation()

    def _build_obs(self, event: Event) -> Dict:
        return {
            "state": self._state.to_observation(),
            "event": {
                "id": event.id,
                "title": event.title,
                "source": event.source,
                "sender": event.sender,
                "body": event.body,
                "urgency": event.urgency,
                "relationship_stake": event.relationship_stake,
                "actions": [a["id"] for a in event.actions],
            },
            "instruction": "Write the response or action you would take. End with a specific next step.",
        }

    def _decode_action(self, free_text: str, event: Event) -> Tuple[str, Dict]:
        text = free_text.lower()
        decode_map = {
            "E1_staging": {"ssh": "fix_directly", "restart": "fix_directly", "delegate": "delegate_oncall", "infra": "escalate_infra"},
            "E2_slack": {"async": "async_boundary", "later": "async_boundary", "call now": "both_calls_now", "ignore": "ignore_slack"},
            "E3_client_email": {"thanks": "acknowledge_timeline", "thursday": "acknowledge_timeline", "sorry": "full_apology", "cc": "cc_manager"},
            "E4_leave": {"approve": "ping_sundar", "today": "ping_sundar", "wait": "wait_hope", "cancel": "cancel_leave"},
            "E5_appraisal": {"block": "block_wednesday", "calendar": "block_wednesday", "tonight": "rush_tonight", "extension": "ask_extension"},
            "E6_oncall": {"can't": "no_clearly_kindly", "cannot": "no_clearly_kindly", "swap": "counter_offer", "yes": "yes_people_pleaser", "sure": "yes_people_pleaser"},
            "E7_standup": {"async": "decline_async", "skip": "decline_async", "attend": "attend_sacrifice", "delegate": "delegate_teammate"},
        }
        mapping = decode_map.get(event.id, {})
        selected = event.actions[0]
        for keyword, action_id in mapping.items():
            if keyword in text:
                selected = next((a for a in event.actions if a["id"] == action_id), event.actions[0])
                break
        return selected["id"], selected

    def _step_reward(self, decision: Dict, action_data: Dict) -> float:
        energy = float(action_data.get("energy_delta", 0.0))
        sprint = float(action_data.get("sprint_delta", 0.0))
        rel_penalty = sum(abs(v) for v in action_data.get("relationship_delta", {}).values()) * 0.08
        comm_bonus = decision.get("message_quality", 0.0) * 0.12
        return energy * 0.5 + sprint * 0.4 - rel_penalty + comm_bonus