Spaces:
Sleeping
Sleeping
| """Deterministic graders for OpenEnv email triage tasks.""" | |
| import re | |
| from models import RewardResult, TriageAction | |
# Canonical route category -> lowercase alias strings recognised for it.
# Key order is significant wherever callers stop at the first matching category.
ROUTE_ALIAS_MAP = {
    "billing": [
        "billing",
        "finance",
        "payments",
        "accounts",
    ],
    "safety": [
        "safety",
        "compliance",
        "risk",
    ],
    "engineering": [
        "engineering",
        "eng",
        "sre",
        "platform",
        "on-call",
    ],
    "support": [
        "support",
        "helpdesk",
        "customer support",
    ],
    "general": [
        "general",
        "inbox",
        "operations",
    ],
}

# Margin used to keep reported scores away from the exact endpoints 0.0 and 1.0.
SCORE_EPSILON = 0.01
def _strict_binary_score(is_positive_case: bool) -> float:
    """Translate a yes/no outcome into a score that avoids exact 0.0 and 1.0.

    Args:
        is_positive_case: Whether the graded condition held.

    Returns:
        ``1.0 - SCORE_EPSILON`` when the condition held, ``SCORE_EPSILON`` otherwise.
    """
    if is_positive_case:
        return 1.0 - SCORE_EPSILON
    return SCORE_EPSILON
def _strict_ratio_score(raw_value: float) -> float:
    """Bound a ratio-like metric by delegating to ``_clip_score``.

    Args:
        raw_value: Unbounded ratio or component value.

    Returns:
        The value after clipping.
    """
    bounded_value = _clip_score(raw_value)
    return bounded_value
def _clip_score(score_value: float) -> float:
    """Clamp a score into ``[SCORE_EPSILON, 1.0 - SCORE_EPSILON]``.

    The clamp is monotonic: a larger raw score never yields a smaller clipped
    score. Clamping only the exact endpoints would let a raw ``0.0`` (lifted to
    ``SCORE_EPSILON``) outrank a raw value lying between ``0.0`` and
    ``SCORE_EPSILON``, and likewise near ``1.0``.

    Args:
        score_value: Raw score; may be negative, above 1.0, or NaN.

    Returns:
        The score limited to the closed interval above. NaN maps to the lower
        bound so that an undefined score is never rewarded.
    """
    lower_bound = SCORE_EPSILON
    upper_bound = 1.0 - SCORE_EPSILON
    # Written as "not greater than" so NaN (for which every comparison is
    # False) falls into the floor branch instead of passing through.
    if not score_value > lower_bound:
        return lower_bound
    if score_value >= upper_bound:
        return upper_bound
    return float(score_value)
| def _normalized_text(text_value: str) -> str: | |
| """Return normalized lowercase text for deterministic comparisons. | |
| Args: | |
| text_value: Input text. | |
| Returns: | |
| Normalized text. | |
| """ | |
| return text_value.strip().lower() | |
def _route_matches(action_route: str, expected_route: str) -> bool:
    """Tell whether the agent's route text covers the expected route.

    Args:
        action_route: Route provided by agent (free-form text).
        expected_route: Route expected by ground truth.

    Returns:
        True when the normalized expected route is among the canonical
        categories derived from ``action_route``; False when the expected
        route is empty or not covered.
    """
    wanted = _normalized_text(expected_route)
    # An empty expectation can never be satisfied.
    return bool(wanted) and wanted in _canonical_route_tokens(action_route)
def _canonical_route_tokens(action_route: str) -> set[str]:
    """Map free-form route text to canonical route categories.

    A category from ``ROUTE_ALIAS_MAP`` is reported when any of its aliases
    occurs as a substring of the normalized route text. Every matching
    category is collected, so teams are recognised whether they are joined by
    separators ("safety, billing") or by plain words ("safety and billing").

    Args:
        action_route: Route text provided by the agent.

    Returns:
        Set of canonical category names found; empty when the text is blank
        or mentions no known alias.
    """
    normalized_action = _normalized_text(action_route)
    if not normalized_action:
        return set()
    # Scan the whole text rather than stopping at the first category per
    # fragment: stopping early hid additional teams named in the same phrase.
    # NOTE(review): matching is by substring, so short aliases such as "eng"
    # or "risk" also match inside longer unrelated words - confirm acceptable.
    return {
        route_name
        for route_name, aliases in ROUTE_ALIAS_MAP.items()
        if any(alias in normalized_action for alias in aliases)
    }
def _route_noise_penalty(action_route: str) -> float:
    """Compute the deduction for routing one action to too many teams.

    Args:
        action_route: Route text provided by the agent.

    Returns:
        0.0 for up to two canonical routes, otherwise 0.08 per additional
        route, capped at 0.24.
    """
    extra_routes = len(_canonical_route_tokens(action_route)) - 2
    if extra_routes <= 0:
        return 0.0
    return min(0.24, 0.08 * extra_routes)
def _summary_keyword_score(summary_text: str, ground_truth: dict) -> float:
    """Rate a summary by how many expected keywords it mentions.

    Args:
        summary_text: Summary text produced by the agent.
        ground_truth: Ground-truth dict; ``summary_keywords`` is read when present.

    Returns:
        Clipped score: the matched-keyword fraction scaled by brevity and
        list-likeness factors. When no usable keywords exist, a binary score
        based on the summary being at least 10 characters long.
    """
    keywords = []
    raw_keywords = ground_truth.get("summary_keywords", [])
    if isinstance(raw_keywords, list):
        for raw_keyword in raw_keywords:
            cleaned_keyword = _normalized_text(str(raw_keyword))
            if cleaned_keyword:
                keywords.append(cleaned_keyword)

    if not keywords:
        # Nothing to compare against: fall back to a minimal length check.
        return _strict_binary_score(len(summary_text.strip()) >= 10)

    normalized_summary = _normalized_text(summary_text)
    matches = sum(1 for keyword in keywords if keyword in normalized_summary)
    base_score = matches / len(keywords)

    # Discourage keyword stuffing and overly verbose summaries.
    word_count = len(re.findall(r"[a-z0-9'-]+", normalized_summary))
    if word_count > 40:
        brevity_factor = max(0.45, 1.0 - (word_count - 40) * 0.02)
    elif word_count >= 4:
        brevity_factor = 1.0
    else:
        brevity_factor = 0.6

    comma_heavy = normalized_summary.count(",") >= 6
    list_like_penalty = 0.85 if comma_heavy and matches >= 3 else 1.0
    return _clip_score(base_score * brevity_factor * list_like_penalty)
def grade_easy(action: TriageAction, ground_truth: dict) -> RewardResult:
    """Score a single easy-task action with partial credit per component.

    Args:
        action: Agent action for one email.
        ground_truth: Expected label and route (``label``, ``route_to``), plus
            optional summary keywords.

    Returns:
        Reward result whose score combines label (0.6), route (0.25) and
        summary (0.15) credit minus the route-noise penalty, then clipped.
    """
    wanted_label = _normalized_text(str(ground_truth.get("label", "")))
    wanted_route = _normalized_text(str(ground_truth.get("route_to", "")))

    label_correct = wanted_label == _normalized_text(action.label)
    route_correct = _route_matches(action.route_to, wanted_route)
    summary_score = _summary_keyword_score(action.summary, ground_truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    label_points = 0.6 if label_correct else 0.0
    route_points = 0.25 if route_correct else 0.0
    raw_total = label_points + route_points + 0.15 * summary_score - noise_penalty

    return RewardResult(
        score=_clip_score(raw_total),
        breakdown={
            "label_match": _strict_binary_score(label_correct),
            "route_match": _strict_binary_score(route_correct),
            "summary_match": _strict_ratio_score(summary_score),
            "route_noise_penalty": _strict_ratio_score(noise_penalty),
        },
        feedback="Easy-task grading completed with context summary scoring.",
    )
def grade_medium_step(action: TriageAction, truth: dict) -> RewardResult:
    """Score one medium-task email in isolation from earlier steps.

    Args:
        action: Agent action for the email.
        truth: Expected ``label`` and ``route_to``, optional ``priority_weight``
            (floored at 0.1, capped at 2.0 when applied) and summary keywords.

    Returns:
        Reward result whose score is the clipped per-email score multiplied by
        the capped priority weight and clipped again.
    """
    wanted_label = _normalized_text(str(truth.get("label", "")))
    wanted_route = _normalized_text(str(truth.get("route_to", "")))
    priority_weight = max(float(truth.get("priority_weight", 1.0)), 0.1)

    label_correct = wanted_label == _normalized_text(action.label)
    route_correct = _route_matches(action.route_to, wanted_route)
    summary_score = _summary_keyword_score(action.summary, truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    label_points = 0.55 if label_correct else 0.0
    route_points = 0.3 if route_correct else 0.0
    per_email_score = _clip_score(
        label_points + route_points + 0.15 * summary_score - noise_penalty
    )
    capped_weight = min(priority_weight, 2.0)

    breakdown = {
        "label_match": _strict_binary_score(label_correct),
        "route_match": _strict_binary_score(route_correct),
        "summary_match": _strict_ratio_score(summary_score),
        "priority_weight": _strict_ratio_score(min(priority_weight / 2.0, 1.0)),
        "route_noise_penalty": _strict_ratio_score(noise_penalty),
    }
    return RewardResult(
        score=_clip_score(per_email_score * capped_weight),
        breakdown=breakdown,
        feedback="Medium-task step grading completed.",
    )
def grade_medium(actions: list[TriageAction], ground_truths: list[dict]) -> RewardResult:
    """Grade medium task using a priority-weighted average of per-email scores.

    Each email is graded at unit weight and the priority weight is applied
    only when aggregating, i.e. ``sum(w_i * s_i) / sum(w_i)``. Feeding scores
    that were already weighted and clipped into the numerator would cap a
    perfect answer on a weight-2.0 email at about half credit.

    Args:
        actions: Agent actions for the medium task email queue.
        ground_truths: Expected action details for each email. An optional
            ``priority_weight`` per email is floored at 0.1 and capped at 2.0.

    Returns:
        Reward result with the clipped weighted average as score. Only the
        first ``min(len(actions), len(ground_truths))`` pairs are graded; an
        epsilon score is returned when there is nothing to compare.
    """
    comparable_count = min(len(actions), len(ground_truths))
    if comparable_count == 0:
        return RewardResult(
            score=SCORE_EPSILON,
            breakdown={"emails_scored": SCORE_EPSILON, "weighted_average": SCORE_EPSILON},
            feedback="No actions available for grading.",
        )

    weighted_score_sum = 0.0
    weight_sum = 0.0
    label_hits = 0
    route_hits = 0
    summary_total = 0.0
    noise_penalty_total = 0.0

    # zip stops at the shorter sequence, i.e. after comparable_count pairs.
    for action, truth in zip(actions, ground_truths):
        priority_weight = min(max(float(truth.get("priority_weight", 1.0)), 0.1), 2.0)

        # Grade at unit weight so the step score is the plain per-email score;
        # the real weight is applied below, outside any clipping.
        step_result = grade_medium_step(action, {**truth, "priority_weight": 1.0})
        weighted_score_sum += step_result.score * priority_weight
        weight_sum += priority_weight

        expected_label = _normalized_text(str(truth.get("label", "")))
        expected_route = _normalized_text(str(truth.get("route_to", "")))
        if _normalized_text(action.label) == expected_label:
            label_hits += 1
        if _route_matches(action.route_to, expected_route):
            route_hits += 1

        summary_total += float(step_result.breakdown.get("summary_match", SCORE_EPSILON))
        noise_penalty_total += float(
            step_result.breakdown.get("route_noise_penalty", SCORE_EPSILON)
        )

    weighted_average = weighted_score_sum / weight_sum if weight_sum > 0.0 else 0.0
    score_value = _clip_score(weighted_average)
    breakdown = {
        "emails_scored": _strict_ratio_score(float(comparable_count) / (comparable_count + 1.0)),
        "label_accuracy": _strict_ratio_score(label_hits / comparable_count),
        "route_accuracy": _strict_ratio_score(route_hits / comparable_count),
        "summary_accuracy": _strict_ratio_score(summary_total / comparable_count),
        "avg_route_noise_penalty": _strict_ratio_score(noise_penalty_total / comparable_count),
        "weighted_average": score_value,
    }
    feedback = "Weighted medium-task grading completed."
    return RewardResult(score=score_value, breakdown=breakdown, feedback=feedback)
def grade_hard(action: TriageAction, ground_truth: dict) -> RewardResult:
    """Grade hard task using weighted policy-sensitive components.

    Components: primary route (0.35), secondary/cc route (0.25), label match
    (0.25) and summary quality (0.15), minus the route-noise penalty and,
    when applicable, the spam penalty.

    Args:
        action: Agent action for hard task case.
        ground_truth: Expected routing and urgency intent. Reads ``label``
            (default "urgent"), ``route_to`` (default "safety"), ``cc_route``
            (default "billing") and ``penalize_spam`` (default 0.2).

    Returns:
        Reward result with the clipped total as score and a per-component
        breakdown.
    """
    expected_label = _normalized_text(str(ground_truth.get("label", "urgent")))
    primary_route = _normalized_text(str(ground_truth.get("route_to", "safety")))
    secondary_route = _normalized_text(str(ground_truth.get("cc_route", "billing")))
    spam_penalty = float(ground_truth.get("penalize_spam", 0.2))

    action_label = _normalized_text(action.label)
    normalized_route = _normalized_text(action.route_to)
    has_primary_route = _route_matches(normalized_route, primary_route)
    has_secondary_route = _route_matches(normalized_route, secondary_route)
    urgent_label = action_label == expected_label
    summary_score = _summary_keyword_score(action.summary, ground_truth)
    noise_penalty = _route_noise_penalty(action.route_to)

    escalation_component = 0.35 if has_primary_route else 0.0
    routing_component = 0.25 if has_secondary_route else 0.0
    urgency_component = 0.25 if urgent_label else 0.0
    summary_component = 0.15 * summary_score

    raw_score = escalation_component + routing_component + urgency_component + summary_component
    raw_score -= noise_penalty

    # Penalize a "spam" label only when spam is not the expected label;
    # otherwise a correct answer would lose points for being right.
    spam_penalized = action_label == "spam" and expected_label != "spam"
    if spam_penalized:
        raw_score -= spam_penalty

    score_value = _clip_score(raw_score)
    breakdown = {
        "escalation_component": _strict_ratio_score(escalation_component),
        "routing_component": _strict_ratio_score(routing_component),
        "urgency_component": _strict_ratio_score(urgency_component),
        "summary_component": _strict_ratio_score(summary_component),
        "route_noise_penalty": _strict_ratio_score(noise_penalty),
        "spam_penalty": _strict_ratio_score(
            spam_penalty if spam_penalized else SCORE_EPSILON
        ),
    }
    feedback = "Hard-task weighted policy grading completed."
    return RewardResult(score=score_value, breakdown=breakdown, feedback=feedback)