Spaces:
Runtime error
Runtime error
| """Deterministic task definitions for the code review environment.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from textwrap import dedent | |
| from typing import Any, Dict, List, Optional | |
| def _code(value: str) -> str: | |
| return dedent(value).strip() + "\n" | |
| class CallCase: | |
| """One executable function call used by graders.""" | |
| label: str | |
| args: tuple[Any, ...] = () | |
| kwargs: Dict[str, Any] = field(default_factory=dict) | |
| expected: Any = None | |
| class ReviewTask: | |
| """Static task definition.""" | |
| task_id: str | |
| title: str | |
| difficulty: str | |
| task_kind: str | |
| task_description: str | |
| starter_code: str | |
| reference_code: str | |
| function_name: str | |
| visible_tests: List[str] | |
| public_cases: List[CallCase] | |
| hidden_cases: List[CallCase] | |
| repo_summary: str | |
| changed_files: List[str] | |
| available_files: List[str] | |
| goal: str | |
| max_steps: int | |
| benchmark_config: Optional[Dict[str, int]] = None | |
| TASKS: List[ReviewTask] = [ | |
| ReviewTask( | |
| task_id="syntax_fix_invoice_totals", | |
| title="Fix the invoice total syntax regression", | |
| difficulty="easy", | |
| task_kind="syntax_fix", | |
| task_description=( | |
| "A recent refactor broke the helper that normalizes invoice totals before " | |
| "daily reconciliation. Repair the Python syntax so the function compiles " | |
| "and returns the correct total for mixed integer and string inputs." | |
| ), | |
| starter_code=_code( | |
| """ | |
| def normalize_invoice_totals(records): | |
| cleaned = [] | |
| for record in records | |
| if "total" not in record: | |
| continue | |
| value = int(record["total"]) | |
| cleaned.append(value) | |
| return sum(cleaned | |
| """ | |
| ), | |
| reference_code=_code( | |
| ''' | |
| def normalize_invoice_totals(records): | |
| """Return the sum of invoice totals that are present in the payload.""" | |
| cleaned = [] | |
| for record in records: | |
| if "total" not in record: | |
| continue | |
| cleaned.append(int(record["total"])) | |
| return sum(cleaned) | |
| ''' | |
| ), | |
| function_name="normalize_invoice_totals", | |
| visible_tests=[ | |
| "normalize_invoice_totals([{'total': '4'}, {'total': 5}, {}]) == 9", | |
| "normalize_invoice_totals([]) == 0", | |
| ], | |
| public_cases=[ | |
| CallCase( | |
| label="mixed string and int totals", | |
| args=([{"total": "4"}, {"total": 5}, {}],), | |
| expected=9, | |
| ), | |
| CallCase(label="empty input", args=([],), expected=0), | |
| ], | |
| hidden_cases=[ | |
| CallCase( | |
| label="skip missing totals", | |
| args=([{}, {"total": "2"}, {"total": "8"}],), | |
| expected=10, | |
| ), | |
| CallCase( | |
| label="handle negative adjustments", | |
| args=([{"total": "11"}, {"total": -3}],), | |
| expected=8, | |
| ), | |
| ], | |
| repo_summary=( | |
| "services/billing/reconciliation.py computes end-of-day invoice totals for " | |
| "a CPU-only batch job." | |
| ), | |
| changed_files=["services/billing/reconciliation.py"], | |
| available_files=["services/billing/reconciliation.py", "tests/test_reconciliation.py"], | |
| goal="Restore a compiling implementation for invoice total normalization.", | |
| max_steps=6, | |
| ), | |
| ReviewTask( | |
| task_id="bug_fix_session_windows", | |
| title="Repair session window collapsing logic", | |
| difficulty="medium", | |
| task_kind="bug_fix", | |
| task_description=( | |
| "The session aggregator regressed after a cleanup pass. Public tests expose " | |
| "incorrect boundary handling and the final session is missing. Fix the logic " | |
| "without changing the function contract." | |
| ), | |
| starter_code=_code( | |
| """ | |
| def collapse_sessions(events, idle_timeout_minutes): | |
| if not events: | |
| return [] | |
| sessions = [] | |
| current_start = events[0]["minute"] | |
| current_end = current_start | |
| for event in events[1:]: | |
| minute = event["minute"] | |
| if minute - current_end > idle_timeout_minutes: | |
| sessions.append((current_start, current_end)) | |
| current_start = minute | |
| current_end = minute | |
| return sessions | |
| """ | |
| ), | |
| reference_code=_code( | |
| ''' | |
| def collapse_sessions(events, idle_timeout_minutes): | |
| """Collapse activity events into inclusive session windows.""" | |
| if not events: | |
| return [] | |
| sessions = [] | |
| current_start = events[0]["minute"] | |
| current_end = current_start | |
| for event in events[1:]: | |
| minute = event["minute"] | |
| if minute - current_end >= idle_timeout_minutes: | |
| sessions.append((current_start, current_end)) | |
| current_start = minute | |
| current_end = minute | |
| sessions.append((current_start, current_end)) | |
| return sessions | |
| ''' | |
| ), | |
| function_name="collapse_sessions", | |
| visible_tests=[ | |
| "collapse_sessions([{'minute': 1}, {'minute': 3}, {'minute': 8}], 4) == [(1, 3), (8, 8)]", | |
| "collapse_sessions([{'minute': 5}, {'minute': 9}], 4) == [(5, 5), (9, 9)]", | |
| ], | |
| public_cases=[ | |
| CallCase( | |
| label="split when idle timeout is exceeded", | |
| args=([{"minute": 1}, {"minute": 3}, {"minute": 8}], 4), | |
| expected=[(1, 3), (8, 8)], | |
| ), | |
| CallCase( | |
| label="boundary is inclusive", | |
| args=([{"minute": 5}, {"minute": 9}], 4), | |
| expected=[(5, 5), (9, 9)], | |
| ), | |
| ], | |
| hidden_cases=[ | |
| CallCase( | |
| label="single continuous session", | |
| args=([{"minute": 2}, {"minute": 4}, {"minute": 5}], 4), | |
| expected=[(2, 5)], | |
| ), | |
| CallCase(label="empty input", args=([], 10), expected=[]), | |
| CallCase( | |
| label="multiple boundaries", | |
| args=([{"minute": 1}, {"minute": 5}, {"minute": 9}, {"minute": 14}], 4), | |
| expected=[(1, 1), (5, 5), (9, 9), (14, 14)], | |
| ), | |
| ], | |
| repo_summary=( | |
| "analytics/sessionizer.py condenses sorted clickstream events into user " | |
| "sessions for downstream retention reports." | |
| ), | |
| changed_files=["analytics/sessionizer.py"], | |
| available_files=["analytics/sessionizer.py", "tests/test_sessionizer.py"], | |
| goal="Make session collapsing match the expected timeout semantics.", | |
| max_steps=8, | |
| ), | |
| ReviewTask( | |
| task_id="optimization_rank_active_users", | |
| title="Optimize the active-user ranking pipeline", | |
| difficulty="hard", | |
| task_kind="optimization", | |
| task_description=( | |
| "The reporting job is correct enough for small fixtures but too slow for the " | |
| "daily production export. Preserve the API, keep the output deterministic, " | |
| "and refactor the implementation for speed and readability." | |
| ), | |
| starter_code=_code( | |
| """ | |
| def rank_active_users(events): | |
| users = [] | |
| for event in events: | |
| if event["status"] == "active": | |
| found = False | |
| for existing in users: | |
| if existing == event["user_id"]: | |
| found = True | |
| if not found: | |
| users.append(event["user_id"]) | |
| totals = [] | |
| for user in users: | |
| count = 0 | |
| for event in events: | |
| if event["status"] == "active" and event["user_id"] == user: | |
| count = count + 1 | |
| totals.append((user, count)) | |
| totals.sort(key=lambda item: (-item[1], item[0])) | |
| return totals | |
| """ | |
| ), | |
| reference_code=_code( | |
| ''' | |
| from collections import Counter | |
| def rank_active_users(events): | |
| """Return users ranked by number of active events.""" | |
| counts = Counter( | |
| event["user_id"] | |
| for event in events | |
| if event["status"] == "active" | |
| ) | |
| return sorted(counts.items(), key=lambda item: (-item[1], item[0])) | |
| ''' | |
| ), | |
| function_name="rank_active_users", | |
| visible_tests=[ | |
| "rank_active_users([{'user_id': 'b', 'status': 'active'}, {'user_id': 'a', 'status': 'active'}, {'user_id': 'b', 'status': 'inactive'}]) == [('a', 1), ('b', 1)]", | |
| "rank_active_users([{'user_id': 'u1', 'status': 'active'}, {'user_id': 'u1', 'status': 'active'}, {'user_id': 'u2', 'status': 'active'}]) == [('u1', 2), ('u2', 1)]", | |
| ], | |
| public_cases=[ | |
| CallCase( | |
| label="inactive events are ignored", | |
| args=([{"user_id": "b", "status": "active"}, {"user_id": "a", "status": "active"}, {"user_id": "b", "status": "inactive"}],), | |
| expected=[("a", 1), ("b", 1)], | |
| ), | |
| CallCase( | |
| label="counts repeated active users", | |
| args=([{"user_id": "u1", "status": "active"}, {"user_id": "u1", "status": "active"}, {"user_id": "u2", "status": "active"}],), | |
| expected=[("u1", 2), ("u2", 1)], | |
| ), | |
| ], | |
| hidden_cases=[ | |
| CallCase( | |
| label="stable alphabetical tie-break", | |
| args=([{"user_id": "u3", "status": "active"}, {"user_id": "u2", "status": "active"}, {"user_id": "u3", "status": "active"}, {"user_id": "u2", "status": "active"}],), | |
| expected=[("u2", 2), ("u3", 2)], | |
| ), | |
| CallCase(label="empty input", args=([],), expected=[]), | |
| CallCase( | |
| label="mixed active and inactive states", | |
| args=([{"user_id": "x", "status": "inactive"}, {"user_id": "x", "status": "active"}, {"user_id": "y", "status": "active"}, {"user_id": "x", "status": "active"}],), | |
| expected=[("x", 2), ("y", 1)], | |
| ), | |
| ], | |
| repo_summary=( | |
| "reports/activity_rankings.py feeds a nightly export that runs on a small CPU " | |
| "instance and has become too slow after customer growth." | |
| ), | |
| changed_files=["reports/activity_rankings.py"], | |
| available_files=["reports/activity_rankings.py", "tests/test_activity_rankings.py"], | |
| goal="Keep the output stable while improving runtime and code quality.", | |
| max_steps=10, | |
| benchmark_config={"user_pool": 240, "events_per_user": 36, "iterations": 8}, | |
| ), | |
| ] | |
| TASK_BY_ID = {task.task_id: task for task in TASKS} | |
| def list_tasks() -> List[ReviewTask]: | |
| """Return all supported tasks.""" | |
| return list(TASKS) | |
| def get_task(task_id: str) -> ReviewTask: | |
| """Fetch a task by identifier.""" | |
| try: | |
| return TASK_BY_ID[task_id] | |
| except KeyError as exc: | |
| raise ValueError(f"Unknown task_id: {task_id}") from exc | |
| def select_task(seed: Optional[int] = None, task_id: Optional[str] = None) -> ReviewTask: | |
| """Select a task deterministically by explicit id or seed.""" | |
| if task_id: | |
| return get_task(task_id) | |
| if seed is None: | |
| return TASKS[0] | |
| return TASKS[seed % len(TASKS)] | |