Spaces:
Runtime error
Runtime error
File size: 10,939 Bytes
"""Deterministic graders for OpenEnv email triage tasks."""
import re
from models import RewardResult, TriageAction
# Canonical route category -> alias strings recognised in free-form route text.
# Aliases are compared against already-lowercased text, so they are kept
# lowercase here. Categories are tried in insertion order when a route
# fragment is matched.
ROUTE_ALIAS_MAP = {
    "billing": ["billing", "finance", "payments", "accounts"],
    "safety": ["safety", "compliance", "risk"],
    "engineering": ["engineering", "eng", "sre", "platform", "on-call"],
    "support": ["support", "helpdesk", "customer support"],
    "general": ["general", "inbox", "operations"],
}
def _clip_score(score_value: float) -> float:
"""Clip a score to the inclusive range [0.0, 1.0].
Args:
score_value: Raw score.
Returns:
Clipped score.
"""
return max(0.0, min(1.0, score_value))
def _normalized_text(text_value: str) -> str:
"""Return normalized lowercase text for deterministic comparisons.
Args:
text_value: Input text.
Returns:
Normalized text.
"""
return text_value.strip().lower()
def _route_matches(action_route: str, expected_route: str) -> bool:
    """Tell whether the agent's route resolves to the expected category.

    The expected route is only normalized (trimmed, lowercased); the agent's
    route is mapped to canonical categories via ``_canonical_route_tokens``.
    The match succeeds when the normalized expected route is one of those
    categories.

    Args:
        action_route: Route provided by the agent (free-form text).
        expected_route: Route expected by the ground truth.

    Returns:
        True when the normalized expected route is among the canonical
        categories derived from ``action_route``; False otherwise, including
        when the expected route is empty.
    """
    wanted_category = _normalized_text(expected_route)
    if wanted_category:
        return wanted_category in _canonical_route_tokens(action_route)
    # An empty expectation can never be satisfied.
    return False
def _canonical_route_tokens(action_route: str) -> set[str]:
    """Map free-form route text to canonical route categories.

    The text is normalized and split on ``,``, ``;``, ``/`` and ``|``. A
    category matches a piece of text when any of its aliases in
    ``ROUTE_ALIAS_MAP`` occurs in it as a substring.

    * When the text contains several separator-delimited fragments, each
      fragment contributes at most one category: the first match in
      ``ROUTE_ALIAS_MAP`` order.
    * When the text is a single phrase without separators (for example
      ``"safety and billing"``), every category whose alias occurs in the
      phrase is returned. Previously only the first category was returned,
      because the whole-phrase fallback could never run once the per-fragment
      scan had found a match.

    Args:
        action_route: Route text provided by the agent.

    Returns:
        Set of canonical category names; empty when nothing matches.
    """
    normalized_action = _normalized_text(action_route)
    if not normalized_action:
        return set()

    route_fragments = [
        fragment.strip()
        for fragment in re.split(r"[,;/|]+", normalized_action)
        if fragment.strip()
    ]

    def _matching_categories(text: str):
        """Yield categories, in map order, having an alias inside ``text``."""
        return (
            route_name
            for route_name, aliases in ROUTE_ALIAS_MAP.items()
            if any(alias in text for alias in aliases)
        )

    # No separators: the phrase may name several teams, so collect them all.
    if len(route_fragments) == 1:
        return set(_matching_categories(route_fragments[0]))

    canonical: set[str] = set()
    for fragment in route_fragments:
        # One category per explicit fragment: first match in map order wins.
        first_match = next(_matching_categories(fragment), None)
        if first_match is not None:
            canonical.add(first_match)
    return canonical
def _route_noise_penalty(action_route: str) -> float:
    """Compute the penalty for routing one action to too many teams.

    Up to two canonical categories are free. Each additional category costs
    0.08, and the total is capped at 0.24.

    Args:
        action_route: Route text provided by the agent.

    Returns:
        Penalty in [0.0, 0.24] to subtract from the score.
    """
    extra_routes = len(_canonical_route_tokens(action_route)) - 2
    if extra_routes > 0:
        return min(0.24, 0.08 * extra_routes)
    return 0.0
def _summary_keyword_score(summary_text: str, ground_truth: dict) -> float:
    """Rate a summary by the share of expected keywords it contains.

    Keywords come from ``ground_truth["summary_keywords"]``. When that value
    is not a list, or holds no non-blank entries, the summary scores 1.0 if
    its trimmed length is at least 10 characters and 0.0 otherwise.

    Otherwise the fraction of keywords found (as substrings of the normalized
    summary) is scaled by a brevity factor and a list-like penalty.

    Args:
        summary_text: Summary text produced by the agent.
        ground_truth: Ground-truth dict that may include summary keywords.

    Returns:
        Score in [0.0, 1.0].
    """
    raw_keywords = ground_truth.get("summary_keywords", [])

    keywords = []
    if isinstance(raw_keywords, list):
        for raw_keyword in raw_keywords:
            cleaned = _normalized_text(str(raw_keyword))
            if cleaned:
                keywords.append(cleaned)

    if not keywords:
        # Nothing to compare against: only require a non-trivial summary.
        return 1.0 if len(summary_text.strip()) >= 10 else 0.0

    normalized_summary = _normalized_text(summary_text)
    matches = sum(1 for keyword in keywords if keyword in normalized_summary)
    base_score = matches / len(keywords)

    # Discourage keyword stuffing and overly verbose summaries.
    word_count = len(re.findall(r"[a-z0-9'-]+", normalized_summary))
    if word_count > 40:
        # Lose 0.02 per word beyond 40, but never drop below 0.45.
        brevity_factor = max(0.45, 1.0 - (word_count - 40) * 0.02)
    elif word_count >= 4:
        brevity_factor = 1.0
    else:
        brevity_factor = 0.6

    # Many commas together with several keyword hits looks like a bare list.
    looks_like_list = normalized_summary.count(",") >= 6 and matches >= 3
    list_like_penalty = 0.85 if looks_like_list else 1.0

    return _clip_score(base_score * brevity_factor * list_like_penalty)
def grade_easy(action: TriageAction, ground_truth: dict) -> RewardResult:
    """Grade the easy task, awarding deterministic partial credit.

    The score is 0.6 for a correct label, plus 0.25 for a correct route, plus
    0.15 times the summary keyword score, minus the route noise penalty. The
    result is clipped to [0.0, 1.0].

    Args:
        action: Agent action for one email.
        ground_truth: Expected values; ``label`` and ``route_to`` are read
            here (missing keys are treated as empty strings).

    Returns:
        Reward result with the clipped score and a per-component breakdown.
    """
    truth_label = _normalized_text(str(ground_truth.get("label", "")))
    truth_route = _normalized_text(str(ground_truth.get("route_to", "")))

    label_correct = _normalized_text(action.label) == truth_label
    route_correct = _route_matches(action.route_to, truth_route)
    summary_score = _summary_keyword_score(action.summary, ground_truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    label_points = 0.6 if label_correct else 0.0
    route_points = 0.25 if route_correct else 0.0
    total = label_points + route_points + 0.15 * summary_score - noise_penalty

    return RewardResult(
        score=_clip_score(total),
        breakdown={
            "label_match": 1.0 if label_correct else 0.0,
            "route_match": 1.0 if route_correct else 0.0,
            "summary_match": round(summary_score, 4),
            "route_noise_penalty": round(noise_penalty, 4),
        },
        feedback="Easy-task grading completed with context summary scoring.",
    )
def _score_medium_email(action: TriageAction, truth: dict) -> tuple[float, float, dict]:
    """Score one medium-task email before priority weighting is applied.

    Args:
        action: Agent action for one email.
        truth: Expected values; ``label``, ``route_to`` and the optional
            ``priority_weight`` (default 1.0, floored at 0.1) are read here.

    Returns:
        A ``(per_email_score, effective_weight, breakdown)`` tuple, where
        ``per_email_score`` is the unweighted score clipped to [0.0, 1.0],
        ``effective_weight`` is the priority weight capped at 2.0, and
        ``breakdown`` holds the per-component details reported to callers.
    """
    expected_label = _normalized_text(str(truth.get("label", "")))
    expected_route = _normalized_text(str(truth.get("route_to", "")))
    priority_weight = max(float(truth.get("priority_weight", 1.0)), 0.1)

    label_correct = _normalized_text(action.label) == expected_label
    route_correct = _route_matches(action.route_to, expected_route)
    summary_score = _summary_keyword_score(action.summary, truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    per_email_score = (0.55 if label_correct else 0.0) + (0.3 if route_correct else 0.0)
    per_email_score += 0.15 * summary_score
    per_email_score -= noise_penalty
    per_email_score = _clip_score(per_email_score)

    breakdown = {
        "label_match": 1.0 if label_correct else 0.0,
        "route_match": 1.0 if route_correct else 0.0,
        "summary_match": round(summary_score, 4),
        "priority_weight": round(priority_weight, 4),
        "route_noise_penalty": round(noise_penalty, 4),
    }
    return per_email_score, min(priority_weight, 2.0), breakdown


def grade_medium_step(action: TriageAction, truth: dict) -> RewardResult:
    """Grade one medium-task step without cumulative history effects.

    The per-email score (0.55 label + 0.3 route + 0.15 * summary score, minus
    the route noise penalty, clipped) is multiplied by the priority weight
    (capped at 2.0) and clipped again to [0.0, 1.0].

    Args:
        action: Agent action for one email.
        truth: Expected action details for that email.

    Returns:
        Reward result for this single step.
    """
    per_email_score, effective_weight, breakdown = _score_medium_email(action, truth)
    return RewardResult(
        score=_clip_score(per_email_score * effective_weight),
        breakdown=breakdown,
        feedback="Medium-task step grading completed.",
    )


def grade_medium(actions: list[TriageAction], ground_truths: list[dict]) -> RewardResult:
    """Grade medium task using weighted per-email partial scoring.

    Actions and ground truths are paired positionally; surplus items on
    either side are ignored. The final score is the priority-weighted average
    of the unweighted per-email scores, so a fully correct queue scores 1.0
    whatever the weights are.

    Args:
        actions: Agent actions for the medium task email queue.
        ground_truths: Expected action details for each email.

    Returns:
        Deterministic reward result in [0.0, 1.0].
    """
    scored_emails = [
        _score_medium_email(action, truth)
        for action, truth in zip(actions, ground_truths)
    ]
    if not scored_emails:
        return RewardResult(
            score=0.0,
            breakdown={"emails_scored": 0.0, "weighted_average": 0.0},
            feedback="No actions available for grading.",
        )

    weighted_score_sum = 0.0
    weight_sum = 0.0
    label_hits = 0
    route_hits = 0
    summary_total = 0.0
    noise_penalty_total = 0.0

    for per_email_score, effective_weight, step_breakdown in scored_emails:
        # Weight the unclipped product. Summing step scores that were already
        # clipped to 1.0 would cap an email of weight w at 1/w of its share.
        weighted_score_sum += per_email_score * effective_weight
        weight_sum += effective_weight
        label_hits += 1 if step_breakdown["label_match"] > 0 else 0
        route_hits += 1 if step_breakdown["route_match"] > 0 else 0
        summary_total += float(step_breakdown["summary_match"])
        noise_penalty_total += float(step_breakdown["route_noise_penalty"])

    emails_scored = len(scored_emails)
    weighted_average = weighted_score_sum / weight_sum if weight_sum > 0.0 else 0.0
    score_value = _clip_score(weighted_average)

    breakdown = {
        "emails_scored": float(emails_scored),
        "label_accuracy": label_hits / emails_scored,
        "route_accuracy": route_hits / emails_scored,
        "summary_accuracy": summary_total / emails_scored,
        "avg_route_noise_penalty": noise_penalty_total / emails_scored,
        "weighted_average": score_value,
    }
    feedback = "Weighted medium-task grading completed."
    return RewardResult(score=score_value, breakdown=breakdown, feedback=feedback)
def grade_hard(action: TriageAction, ground_truth: dict) -> RewardResult:
    """Grade the hard task from weighted, policy-sensitive components.

    Components: 0.35 when the primary route (``route_to``, default
    ``"safety"``) is matched, 0.25 when the secondary route (``cc_route``,
    default ``"billing"``) is matched, 0.25 when the label equals the expected
    label (default ``"urgent"``), and 0.15 times the summary keyword score.
    The route noise penalty is subtracted, and labelling the email ``"spam"``
    subtracts ``penalize_spam`` (default 0.2) as well. The total is clipped to
    [0.0, 1.0].

    Args:
        action: Agent action for hard task case.
        ground_truth: Expected routing and urgency intent.

    Returns:
        Reward result with the clipped score and a component breakdown.
    """
    expected_label = _normalized_text(str(ground_truth.get("label", "urgent")))
    primary_route = _normalized_text(str(ground_truth.get("route_to", "safety")))
    secondary_route = _normalized_text(str(ground_truth.get("cc_route", "billing")))
    spam_penalty = float(ground_truth.get("penalize_spam", 0.2))

    agent_route = _normalized_text(action.route_to)
    agent_label = _normalized_text(action.label)
    labelled_spam = agent_label == "spam"

    escalation_component = 0.35 if _route_matches(agent_route, primary_route) else 0.0
    routing_component = 0.25 if _route_matches(agent_route, secondary_route) else 0.0
    urgency_component = 0.25 if agent_label == expected_label else 0.0
    summary_component = 0.15 * _summary_keyword_score(action.summary, ground_truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    raw_score = escalation_component + routing_component + urgency_component + summary_component
    raw_score -= noise_penalty
    if labelled_spam:
        raw_score -= spam_penalty

    return RewardResult(
        score=_clip_score(raw_score),
        breakdown={
            "escalation_component": escalation_component,
            "routing_component": routing_component,
            "urgency_component": urgency_component,
            "summary_component": round(summary_component, 4),
            "route_noise_penalty": round(noise_penalty, 4),
            # Reported only when the penalty was actually applied.
            "spam_penalty": spam_penalty if labelled_spam else 0.0,
        },
        feedback="Hard-task weighted policy grading completed.",
    )
|