# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
RhythmEnv Life Simulator: Environment Implementation.

A holistic life resource management RL environment. The agent balances
5 life meters across a 7-day week (28 steps) while hidden personality
profiles secretly control how actions affect meters and how reward is
computed. The agent must discover these hidden dynamics through experience.

1 episode = 1 week, 1 step = 1 time slot (4 per day), 28 steps total.

Key design principles for learnability:
- step_history: the last 7 steps of (action, reward, deltas) are included
  in every observation so the agent can detect personality anomalies
- *_anomaly fields: per-meter deviation from the neutral-profile expectation,
  giving a direct fingerprint of the hidden profile each step
- adaptation_score: 25% of the final grade; late-half mean per-step reward
  minus early-half mean (gated by absolute late-half quality). Rewards
  the agent for getting better as it learns the user.
- Profile assignment uses a scrambled seed to prevent memorization
  of seed -> profile mappings during training
"""

import hashlib
import random
from copy import deepcopy
from typing import Any, Dict, List, Optional
from uuid import uuid4

from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata

try:
    from ..models import ActionType, RhythmAction, RhythmObservation, RhythmState, StepRecord
except (ImportError, ModuleNotFoundError):
    from models import ActionType, RhythmAction, RhythmObservation, RhythmState, StepRecord

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
MAX_STEPS = 28
DAYS = 7
SLOTS_PER_DAY = 4
SLOT_NAMES = ["morning", "afternoon", "evening", "night"]
METERS = ["vitality", "cognition", "progress", "serenity", "connection"]

EVENT_PROBABILITY = 0.08
CRITICAL_THRESHOLD = 0.1
CRITICAL_PENALTY = -0.3
REWARD_SCALE = 15.0
HISTORY_LENGTH = 7  # number of past steps included in every observation

# ---------------------------------------------------------------------------
# Action-Effect Matrix (base deltas per action on each meter)
# ---------------------------------------------------------------------------
ACTION_EFFECTS: Dict[str, Dict[str, float]] = {
    "deep_work": {"vitality": -0.12, "cognition": -0.10, "progress": 0.18, "serenity": -0.05, "connection": 0.00},
    "admin_work": {"vitality": -0.06, "cognition": -0.05, "progress": 0.08, "serenity": -0.03, "connection": 0.00},
    "learn": {"vitality": -0.08, "cognition": -0.08, "progress": 0.12, "serenity": 0.02, "connection": 0.00},
    "sleep": {"vitality": 0.20, "cognition": 0.10, "progress": 0.00, "serenity": 0.05, "connection": 0.00},
    "exercise": {"vitality": 0.12, "cognition": 0.05, "progress": 0.00, "serenity": 0.08, "connection": 0.00},
    "meditate": {"vitality": 0.03, "cognition": 0.08, "progress": 0.00, "serenity": 0.15, "connection": 0.00},
    "family_time": {"vitality": -0.04, "cognition": -0.02, "progress": 0.00, "serenity": 0.06, "connection": 0.15},
    "socialize": {"vitality": -0.06, "cognition": -0.03, "progress": 0.00, "serenity": 0.04, "connection": 0.12},
    "me_time": {"vitality": 0.05, "cognition": 0.03, "progress": 0.00, "serenity": 0.10, "connection": -0.02},
    "binge_watch": {"vitality": 0.02, "cognition": -0.05, "progress": -0.02, "serenity": 0.06, "connection": -0.03},
}

# ---------------------------------------------------------------------------
# Time-of-Day Multipliers
# ---------------------------------------------------------------------------
TIME_MULTIPLIERS: Dict[int, Dict[str, float]] = {
    0: {"cognition_gain": 1.2, "vitality_drain": 0.8},  # Morning
    1: {"cognition_gain": 1.0, "vitality_drain": 1.0},  # Afternoon
    2: {"cognition_gain": 0.8, "vitality_drain": 1.1},  # Evening
    3: {"cognition_gain": 0.6, "vitality_drain": 1.3},  # Night
}

# ---------------------------------------------------------------------------
# Random Events
# ---------------------------------------------------------------------------
EVENT_EFFECTS: Dict[str, Dict[str, float]] = {
    "prod_crash": {"vitality": -0.08, "cognition": -0.10, "progress": -0.10, "serenity": -0.15, "connection": 0.00},
    "family_emergency": {"vitality": -0.05, "cognition": -0.08, "progress": 0.00, "serenity": -0.12, "connection": -0.10},
    "illness": {"vitality": -0.20, "cognition": -0.10, "progress": 0.00, "serenity": -0.05, "connection": 0.00},
    "good_news": {"vitality": 0.05, "cognition": 0.03, "progress": 0.00, "serenity": 0.10, "connection": 0.05},
}
EVENT_NAMES: List[str] = list(EVENT_EFFECTS.keys())

# ---------------------------------------------------------------------------
# Hidden Personality Profiles
# ---------------------------------------------------------------------------
PROFILES: List[Dict[str, Any]] = [
    {
        "name": "introvert_morning",
        "social_vitality_multiplier": 3.0,
        "morning_cognition_bonus": 2.0,
        "evening_night_cognition_bonus": None,
        "morning_penalty": None,
        "binge_shame": True,
        "progress_serenity_bonus": 0.0,
        "idle_serenity_decay": 0.0,
        "vitality_decay_rate": 0.0,
        "stress_tolerance": 0.3,
        "event_impact_multiplier": 1.0,
        "connection_decay_rate": 0.01,
        "solo_serenity_bonus": 0.10,
        "social_connection_multiplier": 1.0,
        "social_serenity_bonus": 0.0,
        "work_vitality_recovery": 0.0,
        "initial_meters": {
            "vitality": 0.7, "cognition": 0.7, "progress": 0.0,
            "serenity": 0.7, "connection": 0.5,
        },
        "reward_weights": {
            "vitality": 0.05, "cognition": 0.05, "progress": 0.20,
            "serenity": 0.60, "connection": 0.10,
        },
    },
    {
        "name": "extrovert_night_owl",
        "social_vitality_multiplier": 0.2,
        "morning_cognition_bonus": None,
        "evening_night_cognition_bonus": 1.8,
        "morning_penalty": 0.4,
        "binge_shame": False,
        "progress_serenity_bonus": 0.0,
        "idle_serenity_decay": 0.0,
        "vitality_decay_rate": 0.0,
        "stress_tolerance": 0.2,
        "event_impact_multiplier": 0.8,
        "connection_decay_rate": 0.01,
        "solo_serenity_bonus": 0.0,
        "social_connection_multiplier": 2.0,
        "social_serenity_bonus": 0.06,
        "work_vitality_recovery": 0.0,
        "initial_meters": {
            "vitality": 0.7, "cognition": 0.7, "progress": 0.0,
            "serenity": 0.7, "connection": 0.5,
        },
        "reward_weights": {
            "vitality": 0.05, "cognition": 0.05, "progress": 0.10,
            "serenity": 0.05, "connection": 0.75,
        },
    },
    {
        "name": "workaholic_stoic",
        "social_vitality_multiplier": 1.0,
        "morning_cognition_bonus": None,
        "evening_night_cognition_bonus": None,
        "morning_penalty": None,
        "binge_shame": False,
        "progress_serenity_bonus": 0.10,
        "idle_serenity_decay": 0.10,
        "vitality_decay_rate": 0.04,
        "stress_tolerance": 0.15,
        "event_impact_multiplier": 0.5,
        "connection_decay_rate": 0.02,
        "solo_serenity_bonus": 0.0,
        "social_connection_multiplier": 1.0,
        "social_serenity_bonus": 0.0,
        "work_vitality_recovery": 0.06,
        "initial_meters": {
            "vitality": 0.7, "cognition": 0.7, "progress": 0.0,
            "serenity": 0.7, "connection": 0.5,
        },
        "reward_weights": {
            "vitality": 0.05, "cognition": 0.05, "progress": 0.70,
            "serenity": 0.10, "connection": 0.10,
        },
    },
]
PROFILE_MAP: Dict[str, Dict[str, Any]] = {p["name"]: p for p in PROFILES}

# Social actions for modifier checks
SOCIAL_ACTIONS = {"family_time", "socialize"}
IDLE_ACTIONS = {"me_time", "binge_watch", "sleep"}
WORK_ACTIONS = {"deep_work", "learn", "admin_work"}

# ---------------------------------------------------------------------------
# Continuous profile sampling (meta-RL training distribution)
# ---------------------------------------------------------------------------
#
# Hardcoded profiles are 3 fixed personalities: memorizable, classification-like.
# Continuous sampling draws profile parameters from distributions per episode,
# making memorization impossible and forcing the agent to learn the *skill* of
# profile inference. This is the core meta-learning move.
#
# Belief vector dimensions (for Phase 3 cosine-similarity reward):
#   social_pref:  0 = hates social (introvert), 1 = loves social (extrovert)
#   morning_pref: 0 = night owl, 1 = morning person
#   work_pref:    0 = work-averse, 1 = workaholic


def sample_profile(seed: int) -> Dict[str, Any]:
    """Sample a continuous profile deterministically from a seed.

    Reward weights are drawn from a Dirichlet biased toward non-infrastructure
    meters (progress, serenity, connection). Per-action modifiers are drawn
    from bounded uniforms so any sampled profile is playable.
    """
    rng = random.Random(seed ^ 0xA3C5F729)
    # Reward weights via Dirichlet (alpha < 1 produces sparse weights -> "personalities")
    alphas = [0.5, 0.5, 1.5, 1.5, 1.5]  # [vit, cog, prog, ser, conn]
    raw = [rng.gammavariate(a, 1.0) for a in alphas]
    total = sum(raw)
    weights = [w / total for w in raw]
    # Cap each weight at 0.45 so every sampled profile weights 3+ meters
    # meaningfully. With a 0.80 cap, single-meter-dominant profiles let
    # SLEEP-spam (or any single recovery action) be optimal: the env wasn't
    # lying, the agent was right to spam. Forcing balance makes belief
    # inference matter for action selection.
    # Cap-then-renormalize is iterated because a single pass can undo the
    # cap: e.g. capping [0.85, ...] shrinks the total, and the subsequent
    # renormalization pushes the largest weight back above 0.45. A few
    # passes converge to an (approximate) fixed point within the bounds.
    for _ in range(8):
        weights = [max(0.05, min(0.45, w)) for w in weights]
        total = sum(weights)
        weights = [w / total for w in weights]
    return {
        "name": f"sampled_{seed}",
        "social_vitality_multiplier": rng.uniform(0.2, 3.0),
        "morning_cognition_bonus": rng.uniform(0.4, 2.0) if rng.random() < 0.5 else None,
        "evening_night_cognition_bonus": rng.uniform(0.6, 1.8) if rng.random() < 0.5 else None,
        "morning_penalty": rng.uniform(0.4, 0.9) if rng.random() < 0.3 else None,
        "binge_shame": rng.random() < 0.5,
        "progress_serenity_bonus": rng.uniform(0.0, 0.10),
        "idle_serenity_decay": rng.uniform(0.0, 0.10),
        "vitality_decay_rate": rng.uniform(0.0, 0.04),
        "stress_tolerance": rng.uniform(0.15, 0.30),
        "event_impact_multiplier": rng.uniform(0.5, 1.0),
        "connection_decay_rate": rng.uniform(0.005, 0.02),
        "solo_serenity_bonus": rng.uniform(0.0, 0.10),
        "social_connection_multiplier": rng.uniform(1.0, 2.0),
        "social_serenity_bonus": rng.uniform(0.0, 0.06),
        "work_vitality_recovery": rng.uniform(0.0, 0.06),
        "initial_meters": {
            "vitality": 0.7, "cognition": 0.7, "progress": 0.0,
            "serenity": 0.7, "connection": 0.5,
        },
        "reward_weights": dict(zip(METERS, weights)),
    }
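

# Illustrative: sampling is a pure function of the seed (the scrambled RNG is
# local to the call), so the same seed reproduces the same profile across
# processes and runs:
#   sample_profile(123) == sample_profile(123)   # True
# while different seeds yield different parameter draws with overwhelming
# probability.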


def profile_to_belief_vector(profile: Dict[str, Any]) -> List[float]:
    """Reduce a profile to a 3-dim trait vector [social, morning, work] in [0, 1].

    Used as the ground-truth target for the agent's belief output (Phase 3).
    Aggregates the most-diagnostic modifiers per trait.
    """
    # social_pref: low if social drains vitality a lot, high if social bonuses are big
    # social_vitality_multiplier in [0.2, 3.0]: lower = more extroverted
    sm = profile.get("social_vitality_multiplier", 1.0)
    social_drain_norm = 1.0 - max(0.0, min(1.0, (sm - 0.2) / 2.8))  # invert
    scm = profile.get("social_connection_multiplier", 1.0)
    social_conn_norm = max(0.0, min(1.0, (scm - 1.0) / 1.0))
    ssb = profile.get("social_serenity_bonus", 0.0)
    social_ser_norm = max(0.0, min(1.0, ssb / 0.06))
    social_pref = 0.5 * social_drain_norm + 0.3 * social_conn_norm + 0.2 * social_ser_norm
    # morning_pref: high if a morning bonus exists, low if a morning penalty exists
    mcb = profile.get("morning_cognition_bonus")
    mp = profile.get("morning_penalty")
    morning_pref = 0.5
    if mcb is not None:
        morning_pref = 0.5 + 0.5 * max(0.0, min(1.0, (mcb - 0.4) / 1.6))
    if mp is not None:
        morning_pref = min(morning_pref, 0.5 - 0.5 * (1.0 - mp))
    # work_pref: high if work recovers vitality, progress gives serenity, and
    # the progress reward weight is high
    wvr = profile.get("work_vitality_recovery", 0.0)
    wvr_norm = max(0.0, min(1.0, wvr / 0.06))
    psb = profile.get("progress_serenity_bonus", 0.0)
    psb_norm = max(0.0, min(1.0, psb / 0.10))
    pw = profile.get("reward_weights", {}).get("progress", 0.2)
    pw_norm = max(0.0, min(1.0, (pw - 0.05) / 0.65))
    work_pref = 0.4 * wvr_norm + 0.3 * psb_norm + 0.3 * pw_norm
    return [
        max(0.0, min(1.0, social_pref)),
        max(0.0, min(1.0, morning_pref)),
        max(0.0, min(1.0, work_pref)),
    ]
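

# Worked example against the hardcoded "introvert_morning" profile above:
#   social:  sm=3.0 -> drain_norm 0.0; scm=1.0 -> 0.0; ssb=0.0 -> 0.0   => social_pref 0.0
#   morning: mcb=2.0 -> 0.5 + 0.5 * 1.0 = 1.0 (no morning_penalty)      => morning_pref 1.0
#   work:    wvr=0.0, psb=0.0, progress weight 0.20 -> 0.3 * (0.15/0.65) => work_pref ~0.069
# i.e. profile_to_belief_vector(PROFILE_MAP["introvert_morning"]) ~= [0.0, 1.0, 0.069]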


class RhythmEnvironment(Environment):
    """
    Life Simulator RL Environment.

    The agent manages 5 life meters (Vitality, Cognition, Progress, Serenity,
    Connection) across a 7-day week. Hidden personality profiles secretly
    control how actions affect meters and how reward is computed. The agent
    must discover these hidden dynamics through experience.

    Every observation includes:
    - Current meter values and temporal context
    - Last step's per-meter deltas as first-class fields
    - Anomaly signals: actual delta minus neutral-profile expectation
    - Rolling step_history (last 7 steps) with actions, rewards, deltas

    The final grade rewards profile-appropriate strategy via adaptation_score
    (25% of the grade): late-half mean per-step reward minus early-half mean.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self) -> None:
        super().__init__()
        self._state = RhythmState()
        self._rng = random.Random(0)
        self._profile: Dict[str, Any] = PROFILES[0]
        # Meters
        self._vitality: float = 0.8
        self._cognition: float = 0.7
        self._progress: float = 0.0
        self._serenity: float = 0.7
        self._connection: float = 0.5
        # Tracking
        self._timestep: int = 0
        self._crash_count: int = 0
        self._total_reward: float = 0.0
        self._step_history: list = []
        self._step_rewards: list = []  # per-step rewards (for adaptation_score in grader)
        # Latest emitted belief vector: set by callers via record_belief() and
        # consumed by _grade_episode. Stays None if the agent never emits a belief
        # (e.g. a heuristic baseline); that case scores 0 on the belief component.
        self._final_belief: Optional[List[float]] = None
        # Lazily built composed Rubric for episode grading. None until the first
        # `done=True` step; built once per env instance and reused across episodes.
        self._grade_rubric: Optional[Any] = None

    def get_metadata(self) -> EnvironmentMetadata:
        return EnvironmentMetadata(
            name="RhythmEnv",
            description=(
                "Life Simulator: a holistic resource management RL environment "
                "where an agent balances 5 life meters across a 7-day week "
                "with hidden personality profiles."
            ),
            version="0.3.0",
        )

    # ------------------------------------------------------------------
    # reset
    # ------------------------------------------------------------------
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> RhythmObservation:
        # Determine seed. When no explicit seed is given, derive one from the
        # episode_id with a stable hash: the builtin hash() is salted per
        # process, which would make episode_id-based seeding non-reproducible
        # across runs.
        if seed is not None:
            effective_seed = seed
        else:
            key = (episode_id or str(uuid4())).encode("utf-8")
            effective_seed = int.from_bytes(hashlib.sha256(key).digest()[:4], "big") & 0x7FFFFFFF
        self._rng = random.Random(effective_seed)
        # Profile selection, two modes:
        #   1. Explicit hardcoded profile name -> one of the 3 reference profiles
        #      (used by tests + the legacy 3-profile eval condition)
        #   2. Default -> sampled continuous profile (meta-RL training distribution)
        profile_name = kwargs.get("profile")
        if profile_name and profile_name in PROFILE_MAP:
            self._profile = deepcopy(PROFILE_MAP[profile_name])
        else:
            self._profile = sample_profile(effective_seed)
        # Initialize meters from profile defaults
        initial = self._profile["initial_meters"]
        self._vitality = initial["vitality"]
        self._cognition = initial["cognition"]
        self._progress = initial["progress"]
        self._serenity = initial["serenity"]
        self._connection = initial["connection"]
        # Reset tracking
        self._timestep = 0
        self._crash_count = 0
        self._total_reward = 0.0
        self._step_history = []
        self._step_rewards = []
        self._final_belief = None
        self._state = RhythmState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            profile_name=self._profile["name"],
            timestep=0,
            day=0,
            slot=0,
            vitality=self._vitality,
            cognition=self._cognition,
            progress=self._progress,
            serenity=self._serenity,
            connection=self._connection,
        )
        return self._make_observation(reward=0.0, done=False, active_event=None)

    # ------------------------------------------------------------------
    # step
    # ------------------------------------------------------------------
    def step(
        self,
        action: RhythmAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> RhythmObservation:
        # Save step number before incrementing (used for history record)
        current_step = self._timestep
        slot = self._timestep % SLOTS_PER_DAY
        day = self._timestep // SLOTS_PER_DAY
        action_name = action.action_type.value
        # --- 1. Roll and apply event ---
        active_event = self._roll_event()
        if active_event:
            self._apply_event(active_event)
        # --- 2. Get base action effects ---
        effects = dict(ACTION_EFFECTS[action_name])
        # --- 2b. Repetition dampening ---
        recent3 = [h["action"] for h in self._step_history[-3:]]
        repeat_count = recent3.count(action_name)
        if repeat_count > 0:
            dampening = 1.0 - 0.25 * repeat_count  # 0.75, 0.50, 0.25
            for meter in METERS:
                if effects[meter] > 0:
                    effects[meter] *= dampening
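        # Illustrative: if the agent chose "sleep" on each of the last 3 steps
        # and picks "sleep" again, repeat_count == 3 and only the positive
        # effects shrink: vitality +0.20 * 0.25 = +0.05. Negative effects are
        # never dampened, so spamming keeps the costs while losing the gains.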
        # --- 3. Apply time-of-day multipliers (SLEEP bypasses) ---
        if action_name != "sleep":
            effects = self._apply_time_multipliers(effects, slot)
        # Snapshot expected effects here (after time/dampening but BEFORE profile
        # modifiers). The anomaly = actual_delta - expected gives the agent a
        # direct per-step fingerprint of the hidden profile modifier.
        expected_no_profile = dict(effects)
        # --- 4. Apply profile modifiers ---
        effects = self._apply_profile_modifiers(effects, action_name, slot)
        # --- 5. Apply global vitality factor (low vitality reduces positive effects) ---
        vitality_factor = 0.5 + 0.5 * self._vitality
        for meter in METERS:
            if meter != "vitality" and effects[meter] > 0:
                effects[meter] *= vitality_factor
        # Apply the same vitality factor to expected for a fair anomaly comparison
        for meter in METERS:
            if meter != "vitality" and expected_no_profile[meter] > 0:
                expected_no_profile[meter] *= vitality_factor
        # --- 6. Apply passive decays ---
        self._apply_passive_decays()
        # --- 7. Update meters and track deltas ---
        deltas: Dict[str, float] = {}
        for meter in METERS:
            old_val = getattr(self, f"_{meter}")
            new_val = max(0.0, min(1.0, old_val + effects[meter]))
            deltas[meter] = new_val - old_val
            setattr(self, f"_{meter}", new_val)
        # --- 8. Compute reward ---
        reward = self._compute_reward(deltas)
        # --- 9. Check critical thresholds ---
        for meter in METERS:
            if getattr(self, f"_{meter}") < CRITICAL_THRESHOLD:
                reward += CRITICAL_PENALTY
                self._crash_count += 1
        # Clamp reward
        reward = max(-3.0, min(3.0, round(reward, 4)))
        self._total_reward += reward
        self._step_rewards.append(reward)
        # --- 10. Advance timestep ---
        self._timestep += 1
        new_day = self._timestep // SLOTS_PER_DAY
        new_slot = self._timestep % SLOTS_PER_DAY
        # --- 11. Check done ---
        done = self._timestep >= MAX_STEPS
        # --- 12. Build reward breakdown ---
        # Includes: per-meter deltas, per-meter anomalies (actual - expected),
        # event flag, and final_score on the last step.
        reward_breakdown: Dict[str, float] = {}
        for meter in METERS:
            reward_breakdown[f"{meter}_delta"] = round(deltas[meter], 4)
            reward_breakdown[f"{meter}_anomaly"] = round(
                deltas[meter] - expected_no_profile[meter], 4
            )
        if active_event:
            reward_breakdown["event"] = 1.0
        # --- 13. Grade if done ---
        if done:
            final_score = self._grade_episode()
            reward_breakdown["final_score"] = round(final_score, 4)
            # Sparse terminal reward: directly supervise on the grader's
            # final_score. Centered on 0.5 (the "average" episode) and scaled
            # by 5x to give a range of [-2.5, +2.5], strong enough to dominate
            # any local reward-hack the agent might find on per-step shaping alone.
            terminal_bonus = (final_score - 0.5) * 5.0
            reward = max(-3.0, min(3.0, reward + terminal_bonus))
            self._total_reward += terminal_bonus  # update tracking too
            reward_breakdown["terminal_bonus"] = round(terminal_bonus, 4)
        # --- 14. Update state ---
        self._state.step_count = self._timestep
        self._state.timestep = self._timestep
        self._state.day = new_day
        self._state.slot = new_slot
        self._state.vitality = round(self._vitality, 4)
        self._state.cognition = round(self._cognition, 4)
        self._state.progress = round(self._progress, 4)
        self._state.serenity = round(self._serenity, 4)
        self._state.connection = round(self._connection, 4)
        self._state.active_event = active_event
        # --- 15. Append completed step to rolling history ---
        # History entries carry per-meter anomalies (actual - expected_under_neutral).
        # The prompt builder reads these directly to surface the agent's clearest
        # profile-inference signal.
        self._step_history.append({
            "step": current_step,
            "action": action_name,
            "reward": reward,
            "vitality_delta": round(deltas["vitality"], 4),
            "cognition_delta": round(deltas["cognition"], 4),
            "progress_delta": round(deltas["progress"], 4),
            "serenity_delta": round(deltas["serenity"], 4),
            "connection_delta": round(deltas["connection"], 4),
            "vitality_anomaly": round(deltas["vitality"] - expected_no_profile["vitality"], 4),
            "cognition_anomaly": round(deltas["cognition"] - expected_no_profile["cognition"], 4),
            "progress_anomaly": round(deltas["progress"] - expected_no_profile["progress"], 4),
            "serenity_anomaly": round(deltas["serenity"] - expected_no_profile["serenity"], 4),
            "connection_anomaly": round(deltas["connection"] - expected_no_profile["connection"], 4),
        })
        if len(self._step_history) > HISTORY_LENGTH:
            self._step_history.pop(0)
        return self._make_observation(
            reward=reward,
            done=done,
            active_event=active_event,
            reward_breakdown=reward_breakdown,
            deltas=deltas,
            last_action=action_name,
        )

    # ------------------------------------------------------------------
    # state accessor
    # ------------------------------------------------------------------
    def state(self) -> RhythmState:
        return self._state

    def get_belief_target(self) -> List[float]:
        """Return the 3-dim ground-truth belief vector for the active profile.

        Used during training to compute the belief-accuracy reward (Phase 3).
        Privileged information: not exposed via observation.
        """
        return profile_to_belief_vector(self._profile)

    def record_belief(self, belief: List[float]) -> None:
        """Record the agent's emitted belief for the current step.

        The grader (`_grade_episode`) uses the LAST recorded belief to compute
        the belief_accuracy component of final_score. Callers should invoke
        this once per step after parsing the agent's completion. Heuristic /
        random baselines that don't emit a belief never call this, and the
        belief component scores 0 for them. That's intentional: the meta-RL
        skill is INFERENCE, and only agents that actually try get credit.
        """
        if len(belief) != 3:
            raise ValueError(f"belief must have 3 elements, got {len(belief)}")
        self._final_belief = [max(0.0, min(1.0, float(b))) for b in belief]

    def get_profile_hint(self) -> Dict[str, float]:
        """Return a coarse profile hint usable in the observation during curriculum.

        Returns the 3-dim belief vector with descriptive keys. The dataset
        generator passes this into the prompt for the fraction of samples
        with show_profile_hint=True (the curriculum's "visible" warmup phase).
        """
        b = profile_to_belief_vector(self._profile)
        return {"social_pref": round(b[0], 3), "morning_pref": round(b[1], 3), "work_pref": round(b[2], 3)}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _roll_event(self) -> Optional[str]:
        """Roll for a random event this step."""
        if self._rng.random() < EVENT_PROBABILITY:
            return self._rng.choice(EVENT_NAMES)
        return None

    def _apply_event(self, event_name: str) -> None:
        """Apply event effects to meters, modified by profile."""
        effects = EVENT_EFFECTS[event_name]
        multiplier = self._profile["event_impact_multiplier"]
        for meter in METERS:
            delta = effects[meter]
            # Only apply the multiplier to negative effects
            if delta < 0:
                delta *= multiplier
            old_val = getattr(self, f"_{meter}")
            new_val = max(0.0, min(1.0, old_val + delta))
            setattr(self, f"_{meter}", new_val)

    def _apply_time_multipliers(
        self, effects: Dict[str, float], slot: int
    ) -> Dict[str, float]:
        """Apply time-of-day multipliers to action effects."""
        multipliers = TIME_MULTIPLIERS[slot]
        for meter in effects:
            if meter == "cognition" and effects[meter] > 0:
                effects[meter] *= multipliers["cognition_gain"]
            elif meter == "vitality" and effects[meter] < 0:
                effects[meter] *= multipliers["vitality_drain"]
        return effects

    def _apply_profile_modifiers(
        self, effects: Dict[str, float], action_name: str, slot: int
    ) -> Dict[str, float]:
        """Apply hidden profile modifiers to action effects."""
        profile = self._profile
        # Social vitality drain multiplier
        if action_name in SOCIAL_ACTIONS and effects["vitality"] < 0:
            effects["vitality"] *= profile["social_vitality_multiplier"]
        # Introvert morning cognition bonus
        bonus = profile.get("morning_cognition_bonus")
        if bonus and slot == 0:
            if effects["cognition"] > 0:
                effects["cognition"] *= bonus
            if effects["progress"] > 0:
                effects["progress"] *= bonus
        # Extrovert evening/night cognition bonus
        bonus = profile.get("evening_night_cognition_bonus")
        if bonus and slot in (2, 3):
            if effects["cognition"] > 0:
                effects["cognition"] *= bonus
            if effects["progress"] > 0:
                effects["progress"] *= bonus
        # Extrovert morning penalty
        penalty = profile.get("morning_penalty")
        if penalty and slot == 0:
            if effects["cognition"] > 0:
                effects["cognition"] *= penalty
            if effects["progress"] > 0:
                effects["progress"] *= penalty
        # Binge shame spiral
        if profile.get("binge_shame") and action_name == "binge_watch":
            effects["serenity"] -= 0.15
            effects["cognition"] -= 0.06
        # Workaholic: progress-producing actions give a serenity bonus
        psb = profile.get("progress_serenity_bonus", 0.0)
        if psb > 0 and effects["progress"] > 0:
            effects["serenity"] += psb
        # Workaholic: idle actions drain serenity
        isd = profile.get("idle_serenity_decay", 0.0)
        if isd > 0 and action_name in IDLE_ACTIONS:
            effects["serenity"] -= isd
        # Solo recharge: introvert gets serenity from alone time
        ssb = profile.get("solo_serenity_bonus", 0.0)
        if ssb > 0 and action_name in ("me_time", "meditate"):
            effects["serenity"] += ssb
        # Social connection multiplier: extrovert gets more connection from socializing
        scm = profile.get("social_connection_multiplier", 1.0)
        if scm != 1.0 and action_name in SOCIAL_ACTIONS and effects["connection"] > 0:
            effects["connection"] *= scm
        # Social serenity bonus: extrovert gets serenity from socializing
        ssrb = profile.get("social_serenity_bonus", 0.0)
        if ssrb > 0 and action_name in SOCIAL_ACTIONS:
            effects["serenity"] += ssrb
        # Work vitality recovery: workaholic gets vitality from productive work
        wvr = profile.get("work_vitality_recovery", 0.0)
        if wvr > 0 and action_name in WORK_ACTIONS:
            effects["vitality"] += wvr
        # Low serenity amplification (stress spiral)
        if self._serenity < profile.get("stress_tolerance", 0.3):
            for meter in effects:
                if effects[meter] < 0:
                    effects[meter] *= 1.3
        return effects

    def _apply_passive_decays(self) -> None:
        """Apply per-step passive meter decays."""
        # Connection always decays if not actively maintained
        decay = self._profile["connection_decay_rate"]
        self._connection = max(0.0, self._connection - decay)
        # Workaholic extra vitality decay
        vd = self._profile.get("vitality_decay_rate", 0.0)
        if vd > 0:
            self._vitality = max(0.0, self._vitality - vd)

    def _compute_reward(self, deltas: Dict[str, float]) -> float:
        """Pure profile-weighted per-step reward.

        Deliberately uncontaminated: the grader-aligned bias (progress +
        connection deltas) lives in the TRAINING reward function in
        reward_functions.py, not here. Keeping the env's per-step reward
        pure means (1) the agent's inference signal stays a clean function
        of the hidden reward_weights, (2) the grader's adaptation_score
        isn't computed on biased rewards, and (3) the env's reward matches
        what an honest deployment would surface to the agent.
        """
        weights = self._profile["reward_weights"]
        return sum(deltas[m] * weights[m] for m in METERS) * REWARD_SCALE
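
    # Illustrative arithmetic for _compute_reward with the "workaholic_stoic"
    # weights {vit 0.05, cog 0.05, prog 0.70, ser 0.10, conn 0.10} and the raw
    # deep_work deltas {-0.12, -0.10, +0.18, -0.05, 0.00}:
    #   (-0.12*0.05) + (-0.10*0.05) + (0.18*0.70) + (-0.05*0.10) + 0.0 = 0.110
    #   0.110 * REWARD_SCALE (15.0) = 1.65
    # (actual deltas also reflect time-of-day, dampening, and profile modifiers).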

    def _grade_episode(self) -> float:
        """
        Compute the final episode score in [0, 1].

        Components (meta-learning aligned):
            0.15 - crash_free_ratio: no critical meter drops
            0.20 - progress: career/skill growth
            0.10 - connection: relationship maintained
            0.25 - adaptation_score: the agent got better as it learned the user
            0.10 - efficiency: bounded normalized average reward
            0.20 - belief_accuracy: how close the last-emitted belief is to the
                   true profile

        DESIGN NOTE (acknowledged conformance gap with OpenEnv): this grader
        composes 6 child Rubrics via `WeightedSum` (from `openenv.core.rubrics`),
        with the composability, independent components, and explicit weights
        that implies. It does not, however, honor the literal `Rubric.forward`
        contract: the sub-rubrics read aggregated episode-end state (the
        per-step rewards buffer, crash_count, terminal belief), while OpenEnv's
        `Rubric.forward` expects per-(action, observation) inputs. A clean
        refactor would use `TrajectoryRubric` for the cumulative components and
        the per-step `Rubric` for crash_free / belief_accuracy. Tracked as a v2
        cleanup task; not blocking on the meta-RL skill we're evaluating.

        Implementation: composes 6 `Rubric` subclasses via OpenEnv's
        `WeightedSum` (see `server/rubrics.py`). Each sub-rubric reads the
        aggregated episode state (`_step_rewards`, `_crash_count`,
        `_final_belief`, `_profile`) of the env it was built with, RFC 004's
        recommended pattern for trajectory-summary scoring on top of the
        per-(action, observation) Rubric ABC.

        belief_accuracy is the explicit meta-RL inference signal: an agent
        that doesn't emit a belief scores 0 here; an agent emitting a belief
        close to the hidden profile vector scores up to 1. Without this term,
        agents that play heuristic-style "keep meters healthy" score the same
        as agents that actually infer the profile, since the other components
        don't differentiate inference from reflex.
        """
        from server.rubrics import make_grade_rubric

        # Build (or reuse) the composed rubric. The Rubric subclasses are
        # stateless once built (they read live env state at forward() time),
        # so caching is safe.
        if self._grade_rubric is None:
            self._grade_rubric = make_grade_rubric(self)
        # The forward(action, observation) args are unused for episode-end
        # scoring; the rubric reads from `self`.
        score = self._grade_rubric(action=None, observation=None)
        return max(0.0, min(1.0, float(score)))

    def _make_observation(
        self,
        reward: float,
        done: bool,
        active_event: Optional[str],
        reward_breakdown: Optional[Dict[str, float]] = None,
        deltas: Optional[Dict[str, float]] = None,
        last_action: Optional[str] = None,
    ) -> RhythmObservation:
        """Build the observation returned to the agent (hides the profile)."""
        step_records = [
            StepRecord(
                step=h["step"],
                action=h["action"],
                reward=h["reward"],
                vitality_delta=h["vitality_delta"],
                cognition_delta=h["cognition_delta"],
                progress_delta=h["progress_delta"],
                serenity_delta=h["serenity_delta"],
                connection_delta=h["connection_delta"],
                vitality_anomaly=h.get("vitality_anomaly", 0.0),
                cognition_anomaly=h.get("cognition_anomaly", 0.0),
                progress_anomaly=h.get("progress_anomaly", 0.0),
                serenity_anomaly=h.get("serenity_anomaly", 0.0),
                connection_anomaly=h.get("connection_anomaly", 0.0),
            )
            for h in self._step_history
        ]
        return RhythmObservation(
            timestep=self._timestep,
            day=self._timestep // SLOTS_PER_DAY,
            slot=self._timestep % SLOTS_PER_DAY,
            vitality=round(self._vitality, 4),
            cognition=round(self._cognition, 4),
            progress=round(self._progress, 4),
            serenity=round(self._serenity, 4),
            connection=round(self._connection, 4),
            active_event=active_event,
            remaining_steps=MAX_STEPS - self._timestep,
            reward_breakdown=reward_breakdown or {},
            reward=reward,
            done=done,
            # First-class delta fields (from this step; zero on reset)
            vitality_delta=round(deltas["vitality"], 4) if deltas else 0.0,
            cognition_delta=round(deltas["cognition"], 4) if deltas else 0.0,
            progress_delta=round(deltas["progress"], 4) if deltas else 0.0,
            serenity_delta=round(deltas["serenity"], 4) if deltas else 0.0,
            connection_delta=round(deltas["connection"], 4) if deltas else 0.0,
            last_action=last_action,
            # Rolling history of the last HISTORY_LENGTH completed steps
            step_history=step_records,
        )
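

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the environment API).
# Assumptions beyond this file: ActionType is a str-valued enum whose values
# match the ACTION_EFFECTS keys (the env reads `action.action_type.value`, so
# `ActionType("sleep")` should resolve), RhythmAction accepts `action_type` as
# a keyword, and `server.rubrics.make_grade_rubric` is importable for the
# terminal grading step. Adjust to the actual `models` definitions if needed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    env = RhythmEnvironment()
    obs = env.reset(seed=42)  # deterministic sampled profile for this seed
    script = ["sleep", "deep_work", "exercise", "socialize"]  # naive rotation
    while not obs.done:
        # Rotate through one action per time slot; a real agent would choose
        # actions from the anomaly signals in obs.step_history instead.
        name = script[obs.timestep % len(script)]
        obs = env.step(RhythmAction(action_type=ActionType(name)))
    print("final_score:", obs.reward_breakdown.get("final_score"))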