"""Deterministic task graders for the code-review benchmark."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

try:
    from ..models import (
        ActionType,
        CodeReviewSnippet,
        GoldIssue,
        IssueType,
        ReviewComment,
        Severity,
    )
except ImportError:
    # Fallback used when the module is imported outside of its package.
    from models import (  # type: ignore
        ActionType,
        CodeReviewSnippet,
        GoldIssue,
        IssueType,
        ReviewComment,
        Severity,
    )


# Credit tiers, checked in order: (max line distance, weight for a required
# issue, flat credit for a bonus issue). The first tier whose distance bound
# holds wins. The required weight is divided by the number of required issues
# when it is applied; a bonus credit of 0.0 means "no credit at this distance".
_EASY_TYPE_TIERS = ((1, 0.30, 0.05), (3, 0.15, 0.05), (5, 0.08, 0.0))
_EASY_KEYWORD_TIERS = ((3, 0.08, 0.02),)
_MEDIUM_LOGIC_TIERS = ((1, 0.25, 0.05), (3, 0.15, 0.03), (8, 0.08, 0.02))
_MEDIUM_KEYWORD_TIERS = ((3, 0.10, 0.03), (8, 0.05, 0.01))
_HARD_SECURITY_TIERS = ((0, 0.20, 0.05), (2, 0.12, 0.03))
_HARD_KEYWORD_TIERS = ((2, 0.06, 0.02),)

# Comments further than this many lines from an issue are ignored entirely
# by the medium / hard graders (no match credit and no secondary credit).
_MEDIUM_MAX_DISTANCE = 8
_HARD_MAX_DISTANCE = 2


def _normalize_text(value: Optional[str]) -> str:
    """Lower-case *value* and collapse whitespace runs; ``None`` becomes ``""``."""
    return " ".join((value or "").lower().split())


def _keyword_match(text: str, keywords: Iterable[str]) -> bool:
    """Return True when any non-empty keyword occurs (normalized) in *text*."""
    normalized = _normalize_text(text)
    return any(_normalize_text(keyword) in normalized for keyword in keywords if keyword)


def _keyword_match_score(text: str, keywords: Iterable[str]) -> float:
    """Return the fraction (0.0-1.0) of non-empty keywords found in *text*.

    Unlike :func:`_keyword_match` this gives partial credit: one hit out of
    four keywords scores 0.25. An empty keyword collection scores 0.0.
    """
    normalized = _normalize_text(text)
    kw_list = [keyword for keyword in keywords if keyword]
    if not kw_list:
        return 0.0
    hits = sum(1 for keyword in kw_list if _normalize_text(keyword) in normalized)
    return hits / len(kw_list)


def _terminal_action(history: List[ReviewComment]) -> Optional[ActionType]:
    """Return the most recent APPROVE / REQUEST_CHANGES action, or None."""
    for item in reversed(history):
        if item.action_type in {ActionType.APPROVE, ActionType.REQUEST_CHANGES}:
            return item.action_type
    return None


def _issue_keywords(issue: GoldIssue, attribute: str) -> List[str]:
    """Return a fresh list of the keywords stored on *issue* under *attribute*.

    A missing attribute or a ``None`` value yields an empty list, so callers
    can concatenate and iterate the result without further checks.
    """
    return list(getattr(issue, attribute, None) or [])


def _comment_text(comment: ReviewComment) -> str:
    """Join a comment's free text and suggestion into one searchable string."""
    return f"{comment.comment or ''} {comment.suggestion or ''}"


def _tiered_credit(
    distance: int,
    tiers: Iterable[tuple[int, float, float]],
    *,
    required: bool,
    denominator: int,
) -> float:
    """Return the credit of the first tier whose distance bound covers *distance*.

    Required issues earn ``weight / denominator``; bonus issues earn the
    tier's flat bonus credit. Returns 0.0 when no tier applies.
    """
    for max_distance, required_weight, bonus_credit in tiers:
        if distance <= max_distance:
            return required_weight / denominator if required else bonus_credit
    return 0.0


def _action_adjustment(
    snippet: CodeReviewSnippet,
    history: List[ReviewComment],
    *,
    reward: float,
    penalty: float,
) -> float:
    """Score the final verdict for snippets that must be rejected.

    Returns ``+reward`` when the last terminal action is REQUEST_CHANGES,
    ``-penalty`` when it is APPROVE, and 0.0 otherwise (no terminal action
    yet, or the snippet is not flagged ``must_reject``).
    """
    if not snippet.must_reject:
        return 0.0
    final_action = _terminal_action(history)
    if final_action == ActionType.REQUEST_CHANGES:
        return reward
    if final_action == ActionType.APPROVE:
        return -penalty
    return 0.0


def _count_found(
    issues: Iterable[GoldIssue],
    best_credit: Dict[str, float],
    *,
    required: bool,
) -> int:
    """Count issues with the given required-ness that earned positive credit."""
    return sum(
        1
        for issue in issues
        if bool(issue.required) == required and best_credit[issue.issue_id] > 0
    )


@dataclass
class GradeResult:
    """Outcome of grading one review; built by :func:`_build_result`."""

    score: float  # final score, clamped to [0.0, 1.0]
    precision: float  # matched issues / (matched issues + false-positive comments)
    recall: float  # required issues found / required issues total
    f1: float  # harmonic mean of precision and recall (0.0 when both are 0)
    true_positives: int  # number of distinct gold issues matched
    false_positives: int  # number of comments that matched no gold issue
    missed_issues: int  # required issues that earned no credit
    required_found: int
    required_total: int
    bonus_found: int
    matched_issue_ids: List[str]
    breakdown: Dict[str, float]  # per-component scores plus summary metrics


def grade_review(
    task_id: str,
    snippet: CodeReviewSnippet,
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade a completed or in-progress review deterministically.

    Only ADD_COMMENT entries of *history* are scored as comments; the full
    history is still passed on so the final approve/reject action can be
    evaluated. ``"task_easy"`` and ``"task_medium"`` select their graders;
    any other *task_id* is graded with the hard (security) grader.
    """
    comments = [item for item in history if item.action_type == ActionType.ADD_COMMENT]
    graders = {"task_easy": _grade_easy, "task_medium": _grade_medium}
    grader = graders.get(task_id, _grade_hard)
    return grader(snippet, comments, history, duplicate_comments)


def _grade_easy(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the easy task: issue-type or keyword matches near the gold line.

    Each gold issue keeps the best credit any single comment earned for it.
    A comment whose ``issue_type`` equals the issue's is scored on the
    type tiers; otherwise a keyword hit on ``explanation_keywords`` is scored
    on the (smaller) keyword tiers. Bonus-issue credit is capped at 0.15 and
    each comment that matches nothing costs 0.03 (capped at 0.15).
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0

    for comment in comments:
        matched_any = False
        # A comment without a line number cannot be located, so it can never
        # match an issue and is counted as a false positive below.
        if comment.line_number is not None:
            text = _comment_text(comment)
            for issue in snippet.gold_issues:
                if comment.issue_type == issue.issue_type:
                    tiers = _EASY_TYPE_TIERS
                elif _keyword_match(text, _issue_keywords(issue, "explanation_keywords")):
                    # Wrong issue type, but the comment mentions the bug.
                    tiers = _EASY_KEYWORD_TIERS
                else:
                    continue
                credit = _tiered_credit(
                    abs(comment.line_number - issue.line),
                    tiers,
                    required=issue.required,
                    denominator=denominator,
                )
                if credit > 0:
                    matched_any = True
                    best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                    matched_ids.add(issue.issue_id)
        if not matched_any:
            false_positives += 1

    required_score = sum(best_credit[issue.issue_id] for issue in required_issues)
    bonus_score = min(
        sum(
            best_credit[issue.issue_id]
            for issue in snippet.gold_issues
            if not issue.required
        ),
        0.15,
    )
    # Kept small so agents that flag many lines are not over-penalised.
    false_positive_penalty = min(false_positives * 0.03, 0.15)
    action_adjustment = _action_adjustment(snippet, history, reward=0.10, penalty=0.10)

    return _build_result(
        score=required_score + bonus_score - false_positive_penalty + action_adjustment,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=_count_found(required_issues, best_credit, required=True),
        required_total=len(required_issues),
        bonus_found=_count_found(snippet.gold_issues, best_credit, required=False),
        duplicate_comments=duplicate_comments,
        breakdown={
            "required_score": required_score,
            "bonus_score": bonus_score,
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )


def _grade_medium(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the medium task: LOGIC issues plus explanation quality.

    Comments within 8 lines of a gold issue are considered. A LOGIC comment
    on a LOGIC issue is scored on the logic tiers; otherwise a keyword hit on
    ``explanation_keywords`` is scored on the keyword tiers. Independently,
    an explanation credit scaled by the fraction of keywords mentioned is
    tracked per issue. Each non-matching comment costs 0.05 (capped at 0.25).
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    explanation_credit: Dict[str, float] = {
        issue.issue_id: 0.0 for issue in snippet.gold_issues
    }
    matched_ids: set[str] = set()
    false_positives = 0

    for comment in comments:
        matched_any = False
        # Unlocated comments cannot match anything; they fall through to the
        # false-positive count.
        if comment.line_number is not None:
            text = _comment_text(comment)
            for issue in snippet.gold_issues:
                distance = abs(comment.line_number - issue.line)
                if distance > _MEDIUM_MAX_DISTANCE:
                    continue
                keywords = _issue_keywords(issue, "explanation_keywords")

                if comment.issue_type == IssueType.LOGIC and issue.issue_type == IssueType.LOGIC:
                    tiers = _MEDIUM_LOGIC_TIERS
                elif _keyword_match(text, keywords):
                    tiers = _MEDIUM_KEYWORD_TIERS
                else:
                    tiers = ()
                credit = _tiered_credit(
                    distance, tiers, required=issue.required, denominator=denominator
                )
                if credit > 0:
                    matched_any = True
                    best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                    matched_ids.add(issue.issue_id)

                # Explanation bonus is proportional to how many of the
                # issue's keywords the comment mentions.
                kw_score = _keyword_match_score(text, keywords)
                if kw_score > 0:
                    explanation_credit[issue.issue_id] = max(
                        explanation_credit[issue.issue_id],
                        (0.05 * kw_score) / denominator
                        if issue.required
                        else (0.02 * kw_score),
                    )
        if not matched_any:
            false_positives += 1

    logic_score = sum(best_credit.values())
    explanation_score = sum(explanation_credit.values())
    base_score = logic_score + explanation_score
    false_positive_penalty = min(false_positives * 0.05, 0.25)
    action_adjustment = _action_adjustment(snippet, history, reward=0.10, penalty=0.15)

    return _build_result(
        score=base_score - false_positive_penalty + action_adjustment,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=_count_found(required_issues, best_credit, required=True),
        required_total=len(required_issues),
        bonus_found=_count_found(snippet.gold_issues, best_credit, required=False),
        duplicate_comments=duplicate_comments,
        breakdown={
            "logic_score": logic_score,
            "explanation_score": explanation_score,
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )


def _grade_hard(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the hard task: SECURITY issues with OWASP, fix and severity credit.

    Only comments within 2 lines of a gold issue are considered. On top of
    the match credit, each issue tracks separate credits for mentioning its
    OWASP keywords/category, mentioning a fix keyword, and choosing a fitting
    severity. Every required CRITICAL issue left unmatched costs 0.15 and
    each non-matching comment costs 0.07 (capped at 0.35).
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    owasp_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    fix_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    severity_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0

    for comment in comments:
        matched_any = False
        # Unlocated comments cannot match anything; they fall through to the
        # false-positive count.
        if comment.line_number is not None:
            text = _comment_text(comment)
            for issue in snippet.gold_issues:
                distance = abs(comment.line_number - issue.line)
                # Vulnerabilities often span a few lines, hence a small
                # tolerance instead of an exact line match.
                if distance > _HARD_MAX_DISTANCE:
                    continue
                issue_id = issue.issue_id
                owasp_kw = _issue_keywords(issue, "owasp_keywords")
                fix_kw = _issue_keywords(issue, "fix_keywords")

                if (
                    comment.issue_type == IssueType.SECURITY
                    and issue.issue_type == IssueType.SECURITY
                ):
                    tiers = _HARD_SECURITY_TIERS
                elif _keyword_match(text, owasp_kw + fix_kw):
                    # Wrong issue type, but the comment names the weakness or fix.
                    tiers = _HARD_KEYWORD_TIERS
                else:
                    tiers = ()
                credit = _tiered_credit(
                    distance, tiers, required=issue.required, denominator=denominator
                )
                if credit > 0:
                    matched_any = True
                    matched_ids.add(issue_id)
                    best_credit[issue_id] = max(best_credit[issue_id], credit)

                # NOTE(review): the secondary credits below are applied to any
                # comment inside the distance window, even one that earned no
                # match credit; only the severity penalty is gated on a match.
                # Confirm this asymmetry is intended.
                owasp_cat = [issue.owasp_category] if getattr(issue, "owasp_category", None) else []
                if _keyword_match(text, owasp_kw + owasp_cat):
                    owasp_credit[issue_id] = max(
                        owasp_credit[issue_id],
                        0.10 / denominator if issue.required else 0.02,
                    )

                if _keyword_match(text, fix_kw):
                    fix_credit[issue_id] = max(
                        fix_credit[issue_id],
                        0.05 / denominator if issue.required else 0.02,
                    )

                if comment.severity in {Severity.HIGH, Severity.CRITICAL}:
                    # Exact severity, or CRITICAL given for a HIGH issue, earns credit.
                    if comment.severity == issue.severity or (
                        issue.severity == Severity.HIGH
                        and comment.severity == Severity.CRITICAL
                    ):
                        severity_credit[issue_id] = max(
                            severity_credit[issue_id], 0.05 / denominator
                        )
                elif issue.severity == Severity.CRITICAL and comment.severity in {
                    Severity.LOW,
                    Severity.MEDIUM,
                }:
                    # Downplaying a CRITICAL issue is penalised, but only once
                    # the issue has actually been matched.
                    if best_credit[issue_id] > 0:
                        severity_credit[issue_id] = min(
                            severity_credit[issue_id], -0.05 / denominator
                        )
        if not matched_any:
            false_positives += 1

    # Every required CRITICAL issue that nobody matched costs a fixed amount.
    missing_critical_penalty = sum(
        (
            0.15
            for issue in required_issues
            if issue.severity == Severity.CRITICAL and best_credit[issue.issue_id] == 0
        ),
        0.0,
    )
    false_positive_penalty = min(false_positives * 0.07, 0.35)
    action_adjustment = _action_adjustment(snippet, history, reward=0.10, penalty=0.20)

    security_score = sum(best_credit.values())
    owasp_score = sum(owasp_credit.values())
    fix_score = sum(fix_credit.values())
    severity_score = sum(severity_credit.values())

    return _build_result(
        score=(
            security_score
            + owasp_score
            + fix_score
            + severity_score
            - false_positive_penalty
            - missing_critical_penalty
            + action_adjustment
        ),
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=_count_found(required_issues, best_credit, required=True),
        required_total=len(required_issues),
        bonus_found=_count_found(snippet.gold_issues, best_credit, required=False),
        duplicate_comments=duplicate_comments,
        breakdown={
            "security_score": security_score,
            "owasp_score": owasp_score,
            "fix_score": fix_score,
            "severity_score": severity_score,
            "false_positive_penalty": -false_positive_penalty,
            "missing_critical_penalty": -missing_critical_penalty,
            "action_adjustment": action_adjustment,
        },
    )


def _build_result(
    *,
    score: float,
    matched_issue_ids: List[str],
    false_positives: int,
    required_found: int,
    required_total: int,
    bonus_found: int,
    duplicate_comments: int,
    breakdown: Dict[str, float],
) -> GradeResult:
    """Assemble a :class:`GradeResult` from raw grader outputs.

    The score is clamped to ``[0.0, 1.0]``. True positives are the number of
    distinct matched issue ids; precision, recall and F1 are derived from
    them. The returned breakdown is a copy of *breakdown* extended with
    ``duplicate_comments``, ``precision``, ``recall``, ``f1`` and ``score``.
    """
    clamped_score = max(0.0, min(score, 1.0))
    true_positives = len(matched_issue_ids)
    missed_issues = max(required_total - required_found, 0)

    # max(..., 1) guards the divisions when nothing was flagged / required.
    precision = true_positives / max(true_positives + false_positives, 1)
    recall = required_found / max(required_total, 1)
    f1 = 0.0
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)

    full_breakdown = {
        **breakdown,
        "duplicate_comments": float(duplicate_comments),
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "score": clamped_score,
    }
    return GradeResult(
        score=clamped_score,
        precision=precision,
        recall=recall,
        f1=f1,
        true_positives=true_positives,
        false_positives=false_positives,
        missed_issues=missed_issues,
        required_found=required_found,
        required_total=required_total,
        bonus_found=bonus_found,
        matched_issue_ids=matched_issue_ids,
        breakdown=full_breakdown,
    )