uvpatel7271's picture
envrionment setup
0695520
"""Deterministic task definitions for the code review environment."""
from __future__ import annotations
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Any, Dict, List, Optional
def _code(value: str) -> str:
return dedent(value).strip() + "\n"
@dataclass(frozen=True)
class CallCase:
"""One executable function call used by graders."""
label: str
args: tuple[Any, ...] = ()
kwargs: Dict[str, Any] = field(default_factory=dict)
expected: Any = None
@dataclass(frozen=True)
class ReviewTask:
"""Static task definition."""
task_id: str
title: str
difficulty: str
task_kind: str
task_description: str
starter_code: str
reference_code: str
function_name: str
visible_tests: List[str]
public_cases: List[CallCase]
hidden_cases: List[CallCase]
repo_summary: str
changed_files: List[str]
available_files: List[str]
goal: str
max_steps: int
benchmark_config: Optional[Dict[str, int]] = None
TASKS: List[ReviewTask] = [
ReviewTask(
task_id="syntax_fix_invoice_totals",
title="Fix the invoice total syntax regression",
difficulty="easy",
task_kind="syntax_fix",
task_description=(
"A recent refactor broke the helper that normalizes invoice totals before "
"daily reconciliation. Repair the Python syntax so the function compiles "
"and returns the correct total for mixed integer and string inputs."
),
starter_code=_code(
"""
def normalize_invoice_totals(records):
cleaned = []
for record in records
if "total" not in record:
continue
value = int(record["total"])
cleaned.append(value)
return sum(cleaned
"""
),
reference_code=_code(
'''
def normalize_invoice_totals(records):
"""Return the sum of invoice totals that are present in the payload."""
cleaned = []
for record in records:
if "total" not in record:
continue
cleaned.append(int(record["total"]))
return sum(cleaned)
'''
),
function_name="normalize_invoice_totals",
visible_tests=[
"normalize_invoice_totals([{'total': '4'}, {'total': 5}, {}]) == 9",
"normalize_invoice_totals([]) == 0",
],
public_cases=[
CallCase(
label="mixed string and int totals",
args=([{"total": "4"}, {"total": 5}, {}],),
expected=9,
),
CallCase(label="empty input", args=([],), expected=0),
],
hidden_cases=[
CallCase(
label="skip missing totals",
args=([{}, {"total": "2"}, {"total": "8"}],),
expected=10,
),
CallCase(
label="handle negative adjustments",
args=([{"total": "11"}, {"total": -3}],),
expected=8,
),
],
repo_summary=(
"services/billing/reconciliation.py computes end-of-day invoice totals for "
"a CPU-only batch job."
),
changed_files=["services/billing/reconciliation.py"],
available_files=["services/billing/reconciliation.py", "tests/test_reconciliation.py"],
goal="Restore a compiling implementation for invoice total normalization.",
max_steps=6,
),
ReviewTask(
task_id="bug_fix_session_windows",
title="Repair session window collapsing logic",
difficulty="medium",
task_kind="bug_fix",
task_description=(
"The session aggregator regressed after a cleanup pass. Public tests expose "
"incorrect boundary handling and the final session is missing. Fix the logic "
"without changing the function contract."
),
starter_code=_code(
"""
def collapse_sessions(events, idle_timeout_minutes):
if not events:
return []
sessions = []
current_start = events[0]["minute"]
current_end = current_start
for event in events[1:]:
minute = event["minute"]
if minute - current_end > idle_timeout_minutes:
sessions.append((current_start, current_end))
current_start = minute
current_end = minute
return sessions
"""
),
reference_code=_code(
'''
def collapse_sessions(events, idle_timeout_minutes):
"""Collapse activity events into inclusive session windows."""
if not events:
return []
sessions = []
current_start = events[0]["minute"]
current_end = current_start
for event in events[1:]:
minute = event["minute"]
if minute - current_end >= idle_timeout_minutes:
sessions.append((current_start, current_end))
current_start = minute
current_end = minute
sessions.append((current_start, current_end))
return sessions
'''
),
function_name="collapse_sessions",
visible_tests=[
"collapse_sessions([{'minute': 1}, {'minute': 3}, {'minute': 8}], 4) == [(1, 3), (8, 8)]",
"collapse_sessions([{'minute': 5}, {'minute': 9}], 4) == [(5, 5), (9, 9)]",
],
public_cases=[
CallCase(
label="split when idle timeout is exceeded",
args=([{"minute": 1}, {"minute": 3}, {"minute": 8}], 4),
expected=[(1, 3), (8, 8)],
),
CallCase(
label="boundary is inclusive",
args=([{"minute": 5}, {"minute": 9}], 4),
expected=[(5, 5), (9, 9)],
),
],
hidden_cases=[
CallCase(
label="single continuous session",
args=([{"minute": 2}, {"minute": 4}, {"minute": 5}], 4),
expected=[(2, 5)],
),
CallCase(label="empty input", args=([], 10), expected=[]),
CallCase(
label="multiple boundaries",
args=([{"minute": 1}, {"minute": 5}, {"minute": 9}, {"minute": 14}], 4),
expected=[(1, 1), (5, 5), (9, 9), (14, 14)],
),
],
repo_summary=(
"analytics/sessionizer.py condenses sorted clickstream events into user "
"sessions for downstream retention reports."
),
changed_files=["analytics/sessionizer.py"],
available_files=["analytics/sessionizer.py", "tests/test_sessionizer.py"],
goal="Make session collapsing match the expected timeout semantics.",
max_steps=8,
),
ReviewTask(
task_id="optimization_rank_active_users",
title="Optimize the active-user ranking pipeline",
difficulty="hard",
task_kind="optimization",
task_description=(
"The reporting job is correct enough for small fixtures but too slow for the "
"daily production export. Preserve the API, keep the output deterministic, "
"and refactor the implementation for speed and readability."
),
starter_code=_code(
"""
def rank_active_users(events):
users = []
for event in events:
if event["status"] == "active":
found = False
for existing in users:
if existing == event["user_id"]:
found = True
if not found:
users.append(event["user_id"])
totals = []
for user in users:
count = 0
for event in events:
if event["status"] == "active" and event["user_id"] == user:
count = count + 1
totals.append((user, count))
totals.sort(key=lambda item: (-item[1], item[0]))
return totals
"""
),
reference_code=_code(
'''
from collections import Counter
def rank_active_users(events):
"""Return users ranked by number of active events."""
counts = Counter(
event["user_id"]
for event in events
if event["status"] == "active"
)
return sorted(counts.items(), key=lambda item: (-item[1], item[0]))
'''
),
function_name="rank_active_users",
visible_tests=[
"rank_active_users([{'user_id': 'b', 'status': 'active'}, {'user_id': 'a', 'status': 'active'}, {'user_id': 'b', 'status': 'inactive'}]) == [('a', 1), ('b', 1)]",
"rank_active_users([{'user_id': 'u1', 'status': 'active'}, {'user_id': 'u1', 'status': 'active'}, {'user_id': 'u2', 'status': 'active'}]) == [('u1', 2), ('u2', 1)]",
],
public_cases=[
CallCase(
label="inactive events are ignored",
args=([{"user_id": "b", "status": "active"}, {"user_id": "a", "status": "active"}, {"user_id": "b", "status": "inactive"}],),
expected=[("a", 1), ("b", 1)],
),
CallCase(
label="counts repeated active users",
args=([{"user_id": "u1", "status": "active"}, {"user_id": "u1", "status": "active"}, {"user_id": "u2", "status": "active"}],),
expected=[("u1", 2), ("u2", 1)],
),
],
hidden_cases=[
CallCase(
label="stable alphabetical tie-break",
args=([{"user_id": "u3", "status": "active"}, {"user_id": "u2", "status": "active"}, {"user_id": "u3", "status": "active"}, {"user_id": "u2", "status": "active"}],),
expected=[("u2", 2), ("u3", 2)],
),
CallCase(label="empty input", args=([],), expected=[]),
CallCase(
label="mixed active and inactive states",
args=([{"user_id": "x", "status": "inactive"}, {"user_id": "x", "status": "active"}, {"user_id": "y", "status": "active"}, {"user_id": "x", "status": "active"}],),
expected=[("x", 2), ("y", 1)],
),
],
repo_summary=(
"reports/activity_rankings.py feeds a nightly export that runs on a small CPU "
"instance and has become too slow after customer growth."
),
changed_files=["reports/activity_rankings.py"],
available_files=["reports/activity_rankings.py", "tests/test_activity_rankings.py"],
goal="Keep the output stable while improving runtime and code quality.",
max_steps=10,
benchmark_config={"user_pool": 240, "events_per_user": 36, "iterations": 8},
),
]
TASK_BY_ID = {task.task_id: task for task in TASKS}
def list_tasks() -> List[ReviewTask]:
"""Return all supported tasks."""
return list(TASKS)
def get_task(task_id: str) -> ReviewTask:
"""Fetch a task by identifier."""
try:
return TASK_BY_ID[task_id]
except KeyError as exc:
raise ValueError(f"Unknown task_id: {task_id}") from exc
def select_task(seed: Optional[int] = None, task_id: Optional[str] = None) -> ReviewTask:
"""Select a task deterministically by explicit id or seed."""
if task_id:
return get_task(task_id)
if seed is None:
return TASKS[0]
return TASKS[seed % len(TASKS)]