"""Static PR-review tasks and hidden grading rubrics.""" from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, Iterable, List, Sequence try: from models import Category, Difficulty, Severity, TaskDescriptor, TaskSummary except ModuleNotFoundError: # pragma: no cover from ..models import Category, Difficulty, Severity, TaskDescriptor, TaskSummary @dataclass(frozen=True) class RubricIssue: """One hidden issue that can be matched by the deterministic grader.""" issue_id: str file_path: str line: int category: Category severity: Severity keywords: Sequence[str] min_keyword_hits: int weight: float @dataclass(frozen=True) class TaskSpec: """Complete task definition, including hidden rubric metadata.""" task_id: str difficulty: Difficulty title: str goal: str repo_summary: str visible_diff: str file_contents: Dict[str, str] changed_files: Sequence[str] rubric_issues: Sequence[RubricIssue] max_steps: int @property def available_files(self) -> List[str]: return list(self.file_contents.keys()) def to_descriptor(self) -> TaskDescriptor: return TaskDescriptor( task_id=self.task_id, difficulty=self.difficulty, title=self.title, goal=self.goal, repo_summary=self.repo_summary, changed_files=list(self.changed_files), available_files=self.available_files, max_steps=self.max_steps, ) def to_summary(self) -> TaskSummary: return TaskSummary( task_id=self.task_id, difficulty=self.difficulty, title=self.title, goal=self.goal, ) TASKS: List[TaskSpec] = [ TaskSpec( task_id="py-pr-review-easy", difficulty="easy", title="Retry Delay Regression", goal=( "Review the pull request and identify the real bug introduced in the retry " "delay helper before it ships." ), repo_summary=( "This service computes retry delays for background notification delivery. " "The change is intended to relax validation for legacy callers." ), visible_diff="\n".join( [ "diff --git a/src/notifications/retry.py b/src/notifications/retry.py", "@@", "- if base_delay <= 0:", "+ if base_delay < 0:", " return 0.0", ] ), file_contents={ "src/notifications/retry.py": "\n".join( [ "from __future__ import annotations", "", "def calculate_retry_delay(attempt: int, base_delay: float = 2.0) -> float:", ' """Return the retry delay in seconds."""', " if attempt < 0:", ' raise ValueError(\"attempt must be >= 0\")', " if base_delay < 0:", " return 0.0", " return attempt / base_delay", ] ) }, changed_files=("src/notifications/retry.py",), rubric_issues=( RubricIssue( issue_id="zero-base-delay-divides", file_path="src/notifications/retry.py", line=7, category="bug", severity="warning", keywords=("zero", "division", "base_delay"), min_keyword_hits=2, weight=1.0, ), ), max_steps=4, ), TaskSpec( task_id="py-pr-review-medium", difficulty="medium", title="Coupon Billing Rollout", goal=( "Review the billing change and identify both the production regression and " "the missing coverage that would have caught it." ), repo_summary=( "The billing service is adding coupon support for one-off invoices. The PR " "touches both the service code and its unit tests." ), visible_diff="\n".join( [ "diff --git a/app/billing/invoice_service.py b/app/billing/invoice_service.py", "@@", " def charge_invoice(order: dict, gateway: Gateway) -> str:", "- return gateway.charge(order[\"customer_id\"], order[\"amount_cents\"])", "+ total = order[\"amount_cents\"]", "+ coupon = order.get(\"coupon_code\")", "+ if coupon:", "+ discount = gateway.lookup_discount(coupon)", "+ total = max(total - discount, 0)", "+ return gateway.charge(order[\"customer_id\"], order[\"amount_cents\"])", "", "diff --git a/tests/test_invoice_service.py b/tests/test_invoice_service.py", "@@", " class FakeGateway:", "+ def lookup_discount(self, coupon: str) -> int:", "+ return 250", ] ), file_contents={ "app/billing/invoice_service.py": "\n".join( [ "from gateway import Gateway", "", "def charge_invoice(order: dict, gateway: Gateway) -> str:", ' total = order["amount_cents"]', ' coupon = order.get("coupon_code")', " if coupon:", " discount = gateway.lookup_discount(coupon)", " total = max(total - discount, 0)", ' return gateway.charge(order["customer_id"], order["amount_cents"])', ] ), "tests/test_invoice_service.py": "\n".join( [ "from app.billing.invoice_service import charge_invoice", "", "class FakeGateway:", " def lookup_discount(self, coupon: str) -> int:", " return 250", "", " def charge(self, customer_id: str, amount_cents: int) -> str:", " self.last_charge = (customer_id, amount_cents)", ' return "charge_123"', "", "def test_charge_invoice_without_coupon():", " gateway = FakeGateway()", ' charge_invoice({"customer_id": "cus_1", "amount_cents": 1000}, gateway)', ' assert gateway.last_charge == ("cus_1", 1000)', ] ), }, changed_files=("app/billing/invoice_service.py", "tests/test_invoice_service.py"), rubric_issues=( RubricIssue( issue_id="discount-total-unused", file_path="app/billing/invoice_service.py", line=8, category="bug", severity="warning", keywords=("discount", "total", "charge", "amount"), min_keyword_hits=2, weight=0.6, ), RubricIssue( issue_id="missing-coupon-test", file_path="tests/test_invoice_service.py", line=11, category="testing", severity="warning", keywords=("missing", "test", "coupon", "discount"), min_keyword_hits=2, weight=0.4, ), ), max_steps=5, ), TaskSpec( task_id="py-pr-review-hard", difficulty="hard", title="Async Job Runner Deduplication", goal=( "Review the async job-runner PR and find the subtle concurrency issues " "without inventing extra problems." ), repo_summary=( "A shared webhook backfill service is deduplicating in-flight work with an " "async task cache and writing the latest result for operators to inspect." ), visible_diff="\n".join( [ "diff --git a/app/jobs/runner.py b/app/jobs/runner.py", "@@", " async def run_job(job_id: str, payload: dict, worker) -> str:", " if job_id in ACTIVE_RUNS:", " return await ACTIVE_RUNS[job_id]", "+ lock = asyncio.Lock()", "+ async with lock:", "+ task = asyncio.create_task(worker.run(payload))", "+ ACTIVE_RUNS[job_id] = task", " try:", " result = await task", " finally:", " ACTIVE_RUNS.pop(job_id, None)", "+ Path(\"latest-result.json\").write_text(result)", " return result", ] ), file_contents={ "app/jobs/runner.py": "\n".join( [ "import asyncio", "from pathlib import Path", "", "ACTIVE_RUNS: dict[str, asyncio.Task[str]] = {}", "", "async def run_job(job_id: str, payload: dict, worker) -> str:", " if job_id in ACTIVE_RUNS:", " return await ACTIVE_RUNS[job_id]", "", " lock = asyncio.Lock()", " async with lock:", " task = asyncio.create_task(worker.run(payload))", " ACTIVE_RUNS[job_id] = task", " try:", " result = await task", " finally:", " ACTIVE_RUNS.pop(job_id, None)", "", ' Path("latest-result.json").write_text(result)', " return result", ] ), "tests/test_runner.py": "\n".join( [ "import pytest", "", "from app.jobs.runner import run_job", "", "class FakeWorker:", " async def run(self, payload: dict) -> str:", ' return payload["job_id"]', "", "@pytest.mark.asyncio", "async def test_run_job_returns_worker_result():", " worker = FakeWorker()", ' result = await run_job("job-1", {"job_id": "job-1"}, worker)', ' assert result == "job-1"', ] ), }, changed_files=("app/jobs/runner.py", "tests/test_runner.py"), rubric_issues=( RubricIssue( issue_id="per-call-lock-race", file_path="app/jobs/runner.py", line=9, category="bug", severity="warning", keywords=("lock", "race", "concurrent", "duplicate"), min_keyword_hits=2, weight=0.55, ), RubricIssue( issue_id="shared-output-file-race", file_path="app/jobs/runner.py", line=18, category="maintainability", severity="warning", keywords=("latest", "result", "file", "concurrent", "overwrite"), min_keyword_hits=2, weight=0.45, ), ), max_steps=6, ), ] TASKS_BY_ID: Dict[str, TaskSpec] = {task.task_id: task for task in TASKS} def list_task_descriptors() -> List[TaskDescriptor]: """Return public descriptors for all tasks.""" return [task.to_descriptor() for task in TASKS] def list_task_summaries() -> List[TaskSummary]: """Return task summaries for lightweight route responses.""" return [task.to_summary() for task in TASKS] def get_task(task_id: str) -> TaskSpec: """Return a task by id.""" try: return TASKS_BY_ID[task_id] except KeyError as exc: # pragma: no cover raise ValueError(f"Unknown task_id: {task_id}") from exc def task_ids() -> Iterable[str]: """Return task ids in benchmark order.""" return [task.task_id for task in TASKS]