| """ | |
| Reward functions for RhythmEnv GRPO training (meta-RL version). | |
| Four-layer reward stack: | |
| 1. format_valid β does the LLM output have a parseable belief + action format? | |
| 2. action_legal β is the action one of the 10 valid actions? | |
| 3. env_reward β actual environment reward (seed-replay) for the chosen action, | |
| plus grader-aligned bias and diversity/exploration shaping | |
| 4. belief_accuracy β how close is the belief vector to the hidden profile's true vector? | |
| Output format: "S M W ACTION_NAME" (belief first) | |
| - S, M, W: single digits 0-9 representing the agent's belief about the user | |
| S = social preference (0=hates social, 9=loves social) | |
| M = morning preference (0=night owl, 9=morning person) | |
| W = work preference (0=avoids work, 9=workaholic) | |
| - ACTION_NAME: one of 10 valid actions | |
| Example: "3 8 7 DEEP_WORK" β belief=[0.33, 0.89, 0.78], action=DEEP_WORK | |
| Belief-first matters because tokens generated earlier condition tokens generated | |
| later in causal LMs β the action ends up causally conditioned on the belief, so | |
| the belief is functionally useful for action selection rather than a post-hoc | |
| afterthought. The parser also accepts a legacy "ACTION S M W" ordering as fallback. | |
| Each function returns a list of floats (one per completion). | |
| """ | |
import os
import re
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from models import ActionType, RhythmAction
from server.rhythm_environment import RhythmEnvironment

# All valid action names (uppercase for matching)
VALID_ACTIONS = {at.value.upper(): at for at in ActionType}

# Default belief returned when the LLM doesn't provide one (neutral)
DEFAULT_BELIEF = [0.5, 0.5, 0.5]

_ANSWER_PATTERN = re.compile(
    r"(\d)\s+(\d)\s+(\d)\s+(" + "|".join(at.value.upper() for at in ActionType) + r")\b",
    re.IGNORECASE,
)
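
# Illustrative match (assuming DEEP_WORK is one of the ActionType values): on
# "I'll pick 3 8 7 DEEP_WORK", _ANSWER_PATTERN captures ("3", "8", "7",
# "DEEP_WORK"); the digits are later mapped to [3/9, 8/9, 7/9].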


def extract_action_and_belief(text: str) -> tuple[ActionType | None, list[float], bool]:
    """Parse the agent's output and extract (action, belief, belief_provided).

    Supports both formats:
        - Plain answer line: "S M W ACTION_NAME"
        - With CoT prefix: "<reasoning>...</reasoning>\nS M W ACTION_NAME"

    Strategy: search the entire response for the LAST occurrence of the
    "<digit> <digit> <digit> <ACTION>" pattern. Taking the last match handles
    cases where the model mentions an example mid-reasoning then commits to
    a different answer at the end.

    Returns:
        (action, belief, belief_provided):
            action: parsed ActionType or None if no valid action anywhere
            belief: 3-dim vector in [0, 1], DEFAULT_BELIEF if no belief parsed
            belief_provided: True iff a belief was extracted (full S M W ACTION pattern)
    """
    if not text:
        return None, list(DEFAULT_BELIEF), False

    # Primary path: full belief+action pattern, take the LAST occurrence.
    matches = list(_ANSWER_PATTERN.finditer(text))
    if matches:
        last = matches[-1]
        s, m, w, action_name = last.groups()
        try:
            belief = [int(s) / 9.0, int(m) / 9.0, int(w) / 9.0]
            action = ActionType(action_name.lower())
            return action, belief, True
        except (ValueError, KeyError):
            pass  # fall through to action-only fallback

    # Fallback: action-only output (no belief digits). Search for any valid
    # ACTION_NAME token in the response and return that with default belief.
    for line in text.strip().split("\n"):
        for token in line.upper().replace(",", " ").split():
            token = token.strip(".")
            if token in VALID_ACTIONS:
                return VALID_ACTIONS[token], list(DEFAULT_BELIEF), False
            for name, at in VALID_ACTIONS.items():
                if name in token:
                    return at, list(DEFAULT_BELIEF), False
    return None, list(DEFAULT_BELIEF), False
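
# Parsing sketch (assuming DEEP_WORK is an ActionType; outputs illustrative):
#   extract_action_and_belief("<reasoning>...</reasoning>\n3 8 7 DEEP_WORK")
#       -> (ActionType.DEEP_WORK, [0.333, 0.889, 0.778], True)
#   extract_action_and_belief("I'd go with DEEP_WORK.")
#       -> (ActionType.DEEP_WORK, [0.5, 0.5, 0.5], False)
#   extract_action_and_belief("no idea")
#       -> (None, [0.5, 0.5, 0.5], False)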


def extract_action(text: str) -> ActionType | None:
    """Backward-compatible action-only extractor (used by env_reward replay)."""
    action, _, _ = extract_action_and_belief(text)
    return action


def format_valid(completions, **kwargs) -> list[float]:
    """
    Layer 1: Is the output a parseable action + (optionally) belief digits?

    +1.0 if action + 3 belief digits, +0.5 if action only, -2.0 if no action.
    Graduated reward pushes the model toward emitting belief without harshly
    punishing action-only output during early training.
    """
    scores = []
    for completion in completions:
        response = completion[0]["content"] if isinstance(completion, list) else completion
        action, _, belief_provided = extract_action_and_belief(response)
        if action is None:
            scores.append(-2.0)
        elif belief_provided:
            scores.append(1.0)
        else:
            scores.append(0.5)
    return scores


def action_legal(completions, **kwargs) -> list[float]:
    """
    Layer 2: Is the parsed action one of the 10 valid actions?

    All 10 actions are always legal in this env (no state-dependent validity),
    so this layer is a pure penalty: 0 for any valid action, -1 for unparseable
    output. Returning 0 (rather than a positive constant) is intentional:
    a constant reward across all completions in a GRPO group contributes
    exactly zero to the advantage and was a major contributor to mode collapse.
    """
    scores = []
    for completion in completions:
        response = completion[0]["content"] if isinstance(completion, list) else completion
        action = extract_action(response)
        scores.append(0.0 if action is not None else -1.0)
    return scores


def _replay_env(seed: int, history: list, profile_mode: str = "continuous") -> RhythmEnvironment:
    """Build a replayed env at the given seed/history/mode."""
    env = RhythmEnvironment()
    env.reset(seed=seed, profile_mode=profile_mode)
    for past_action_name in history:
        env.step(RhythmAction(action_type=ActionType(past_action_name)))
    return env
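
# Replay note: this assumes env state after k steps is a deterministic function
# of (seed, history[:k]), so e.g. _replay_env(42, ["socialize", "deep_work"])
# rebuilds the env exactly as it stood after those two steps under seed 42
# (seed and action names illustrative).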


def env_reward(
    completions,
    prompts=None,
    seed=None,
    step_index=None,
    action_history=None,
    profile_mode=None,
    **kwargs,
) -> list[float]:
    """
    Layer 3: Actual environment reward from stepping with the chosen action.

    Replays the episode to the correct state, then steps with the model's action.
    The base term is the raw env reward (already scaled by REWARD_SCALE); the
    grader-aligned bias, diversity shaping, and belief-action coupling terms
    below are added on top.

    seed/step_index/action_history are passed as dataset columns by GRPOTrainer.
    profile_mode column controls whether replay uses continuous or discrete profile.
    """
    scores = []
    for i, completion in enumerate(completions):
        response = completion[0]["content"] if isinstance(completion, list) else completion
        action_type = extract_action(response)
        if action_type is None:
            scores.append(-3.0)
            continue

        # Per-row dataset values (preferred path)
        if seed is not None and i < len(seed):
            ep_seed = seed[i]
            ep_history = action_history[i] if action_history is not None else []
            ep_mode = profile_mode[i] if (profile_mode is not None and i < len(profile_mode)) else "continuous"
        else:
            prompt_data = prompts[i] if prompts and i < len(prompts) else None
            if prompt_data and isinstance(prompt_data, dict) and "seed" in prompt_data:
                ep_seed = prompt_data["seed"]
                ep_history = prompt_data.get("action_history", [])
                ep_mode = prompt_data.get("profile_mode", "continuous")
            else:
                # Fallback seed mixes index with a prime to break deterministic
                # seed clusters: without it, all completions in a position-class
                # land on the same env_reward and contribute zero to GRPO advantage.
                ep_seed = (i * 17) ^ 0xBEEF
                ep_history = []
                ep_mode = "continuous"
        try:
            env = _replay_env(ep_seed, ep_history, ep_mode)
            # Capture pre-step meters so we can compute deltas for the bias
            pre_progress = env._progress
            pre_connection = env._connection
            obs = env.step(RhythmAction(action_type=action_type))
            reward = obs.reward
            chosen = action_type.value

            # Grader-aligned bias (progress + connection deltas): shapes only
            # the GRPO-visible training reward, not the env's per-step reward.
            # Lives here rather than in env.step() so that the env's per-step
            # reward (used by adaptation_score in the grader) stays pure.
            progress_delta = env._progress - pre_progress
            connection_delta = env._connection - pre_connection
            reward += 0.5 * progress_delta + 0.4 * connection_delta
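            # e.g. a step that raises progress by 0.2 and connection by 0.1
            # adds 0.5 * 0.2 + 0.4 * 0.1 = 0.14 on top of obs.reward
            # (delta values illustrative).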

            # Diversity shaping: small nudges (~1/3 the magnitude of the env
            # signal) so they don't dominate it. Three terms:
            #   - repetition penalty (action appears >= 2x in the last 3)
            #   - low-entropy window penalty (last 6 actions have <= 2 unique)
            #   - new-action exploration bonus (until 6 distinct actions tried)
            if ep_history and len(ep_history) >= 2:
                recent3 = ep_history[-3:]
                if recent3.count(chosen) >= 2:
                    reward -= 0.10
            if ep_history and len(ep_history) >= 5:
                last6 = ep_history[-5:] + [chosen]
                if len(set(last6)) <= 2:
                    reward -= 0.15
            if ep_history is not None:
                seen = set(ep_history)
                if chosen not in seen and len(seen) < 6:
                    reward += 0.07
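            # Worked example (history = ["rest"] * 5, action name illustrative):
            # choosing "rest" again fires the repetition penalty (-0.10) and the
            # low-entropy penalty (-0.15) for -0.25 total; a novel action still
            # fires low-entropy (2 unique actions in the window, -0.15) but earns
            # the exploration bonus (+0.07), netting -0.08.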

            # Belief-action coupling: rewards consistency between the agent's
            # emitted belief and its chosen action. Without this term, the
            # belief-first format only enforces consistency via causal attention
            # (a weak signal); this provides an explicit gradient.
            _, b, b_provided = extract_action_and_belief(response)
            if b_provided:
                s_pref, m_pref, w_pref = b
                # High social -> social actions; low social -> solo actions
                if s_pref > 0.65 and chosen in {"socialize", "family_time"}:
                    reward += 0.15
                elif s_pref < 0.35 and chosen in {"meditate", "me_time"}:
                    reward += 0.15
                elif s_pref > 0.65 and chosen in {"meditate", "me_time"}:
                    reward -= 0.10  # belief says extrovert, action says solo
                elif s_pref < 0.35 and chosen in {"socialize", "family_time"}:
                    reward -= 0.10  # belief says introvert, action says social
                # Morning coupling: morning person working the morning slot, or
                # night owl working a late slot -> bonus
                slot = obs.slot if hasattr(obs, "slot") else 0
                if m_pref > 0.65 and slot == 0 and chosen in {"deep_work", "learn"}:
                    reward += 0.15
                elif m_pref < 0.35 and slot in (2, 3) and chosen in {"deep_work", "learn"}:
                    reward += 0.15
                # High work -> work actions
                if w_pref > 0.65 and chosen in {"deep_work", "learn", "admin_work"}:
                    reward += 0.15
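                # e.g. belief "8 2 9" (s~0.89, m~0.22, w=1.0) with deep_work in
                # slot 2 or 3 earns +0.15 (night-owl work) + 0.15 (work pref)
                # = +0.30; no social term fires because deep_work is in neither
                # the social nor the solo set.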
            scores.append(reward)
        except Exception:
            scores.append(-3.0)
    return scores


def belief_accuracy(
    completions,
    seed=None,
    step_index=None,
    action_history=None,
    profile_mode=None,
    **kwargs,
) -> list[float]:
    """
    Layer 4: Belief-vector accuracy reward (the meta-learning signal).

    Reward = (1 - belief_mae) - baseline_similarity, where the baseline is
    what a constant neutral emission (0.5 per dim, roughly "5 5 5") would
    score for THIS profile. Subtracting the baseline matters: without it, a
    constant neutral emission earns positive reward on every step (~+0.34 x
    the layer weight) for zero actual learning, which silently re-creates the
    iter-1 mode-collapse pull. With it: constant emission scores ~0,
    better-than-baseline > 0, worse < 0.

    Belief reward is also skipped at step 0: the agent has no information yet,
    so penalizing belief-vs-target there just biases toward a constant prior.
    """
    scores = []
    for i, completion in enumerate(completions):
        response = completion[0]["content"] if isinstance(completion, list) else completion
        _, belief, belief_provided = extract_action_and_belief(response)
        if not belief_provided:
            scores.append(-0.1)  # weak push toward emitting belief
            continue

        # Resolve seed/mode/step for replay
        if seed is not None and i < len(seed):
            ep_seed = seed[i]
            ep_history = action_history[i] if action_history is not None else []
            ep_mode = profile_mode[i] if (profile_mode is not None and i < len(profile_mode)) else "continuous"
            ep_step = step_index[i] if (step_index is not None and i < len(step_index)) else 0
        else:
            scores.append(0.0)
            continue

        # Step 0: agent has no information yet; committing a belief here
        # would just pull the policy toward whatever constant prior the
        # base model emits. Skip the reward for that step.
        if ep_step == 0:
            scores.append(0.0)
            continue

        try:
            env = _replay_env(ep_seed, ep_history, ep_mode)
            true_belief = env.get_belief_target()
            mae = sum(abs(b - t) for b, t in zip(belief, true_belief)) / 3.0
            similarity = 1.0 - mae
            baseline_mae = sum(abs(0.5 - t) for t in true_belief) / 3.0
            baseline_similarity = 1.0 - baseline_mae
            # Reward = how much better than the constant-emit baseline
            scores.append(similarity - baseline_similarity)
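            # Worked example (profile values illustrative): true_belief =
            # [0.9, 0.2, 0.7] gives baseline_mae = (0.4 + 0.3 + 0.2) / 3 = 0.30,
            # so a perfect belief scores +0.30 and a constant "5 5 5" emission
            # scores roughly 0.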
        except Exception:
            scores.append(0.0)
    return scores