"""Named benchmark tasks and deterministic task graders."""
from __future__ import annotations

from dataclasses import dataclass, field

try:
    from .evaluation import EvaluationConfig, EvaluationResult, evaluate_submission
    from .models import BenchmarkTaskSpec, MetricSubmissionRow
    from .server.data_generator import EpisodeConfig
except ImportError:
    from evaluation import EvaluationConfig, EvaluationResult, evaluate_submission
    from models import BenchmarkTaskSpec, MetricSubmissionRow
    from server.data_generator import EpisodeConfig

DEFAULT_GRADER_NAME = "deterministic_exact_match"

@dataclass(frozen=True)
class TaskSpec:
    """A concrete benchmark task that an agent can solve and be graded on."""

    task_id: str
    difficulty: str
    instruction: str
    objective: str
    seed: int
    scenario_family: str
    anomaly_density: str
    anomaly_count: int
    grader_name: str = DEFAULT_GRADER_NAME
    evaluation_config: EvaluationConfig = field(default_factory=EvaluationConfig)

    def build_episode_config(self) -> EpisodeConfig:
        """Return the canonical episode configuration for this task."""
        return EpisodeConfig(
            seed=self.seed,
            scenario_family=self.scenario_family,
            difficulty=self.difficulty,
            anomaly_density=self.anomaly_density,
            anomaly_count=self.anomaly_count,
        ).normalized()

    def grade_submission(
        self,
        submitted_rows: list[dict] | list[MetricSubmissionRow],
        expected_rows: list[MetricSubmissionRow],
        *,
        config: EvaluationConfig | None = None,
        include_debug_expected: bool = False,
    ) -> EvaluationResult:
        """Grade one candidate submission for this task."""
        return evaluate_submission(
            submitted_rows,
            expected_rows,
            config=config or self.evaluation_config,
            include_debug_expected=include_debug_expected,
        )

    def to_model(self) -> BenchmarkTaskSpec:
        """Return a typed summary safe to expose in observations."""
        return BenchmarkTaskSpec(
            task_id=self.task_id,
            difficulty=self.difficulty,
            instruction=self.instruction,
            objective=self.objective,
            scenario_family=self.scenario_family,
            anomaly_density=self.anomaly_density,
            anomaly_count=self.anomaly_count,
            grader_name=self.grader_name,
        )
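
# Typical grading flow (illustrative sketch only: `submitted_rows` comes from
# the agent under test and `expected_rows` from the episode's ground truth,
# neither of which this module produces):
#
#     spec = get_task_spec("easy_single_spike")
#     episode = spec.build_episode_config()  # deterministic, fixed seed
#     result = spec.grade_submission(submitted_rows, expected_rows)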

TASKS: dict[str, TaskSpec] = {
    "easy_single_spike": TaskSpec(
        task_id="easy_single_spike",
        difficulty="easy",
        instruction=(
            "Investigate the seeded funnel dataset and submit every anomalous row. "
            "Use the shared analysis methods before submitting."
        ),
        objective=(
            "Find all anomalies and submit every correctly populated anomaly row."
        ),
        seed=11,
        scenario_family="rate_spike_from_median",
        anomaly_density="low",
        anomaly_count=2,
    ),
    "medium_mixed_pair": TaskSpec(
        task_id="medium_mixed_pair",
        difficulty="medium",
        instruction=(
            "Investigate the seeded funnel dataset and submit every anomalous row. "
            "Expect both event-count and conversion-rate reasoning."
        ),
        objective=(
            "Find the full set of medium-difficulty anomalies without submitting extras."
        ),
        seed=23,
        scenario_family="mixed",
        anomaly_density="medium",
        anomaly_count=3,
    ),
    "hard_mixed_multi": TaskSpec(
        task_id="hard_mixed_multi",
        difficulty="hard",
        instruction=(
            "Investigate the seeded funnel dataset and submit every anomalous row. "
            "Some anomalies are subtle, so use the analysis methods carefully "
            "and avoid over-submitting."
        ),
        objective=(
            "Recover the complete set of hard mixed anomalies while preserving precision."
        ),
        seed=37,
        scenario_family="mixed",
        anomaly_density="high",
        anomaly_count=4,
    ),
}
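
# Registering an additional task is a matter of adding another TaskSpec entry
# above (hypothetical example; "hard_sparse" is not a shipped task):
#
#     "hard_sparse": TaskSpec(
#         task_id="hard_sparse",
#         difficulty="hard",
#         instruction="...",
#         objective="...",
#         seed=53,
#         scenario_family="mixed",
#         anomaly_density="low",
#         anomaly_count=1,
#     ),
#
# New ids should also be appended to DEFAULT_TASK_ORDER below so they surface
# through available_task_specs().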

DEFAULT_TASK_ORDER: tuple[str, ...] = (
    "easy_single_spike",
    "medium_mixed_pair",
    "hard_mixed_multi",
)

DEFAULT_TASK_ID = DEFAULT_TASK_ORDER[0]


def get_task_spec(task_id: str) -> TaskSpec:
    """Return the task spec for a known task id."""
    try:
        return TASKS[task_id]
    except KeyError as exc:
        raise ValueError(f"Unsupported task_id: {task_id}") from exc


def available_task_specs() -> list[BenchmarkTaskSpec]:
    """Return typed summaries for all named benchmark tasks."""
    return [TASKS[task_id].to_model() for task_id in DEFAULT_TASK_ORDER]
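

if __name__ == "__main__":
    # Minimal smoke check (illustrative only, not part of the benchmark API):
    # list the named tasks and confirm that unknown ids raise ValueError.
    for spec in available_task_specs():
        print(spec.task_id, spec.difficulty, spec.anomaly_count)
    try:
        get_task_spec("no_such_task")
    except ValueError as exc:
        print(f"rejected as expected: {exc}")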