| """Reward helpers for SQLEnv dense shaping. |
| |
| Design: per-step clipping with delta-based progress (PBRS). |
| |
| Each step's reward is computed independently and clipped to [-0.10, 0.15]. |
| No cumulative tracking. The hard budget (15 steps) and step cost naturally |
| limit total exploration reward to ~0.15, making cumulative caps redundant. |
| |
| Progress uses delta-from-previous (potential-based reward shaping, Ng et al. |
| 1999): the agent is rewarded for improvement and penalized for regression. |
| This preserves the optimal policy and provides recovery signal. |
| |
| See docs/design-docs/reward-shaping-research.md for full rationale. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import math |
|
|
| try: |
| from sql_env.models import EpisodeContext |
| except ImportError: |
| from models import EpisodeContext |
|
|
|
|
# --- Layer 1: operational shaping constants ---
_EXEC_OK_REWARD = 0.02  # bonus when an action executes without error
_NEW_INFO_REWARD = 0.01  # bonus for a first-seen successful QUERY (unique SQL hash)
_REPEAT_PENALTY = 0.03  # penalty for re-issuing a previously seen QUERY
_STEP_COST = 0.02  # flat cost charged on every step to discourage dithering
# --- Layer 2: progress-signal weights (sum to 1.0) ---
_LAYER2_CARDINALITY_WEIGHT = 0.25  # weight for row-count similarity
_LAYER2_VALUE_OVERLAP_WEIGHT = 0.50  # weight for Jaccard overlap of cell values
_LAYER2_NUMERIC_RANGE_WEIGHT = 0.25  # weight for numeric proximity score
_LAYER2_IMPROVEMENT_SCALE = 0.15  # scale applied to the binned progress delta
# --- Per-step clipping bounds (see module docstring) ---
_PER_STEP_FLOOR = -0.10  # minimum reward any single step can yield
_PER_STEP_CAP = 0.15  # maximum reward any single step can yield
|
|
|
|
def compute_step_reward(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Compute one dense step reward with per-step clipping.

    Layer 1 (operational shaping) always contributes; Layer 2 (delta-based
    progress) is added only for QUERY actions that executed successfully and
    returned rows. The combined value is clipped per step to [-0.10, 0.15];
    no cumulative totals are tracked.
    """
    total = _layer1_operational(ctx, action_type, sql, rows, error)

    is_successful_query = (
        action_type.upper() == "QUERY" and rows is not None and error is None
    )
    if is_successful_query:
        total += _layer2_progress(ctx, rows)

    # Clip independently each step — budget + step cost bound the episode total.
    clipped = min(_PER_STEP_CAP, total)
    return max(_PER_STEP_FLOOR, clipped)
|
|
|
|
def _layer1_operational(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Compute Layer 1 operational reward signals.

    Components (all applied on top of the flat ``-0.02`` step cost):

    - ``+0.02`` when execution succeeded (``error is None``) and the SQL is
      not a repeat
    - ``-0.03`` when a QUERY re-issues SQL already seen this episode
    - ``+0.01`` for a first-seen successful QUERY; its hash is then recorded
      in ``ctx.query_hashes`` (only successful queries are ever recorded)
    """
    total = -_STEP_COST

    treat_as_query = action_type.upper() == "QUERY"
    sql_digest: str | None = None
    duplicate = False

    if treat_as_query and sql:
        sql_digest = hashlib.sha256(sql.encode("utf-8")).hexdigest()
        duplicate = sql_digest in ctx.query_hashes

    # A duplicate forfeits the execution bonus and is penalized instead,
    # even when the repeated query would have succeeded.
    if duplicate:
        total -= _REPEAT_PENALTY
    elif error is None:
        total += _EXEC_OK_REWARD

    is_new_successful_query = (
        treat_as_query
        and not duplicate
        and error is None
        and rows is not None
        and sql_digest is not None
    )
    if is_new_successful_query:
        ctx.query_hashes.add(sql_digest)
        total += _NEW_INFO_REWARD

    return total
|
|
|
|
| def _cardinality_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: |
| """Compute row-count similarity score in [0.0, 1.0].""" |
|
|
| pred_count = len(pred_rows) |
| gold_count = len(gold_rows) |
| denominator = max(pred_count, gold_count, 1) |
| score = 1.0 - (abs(pred_count - gold_count) / denominator) |
| return max(0.0, min(1.0, score)) |
|
|
|
|
| def _value_overlap_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: |
| """Compute Jaccard overlap of flattened cell values as strings.""" |
|
|
| pred_values = {str(cell) for row in pred_rows for cell in row} |
| gold_values = {str(cell) for row in gold_rows for cell in row} |
|
|
| union = pred_values | gold_values |
| if not union: |
| return 0.0 |
|
|
| intersection = pred_values & gold_values |
| return len(intersection) / len(union) |
|
|
|
|
| def _numeric_range_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: |
| """Compute log-distance proximity for numeric cell values.""" |
|
|
| def _is_numeric(value: object) -> bool: |
| return isinstance(value, (int, float)) and not isinstance(value, bool) |
|
|
| pred_numerics = [ |
| float(cell) for row in pred_rows for cell in row if _is_numeric(cell) |
| ] |
| gold_numerics = [ |
| float(cell) for row in gold_rows for cell in row if _is_numeric(cell) |
| ] |
|
|
| if not gold_numerics: |
| return 1.0 |
| if not pred_numerics: |
| return 0.0 |
|
|
| total = 0.0 |
| for gold_value in gold_numerics: |
| closest_distance = min( |
| abs(pred_value - gold_value) for pred_value in pred_numerics |
| ) |
| total += 1.0 / (1.0 + math.log1p(closest_distance)) |
|
|
| return total / len(gold_numerics) |
|
|
|
|
| def _bin_progress(raw_score: float) -> float: |
| """Bin raw progress to one of {0.0, 0.25, 0.5, 0.75, 1.0}.""" |
|
|
| clamped_score = max(0.0, min(1.0, raw_score)) |
| if clamped_score < 0.125: |
| return 0.0 |
| if clamped_score < 0.375: |
| return 0.25 |
| if clamped_score < 0.625: |
| return 0.5 |
| if clamped_score < 0.875: |
| return 0.75 |
| return 1.0 |
|
|
|
|
def _layer2_progress(ctx: EpisodeContext, rows: list[tuple]) -> float:
    """Compute Layer 2 progress reward using delta-from-previous (PBRS).

    The three similarity scores are combined with fixed weights, binned to
    a quarter level, and compared against ``ctx.previous_progress``; the
    (possibly negative) delta is scaled to produce the reward. This is
    potential-based reward shaping with gamma=1 (Ng et al. 1999), which
    provably preserves the optimal policy. Returns 0.0 when no gold rows
    are available to compare against.
    """
    gold = ctx.gold_rows
    if not gold:
        return 0.0

    raw_progress = (
        _LAYER2_CARDINALITY_WEIGHT * _cardinality_score(rows, gold)
        + _LAYER2_VALUE_OVERLAP_WEIGHT * _value_overlap_score(rows, gold)
        + _LAYER2_NUMERIC_RANGE_WEIGHT * _numeric_range_score(rows, gold)
    )
    current_bin = _bin_progress(raw_progress)

    # Reward improvement, penalize regression, then roll the potential forward.
    delta = current_bin - ctx.previous_progress
    ctx.previous_progress = current_bin
    return delta * _LAYER2_IMPROVEMENT_SCALE
|
|