Spaces:
Sleeping
Sleeping
| """Deterministic task graders for the code-review benchmark.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Dict, Iterable, List, Optional | |
| try: | |
| from ..models import ( | |
| ActionType, | |
| CodeReviewSnippet, | |
| GoldIssue, | |
| IssueType, | |
| ReviewComment, | |
| Severity, | |
| ) | |
| except ImportError: | |
| from models import ( # type: ignore | |
| ActionType, | |
| CodeReviewSnippet, | |
| GoldIssue, | |
| IssueType, | |
| ReviewComment, | |
| Severity, | |
| ) | |
| def _normalize_text(value: Optional[str]) -> str: | |
| return " ".join((value or "").lower().split()) | |
| def _keyword_match(text: str, keywords: Iterable[str]) -> bool: | |
| normalized = _normalize_text(text) | |
| return any(_normalize_text(keyword) in normalized for keyword in keywords if keyword) | |
| def _keyword_match_score(text: str, keywords: Iterable[str]) -> float: | |
| """ | |
| FIX: Returns partial score 0.0-1.0 based on how many keywords matched. | |
| Old code: binary match (any keyword → True/False). | |
| New code: count matches → partial credit even with 1 keyword hit. | |
| """ | |
| normalized = _normalize_text(text) | |
| kw_list = [k for k in keywords if k] | |
| if not kw_list: | |
| return 0.0 | |
| hits = sum(1 for kw in kw_list if _normalize_text(kw) in normalized) | |
| return hits / len(kw_list) | |
def _terminal_action(history: List[ReviewComment]) -> Optional[ActionType]:
    """Return the most recent terminal action (approve / request changes).

    Scans *history* from newest to oldest; returns ``None`` when the review
    has no terminal action yet (still in progress).
    """
    terminal = {ActionType.APPROVE, ActionType.REQUEST_CHANGES}
    for entry in reversed(history):
        if entry.action_type in terminal:
            return entry.action_type
    return None
@dataclass
class GradeResult:
    """Aggregate outcome of grading one review attempt.

    BUG FIX: the class declared annotated attributes but was missing the
    ``@dataclass`` decorator, so the keyword construction in ``_build_result``
    (``GradeResult(score=..., precision=..., ...)``) raised
    ``TypeError: object.__init__() takes exactly one argument``. ``dataclass``
    (already imported at the top of this module) generates the required
    keyword ``__init__``, plus ``__repr__``/``__eq__`` for free.
    """

    score: float                  # final clamped score in [0.0, 1.0]
    precision: float              # true_positives / all flagged comments
    recall: float                 # required_found / required_total
    f1: float                     # harmonic mean of precision and recall
    true_positives: int           # comments matched to at least one gold issue
    false_positives: int          # comments that matched no gold issue
    missed_issues: int            # required issues with no matching comment
    required_found: int           # required issues found at least once
    required_total: int           # total number of required gold issues
    bonus_found: int              # non-required issues found at least once
    matched_issue_ids: List[str]  # sorted IDs of all matched gold issues
    breakdown: Dict[str, float]   # per-component score contributions
def grade_review(
    task_id: str,
    snippet: CodeReviewSnippet,
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade a completed or in-progress review deterministically.

    Dispatches to the per-task grader: ``task_easy`` and ``task_medium`` get
    their dedicated graders; every other task id falls through to the hard
    (security) grader. Only ADD_COMMENT entries are treated as review
    comments; terminal actions stay in *history* for the graders to inspect.
    """
    comments = [
        entry for entry in history if entry.action_type == ActionType.ADD_COMMENT
    ]
    graders = {
        "task_easy": _grade_easy,
        "task_medium": _grade_medium,
    }
    grader = graders.get(task_id, _grade_hard)
    return grader(snippet, comments, history, duplicate_comments)
def _grade_easy(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the easy task: issue-type + line-proximity matching.

    Scoring model (all shares divided by the number of required issues):
      * Required issue, matching issue_type: 0.30 within ±1 line,
        0.15 within ±3, 0.08 within ±5.
      * Required issue, wrong issue_type but explanation-keyword hit:
        0.08 within ±3 lines.
      * Bonus (non-required) issue within ±3 lines: flat 0.05 for a type
        match or 0.02 for a keyword match; total bonus capped at 0.15.
      * Each comment matching no issue at all is a false positive
        (-0.03 each, capped at -0.15).
      * Terminal action on a must-reject snippet: +0.10 for
        REQUEST_CHANGES, -0.10 for APPROVE.
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    # Guard against division by zero when no issues are marked required.
    required_denominator = max(len(required_issues), 1)
    # FIX: Start credit at 0 for every issue
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        # A comment is "positive" once it earns credit against any issue.
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            # Comments without a line number cannot be matched to an issue.
            if comment.line_number is None:
                continue
            distance = abs(comment.line_number - issue.line)
            credit = 0.0
            if issue.required:
                # FIX: More generous distance tolerance + keyword fallback
                if comment.issue_type == issue.issue_type:
                    if distance <= 1:
                        credit = 0.30 / required_denominator
                    elif distance <= 3:
                        credit = 0.15 / required_denominator  # FIX: was 0.10
                    elif distance <= 5:
                        credit = 0.08 / required_denominator  # FIX: new tier
                elif _keyword_match(comment_text, getattr(issue, "explanation_keywords", [])):
                    # FIX: Wrong issue_type but comment mentions the bug → partial credit
                    if distance <= 3:
                        credit = 0.08 / required_denominator
            else:
                # Bonus issues
                if distance <= 3:
                    if comment.issue_type == issue.issue_type:
                        credit = 0.05
                    elif _keyword_match(comment_text, getattr(issue, "explanation_keywords", [])):
                        credit = 0.02  # FIX: small credit for keyword match
            if credit > 0:
                positive = True
                # Keep only the best credit earned per issue across comments.
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                matched_ids.add(issue.issue_id)
        if not positive:
            false_positives += 1
    required_score = sum(best_credit[issue.issue_id] for issue in required_issues)
    # Bonus contribution is capped so it cannot dominate the score.
    bonus_score = min(
        sum(
            best_credit[issue.issue_id]
            for issue in snippet.gold_issues
            if not issue.required
        ),
        0.15,
    )
    # FIX: Reduced false positive penalty — was 0.05 per FP, now 0.03
    # Prevents over-penalising agents that flag too many issues
    false_positive_penalty = min(false_positives * 0.03, 0.15)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.10
    raw_score = required_score + bonus_score - false_positive_penalty + action_adjustment
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=raw_score,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "required_score": required_score,
            "bonus_score": bonus_score,
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _grade_medium(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the medium task: logic-bug detection with keyword fallback.

    Scoring model (required shares divided by the required-issue count):
      * Both comment and issue typed LOGIC: 0.25 within ±1 line, 0.15 within
        ±3, 0.08 within ±8 (bonus issues: 0.05 / 0.03 / 0.02).
      * Otherwise, an explanation-keyword match: 0.10 within ±3, 0.05 within
        ±8 (bonus issues: 0.03 / 0.01).
      * A separate explanation bonus is scaled by the *fraction* of keywords
        the comment mentions (0.05 x fraction required, 0.02 x bonus).
      * False positives cost 0.05 each (capped at 0.25); a must-reject
        snippet gains +0.10 for REQUEST_CHANGES or loses 0.15 for APPROVE.
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    # Guard against division by zero when no issues are marked required.
    required_denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    explanation_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            # Comments without a line number cannot be matched to an issue.
            if comment.line_number is None:
                continue
            distance = abs(comment.line_number - issue.line)
            # FIX: Relaxed from distance <= 5 to distance <= 8
            if distance > 8:
                continue
            credit = 0.0
            keyword_match = _keyword_match(comment_text, issue.explanation_keywords)
            # FIX: Old code required BOTH issue_type match AND exact/near line.
            # New code: issue_type OR keyword match gives credit, distance tiers.
            if comment.issue_type == IssueType.LOGIC and issue.issue_type == IssueType.LOGIC:
                if distance <= 1:
                    # FIX: was "distance == 0" — now ±1 for full credit
                    credit = 0.25 / required_denominator if issue.required else 0.05
                elif distance <= 3:
                    credit = 0.15 / required_denominator if issue.required else 0.03  # FIX: was 0.10
                elif distance <= 8:
                    credit = 0.08 / required_denominator if issue.required else 0.02  # FIX: new tier
            elif keyword_match:
                # FIX: keyword match alone is worth more — was 0.05, now 0.10
                if distance <= 3:
                    credit = 0.10 / required_denominator if issue.required else 0.03
                elif distance <= 8:
                    credit = 0.05 / required_denominator if issue.required else 0.01
            if credit > 0:
                positive = True
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                matched_ids.add(issue.issue_id)
            # FIX: Use partial keyword score instead of binary
            # NOTE(review): this block runs for every issue within ±8 lines,
            # even when the credit block above did not match — mirrors the
            # loop-level keyword blocks in _grade_hard; confirm intended.
            kw_score = _keyword_match_score(comment_text, issue.explanation_keywords)
            if kw_score > 0:
                explanation_credit[issue.issue_id] = max(
                    explanation_credit[issue.issue_id],
                    # FIX: Scale explanation bonus by keyword match quality
                    (0.05 * kw_score) / required_denominator if issue.required else (0.02 * kw_score),
                )
        if not positive:
            false_positives += 1
    base_score = sum(best_credit.values()) + sum(explanation_credit.values())
    # FIX: Reduced FP penalty — was 0.08 per FP, now 0.05
    false_positive_penalty = min(false_positives * 0.05, 0.25)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.15
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=base_score - false_positive_penalty + action_adjustment,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "logic_score": sum(best_credit.values()),
            "explanation_score": sum(explanation_credit.values()),
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _grade_hard(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the hard task: security-issue detection with OWASP/fix bonuses.

    A comment is only considered against an issue when it lies within ±2
    lines of it. Credit components (required shares divided by the
    required-issue count):
      * Base: SECURITY-typed comment on a SECURITY issue — 0.20 at the exact
        line, 0.12 within ±1-2 (bonus issues: 0.05 / 0.03); keyword-only
        match earns 0.06 (bonus: 0.02).
      * OWASP bonus: comment mentions the issue's OWASP keywords/category
        (0.10 required / 0.02 bonus).
      * Fix bonus: comment mentions fix keywords (0.05 / 0.02).
      * Severity bonus: HIGH/CRITICAL comment matching (or exceeding HIGH
        with CRITICAL) earns 0.05; under-rating a CRITICAL issue as
        LOW/MEDIUM costs 0.05 — but only on issues already matched.
    Penalties: each unmatched required CRITICAL issue costs 0.15; false
    positives cost 0.07 each (capped at 0.35); APPROVE on a must-reject
    snippet costs 0.20 (REQUEST_CHANGES gains 0.10).
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    # Guard against division by zero when no issues are marked required.
    required_denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    owasp_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    fix_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    severity_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            # FIX: Was exact line match only (distance == 0).
            # Security vulns span multiple lines — now ±2 tolerance.
            if comment.line_number is None:
                continue
            distance = abs(comment.line_number - issue.line)
            if distance > 2:  # FIX: was `!= issue.line` (zero tolerance)
                continue
            credit = 0.0
            if comment.issue_type == IssueType.SECURITY and issue.issue_type == IssueType.SECURITY:
                if distance == 0:
                    credit = 0.20 / required_denominator if issue.required else 0.05
                else:
                    # FIX: ±1-2 lines gets partial credit (was zero)
                    credit = 0.12 / required_denominator if issue.required else 0.03
            # FIX: Even if issue_type is wrong, keyword match on SECURITY issue → small credit
            elif _keyword_match(comment_text, getattr(issue, "owasp_keywords", []) + getattr(issue, "fix_keywords", [])):
                if distance <= 2:
                    credit = 0.06 / required_denominator if issue.required else 0.02
            if credit > 0:
                positive = True
                matched_ids.add(issue.issue_id)
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
            # The OWASP / fix / severity bonuses below run for every comment
            # within ±2 lines, regardless of whether base credit was earned —
            # the explicit best_credit guard on the severity penalty depends
            # on this placement.
            owasp_kw = list(getattr(issue, "owasp_keywords", []))
            owasp_cat = [issue.owasp_category] if getattr(issue, "owasp_category", None) else []
            if _keyword_match(comment_text, owasp_kw + owasp_cat):
                owasp_credit[issue.issue_id] = max(
                    owasp_credit[issue.issue_id],
                    0.10 / required_denominator if issue.required else 0.02,
                )
            fix_kw = list(getattr(issue, "fix_keywords", []))
            if _keyword_match(comment_text, fix_kw):
                fix_credit[issue.issue_id] = max(
                    fix_credit[issue.issue_id],
                    0.05 / required_denominator if issue.required else 0.02,
                )
            if comment.severity in {Severity.HIGH, Severity.CRITICAL}:
                # Reward matching severity, or over-rating HIGH as CRITICAL.
                if comment.severity == issue.severity or (
                    issue.severity == Severity.HIGH and comment.severity == Severity.CRITICAL
                ):
                    severity_credit[issue.issue_id] = max(
                        severity_credit[issue.issue_id], 0.05 / required_denominator
                    )
            elif issue.severity == Severity.CRITICAL and comment.severity in {
                Severity.LOW,
                Severity.MEDIUM,
            }:
                # FIX: Only penalise if we actually matched (was applying even with no match)
                if best_credit[issue.issue_id] > 0:
                    severity_credit[issue.issue_id] = min(
                        severity_credit[issue.issue_id], -0.05 / required_denominator
                    )
        if not positive:
            false_positives += 1
    # Missing critical penalty
    missing_critical_penalty = 0.0
    for issue in required_issues:
        if issue.severity == Severity.CRITICAL and best_credit[issue.issue_id] == 0:
            missing_critical_penalty += 0.15
    # FIX: Reduced FP penalty for hard task — was 0.10, now 0.07
    # Hard tasks have many lines so innocent FPs should cost less
    false_positive_penalty = min(false_positives * 0.07, 0.35)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.20
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=(
            sum(best_credit.values())
            + sum(owasp_credit.values())
            + sum(fix_credit.values())
            + sum(severity_credit.values())
            - false_positive_penalty
            - missing_critical_penalty
            + action_adjustment
        ),
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "security_score": sum(best_credit.values()),
            "owasp_score": sum(owasp_credit.values()),
            "fix_score": sum(fix_credit.values()),
            "severity_score": sum(severity_credit.values()),
            "false_positive_penalty": -false_positive_penalty,
            "missing_critical_penalty": -missing_critical_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _build_result(
    *,
    score: float,
    matched_issue_ids: List[str],
    false_positives: int,
    required_found: int,
    required_total: int,
    bonus_found: int,
    duplicate_comments: int,
    breakdown: Dict[str, float],
) -> GradeResult:
    """Assemble a GradeResult: clamp the score and derive P/R/F1 metrics.

    The raw *score* is clamped to [0.0, 1.0]. Precision counts every matched
    issue as a true positive; recall is measured against required issues
    only. The metric values are also folded back into *breakdown* so a
    single dict carries the full grading picture.
    """
    final_score = max(0.0, min(score, 1.0))
    hits = len(matched_issue_ids)
    # max(..., 1) guards the zero-comment / zero-required edge cases.
    precision = hits / max(hits + false_positives, 1)
    recall = required_found / max(required_total, 1)
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    remaining = max(required_total - required_found, 0)
    enriched = dict(breakdown)
    enriched.update(
        duplicate_comments=float(duplicate_comments),
        precision=precision,
        recall=recall,
        f1=f1,
        score=final_score,
    )
    return GradeResult(
        score=final_score,
        precision=precision,
        recall=recall,
        f1=f1,
        true_positives=hits,
        false_positives=false_positives,
        missed_issues=remaining,
        required_found=required_found,
        required_total=required_total,
        bonus_found=bonus_found,
        matched_issue_ids=matched_issue_ids,
        breakdown=enriched,
    )