# sql_env / server /reward.py
# hjerpe's picture
# Upload folder using huggingface_hub
# 9e64e71 verified
"""Reward helpers for SQLEnv dense shaping.
Design: per-step clipping with delta-based progress (PBRS).
Each step's reward is computed independently and clipped to [-0.10, 0.15].
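No cumulative reward total is tracked; the only state carried between steps is
the previous progress score and the set of seen query hashes. The hard budget
(15 steps) and step cost naturally limit total exploration reward to ~0.15,
making cumulative caps redundant.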
Progress uses delta-from-previous (potential-based reward shaping, Ng et al.
1999): the agent is rewarded for improvement and penalized for regression.
This preserves the optimal policy and provides recovery signal.
See docs/design-docs/reward-shaping-research.md for full rationale.
"""
from __future__ import annotations
import hashlib
import math
try:
from sql_env.models import EpisodeContext
except ImportError: # pragma: no cover - Docker fallback import path
from models import EpisodeContext # type: ignore[no-redef]
# Layer 1 (operational) terms, consumed by _layer1_operational.
_EXEC_OK_REWARD = 0.02  # added when a non-repeat action has no error
_NEW_INFO_REWARD = 0.01  # added for a successful QUERY whose SQL hash is new
_REPEAT_PENALTY = 0.03  # subtracted when the QUERY SQL hash was already seen
_STEP_COST = 0.02  # subtracted on every call
# Layer 2 (progress) terms, consumed by _layer2_progress. The three weights
# blend the component scores into one raw progress value and sum to 1.0.
_LAYER2_CARDINALITY_WEIGHT = 0.25
_LAYER2_VALUE_OVERLAP_WEIGHT = 0.50
_LAYER2_NUMERIC_RANGE_WEIGHT = 0.25
_LAYER2_IMPROVEMENT_SCALE = 0.15  # multiplies the change in binned progress
# Bounds applied to each step's combined reward in compute_step_reward.
_PER_STEP_FLOOR = -0.10
_PER_STEP_CAP = 0.15
def compute_step_reward(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Return the dense reward for a single step, clipped to the step bounds.

    The Layer 1 operational reward is always computed. The Layer 2 progress
    delta is added only when the action is a QUERY (case-insensitive) that
    produced rows and no error. The sum is then clamped to
    ``[_PER_STEP_FLOOR, _PER_STEP_CAP]``.

    Args:
        ctx: Episode state; passed through to both reward layers, which
            update it.
        action_type: Name of the action taken this step.
        sql: SQL text associated with the action.
        rows: Result rows, or ``None`` when the action returned none.
        error: Error message, or ``None`` on success.

    Returns:
        The clipped step reward.
    """
    total = _layer1_operational(ctx, action_type, sql, rows, error)

    query_succeeded = (
        action_type.upper() == "QUERY" and rows is not None and error is None
    )
    if query_succeeded:
        total = total + _layer2_progress(ctx, rows)

    # Apply the upper bound first, then the lower bound.
    capped = min(_PER_STEP_CAP, total)
    return max(_PER_STEP_FLOOR, capped)
def _layer1_operational(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Return the Layer 1 operational reward for one step.

    Signals, starting from the step cost charged on every call:

    - ``-_STEP_COST`` always.
    - ``-_REPEAT_PENALTY`` when a QUERY's SQL hash is already in
      ``ctx.query_hashes``; a repeat earns nothing else.
    - ``+_EXEC_OK_REWARD`` when the action is not a repeat and ``error`` is
      ``None``.
    - ``+_NEW_INFO_REWARD`` when, in addition, the action is a QUERY with
      non-empty SQL and ``rows`` is not ``None``; the hash is then recorded
      in ``ctx.query_hashes``.

    Only successful queries are recorded, so re-sending a query that failed
    is not treated as a repeat.
    """
    reward = -_STEP_COST

    # Only QUERY actions with non-empty SQL are fingerprinted.
    digest: str | None = None
    if action_type.upper() == "QUERY" and sql:
        digest = hashlib.sha256(sql.encode("utf-8")).hexdigest()
        if digest in ctx.query_hashes:
            return reward - _REPEAT_PENALTY

    if error is not None:
        return reward

    reward += _EXEC_OK_REWARD
    if digest is not None and rows is not None:
        ctx.query_hashes.add(digest)
        reward += _NEW_INFO_REWARD
    return reward
def _cardinality_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Return how closely the two row counts match, in [0.0, 1.0].

    The score is one minus the absolute count difference divided by the
    larger count (at least 1, so two empty results score 1.0).
    """
    n_pred, n_gold = len(pred_rows), len(gold_rows)
    larger = max(n_pred, n_gold, 1)
    similarity = 1.0 - abs(n_pred - n_gold) / larger
    if similarity < 0.0:
        return 0.0
    if similarity > 1.0:
        return 1.0
    return similarity
def _value_overlap_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Return the Jaccard similarity of the cell values in the two results.

    Every cell is converted with ``str`` and pooled into one set per result,
    so row and column positions are ignored. Returns 0.0 when both results
    contain no cells.
    """

    def _cell_strings(rows: list[tuple]) -> set[str]:
        # Flatten all rows into a set of stringified cells.
        seen: set[str] = set()
        for row in rows:
            seen.update(map(str, row))
        return seen

    predicted = _cell_strings(pred_rows)
    expected = _cell_strings(gold_rows)

    combined = predicted.union(expected)
    if len(combined) == 0:
        return 0.0
    shared = predicted.intersection(expected)
    return len(shared) / len(combined)
def _numeric_range_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Return the mean closeness of gold numeric cells to predicted ones.

    Numeric cells are ``int`` or ``float`` values, excluding ``bool``. Each
    gold number is matched to its nearest predicted number and contributes
    ``1 / (1 + log1p(distance))``; the result is the average over the gold
    numbers. Returns 1.0 when the gold result has no numeric cells, and 0.0
    when it has some but the prediction has none.
    """

    def _numeric_cells(rows: list[tuple]) -> list[float]:
        # Collect every int/float cell (bools excluded) as a float.
        collected: list[float] = []
        for row in rows:
            for cell in row:
                if isinstance(cell, bool):
                    continue
                if isinstance(cell, (int, float)):
                    collected.append(float(cell))
        return collected

    predicted = _numeric_cells(pred_rows)
    expected = _numeric_cells(gold_rows)

    if not expected:
        return 1.0
    if not predicted:
        return 0.0

    accumulated = 0.0
    for target in expected:
        nearest = min(abs(candidate - target) for candidate in predicted)
        accumulated += 1.0 / (1.0 + math.log1p(nearest))
    return accumulated / len(expected)
def _bin_progress(raw_score: float) -> float:
    """Snap a raw progress score to one of {0.0, 0.25, 0.5, 0.75, 1.0}.

    The score is clamped to [0.0, 1.0] and then mapped to the nearest
    quarter, with midpoints (0.125, 0.375, 0.625, 0.875) going to the
    upper bin.

    A NaN score is reported as 0.0. Without this guard the clamp would turn
    NaN into 1.0 (comparisons with NaN are always false, so ``min(1.0, nan)``
    yields 1.0), and an undefined score would earn maximum progress.

    Args:
        raw_score: Unbinned progress value; may lie outside [0.0, 1.0].

    Returns:
        The binned progress value.
    """
    if math.isnan(raw_score):
        return 0.0
    clamped_score = max(0.0, min(1.0, raw_score))
    if clamped_score < 0.125:
        return 0.0
    if clamped_score < 0.375:
        return 0.25
    if clamped_score < 0.625:
        return 0.5
    if clamped_score < 0.875:
        return 0.75
    return 1.0
def _layer2_progress(ctx: EpisodeContext, rows: list[tuple]) -> float:
    """Return the Layer 2 progress reward as a delta from the previous step.

    The rows are scored against ``ctx.gold_rows`` on cardinality, value
    overlap and numeric proximity. The weighted blend is binned, and the
    reward is the change from ``ctx.previous_progress`` multiplied by
    ``_LAYER2_IMPROVEMENT_SCALE``: positive for improvement, negative for
    regression. ``ctx.previous_progress`` is then set to the new binned
    value.

    This is the delta-from-previous scheme the module describes as
    potential-based reward shaping with gamma=1 (Ng et al. 1999).

    Returns 0.0 and leaves ``ctx`` untouched when there are no gold rows.
    """
    gold = ctx.gold_rows
    if not gold:
        return 0.0

    weighted_cardinality = _LAYER2_CARDINALITY_WEIGHT * _cardinality_score(
        rows, gold
    )
    weighted_overlap = _LAYER2_VALUE_OVERLAP_WEIGHT * _value_overlap_score(
        rows, gold
    )
    weighted_numeric = _LAYER2_NUMERIC_RANGE_WEIGHT * _numeric_range_score(
        rows, gold
    )
    current = _bin_progress(
        weighted_cardinality + weighted_overlap + weighted_numeric
    )

    # Read the old potential before overwriting it with the new one.
    previous = ctx.previous_progress
    ctx.previous_progress = current
    return (current - previous) * _LAYER2_IMPROVEMENT_SCALE