# python_env/server/grading.py
"""Deterministic task graders for the code-review benchmark."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
try:
from ..models import (
ActionType,
CodeReviewSnippet,
GoldIssue,
IssueType,
ReviewComment,
Severity,
)
except ImportError:
from models import ( # type: ignore
ActionType,
CodeReviewSnippet,
GoldIssue,
IssueType,
ReviewComment,
Severity,
)
def _normalize_text(value: Optional[str]) -> str:
return " ".join((value or "").lower().split())
def _keyword_match(text: str, keywords: Iterable[str]) -> bool:
normalized = _normalize_text(text)
return any(_normalize_text(keyword) in normalized for keyword in keywords if keyword)
def _keyword_match_score(text: str, keywords: Iterable[str]) -> float:
"""
FIX: Returns partial score 0.0-1.0 based on how many keywords matched.
Old code: binary match (any keyword → True/False).
New code: count matches → partial credit even with 1 keyword hit.
"""
normalized = _normalize_text(text)
kw_list = [k for k in keywords if k]
if not kw_list:
return 0.0
hits = sum(1 for kw in kw_list if _normalize_text(kw) in normalized)
return hits / len(kw_list)
def _terminal_action(history: List[ReviewComment]) -> Optional[ActionType]:
    """Return the most recent terminal action in *history*, or ``None``.

    Terminal actions are APPROVE and REQUEST_CHANGES; the history is scanned
    from newest to oldest so the latest decision wins.
    """
    terminal = (ActionType.APPROVE, ActionType.REQUEST_CHANGES)
    for entry in history[::-1]:
        if entry.action_type in terminal:
            return entry.action_type
    return None
@dataclass
class GradeResult:
    """Outcome of grading one review: final score, aggregate metrics, and a per-component breakdown."""

    score: float  # final score clamped to [0.0, 1.0]
    precision: float  # true_positives / max(true_positives + false_positives, 1)
    recall: float  # required_found / max(required_total, 1)
    f1: float  # harmonic mean of precision and recall (0.0 when both are 0)
    true_positives: int  # number of distinct gold issues matched by at least one comment
    false_positives: int  # comments that earned no credit against any gold issue
    missed_issues: int  # required issues with no matching comment
    required_found: int  # required gold issues that earned any credit
    required_total: int  # total count of required gold issues
    bonus_found: int  # non-required (bonus) gold issues that earned any credit
    matched_issue_ids: List[str]  # sorted ids of all matched gold issues
    breakdown: Dict[str, float]  # per-component score contributions plus the metrics above
def grade_review(
    task_id: str,
    snippet: CodeReviewSnippet,
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade a completed or in-progress review deterministically.

    Only ADD_COMMENT entries are scored as comments; the full history is
    still passed through so graders can inspect the terminal action.
    Unknown task ids fall back to the hard grader.
    """
    comments = [entry for entry in history if entry.action_type == ActionType.ADD_COMMENT]
    grader = {
        "task_easy": _grade_easy,
        "task_medium": _grade_medium,
    }.get(task_id, _grade_hard)
    return grader(snippet, comments, history, duplicate_comments)
def _grade_easy(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the easy task: distance-tiered credit per issue, capped bonuses.

    Scoring model:
      * Each gold issue keeps the best credit any single comment earned for it.
      * Required issues: up to ``0.30 / len(required)`` for a matching issue
        type within 1 line, with lower tiers at distance <=3 and <=5, and a
        small keyword-based fallback (<=3 lines) when the issue type is wrong.
      * Bonus (non-required) issues: flat 0.05 / 0.02 credits within 3 lines,
        capped at 0.15 total.
      * Every comment that earns no credit is a false positive: -0.03 each,
        capped at -0.15.
      * When the snippet must be rejected, the terminal action adjusts the
        score by +0.10 (REQUEST_CHANGES) or -0.10 (APPROVE).
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    required_denominator = max(len(required_issues), 1)
    # Best credit earned per issue across all comments; starts at zero.
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        # A comment without a line anchor can never match any gold issue, so
        # count it as a false positive immediately.  (The original re-tested
        # this loop-invariant condition once per gold issue.)
        if comment.line_number is None:
            false_positives += 1
            continue
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            distance = abs(comment.line_number - issue.line)
            credit = 0.0
            if issue.required:
                if comment.issue_type == issue.issue_type:
                    # Distance-tiered credit: closer hits earn more.
                    if distance <= 1:
                        credit = 0.30 / required_denominator
                    elif distance <= 3:
                        credit = 0.15 / required_denominator
                    elif distance <= 5:
                        credit = 0.08 / required_denominator
                elif _keyword_match(comment_text, getattr(issue, "explanation_keywords", [])):
                    # Wrong issue type, but the text mentions the bug: partial credit.
                    if distance <= 3:
                        credit = 0.08 / required_denominator
            elif distance <= 3:
                # Bonus (non-required) issues: small flat credits.
                if comment.issue_type == issue.issue_type:
                    credit = 0.05
                elif _keyword_match(comment_text, getattr(issue, "explanation_keywords", [])):
                    credit = 0.02
            if credit > 0:
                positive = True
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                matched_ids.add(issue.issue_id)
        if not positive:
            false_positives += 1
    required_score = sum(best_credit[issue.issue_id] for issue in required_issues)
    # Bonus credit is capped so it cannot dominate the required score.
    bonus_score = min(
        sum(
            best_credit[issue.issue_id]
            for issue in snippet.gold_issues
            if not issue.required
        ),
        0.15,
    )
    # Deliberately mild penalty: 0.03 per unmatched comment, capped at 0.15.
    false_positive_penalty = min(false_positives * 0.03, 0.15)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.10
    raw_score = required_score + bonus_score - false_positive_penalty + action_adjustment
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=raw_score,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "required_score": required_score,
            "bonus_score": bonus_score,
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _grade_medium(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the medium task: logic-issue detection plus scaled explanation bonus.

    Only comments within 8 lines of a gold issue are considered.  A comment
    earns location credit either because both it and the issue are LOGIC
    issues (tiers 0.25 / 0.15 / 0.08 per required issue by distance) or,
    failing that, because its text matches the issue's explanation keywords
    (tiers 0.10 / 0.05).  Independently, an explanation bonus scaled by the
    fraction of matched keywords accrues per issue.  Unmatched comments cost
    0.05 each (capped at 0.25); the terminal action adjusts the score by
    +0.10 / -0.15 when the snippet must be rejected.
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    required_denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    explanation_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        # No line anchor -> can never match; count as a false positive up
        # front.  (The original re-tested this loop-invariant check per issue.)
        if comment.line_number is None:
            false_positives += 1
            continue
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            distance = abs(comment.line_number - issue.line)
            if distance > 8:
                continue
            credit = 0.0
            keyword_match = _keyword_match(comment_text, issue.explanation_keywords)
            if comment.issue_type == IssueType.LOGIC and issue.issue_type == IssueType.LOGIC:
                if distance <= 1:
                    credit = 0.25 / required_denominator if issue.required else 0.05
                elif distance <= 3:
                    credit = 0.15 / required_denominator if issue.required else 0.03
                else:
                    # distance is guaranteed <= 8 by the guard above; the
                    # original re-tested it redundantly.
                    credit = 0.08 / required_denominator if issue.required else 0.02
            elif keyword_match:
                if distance <= 3:
                    credit = 0.10 / required_denominator if issue.required else 0.03
                else:
                    # Guard above guarantees distance <= 8 here as well.
                    credit = 0.05 / required_denominator if issue.required else 0.01
            if credit > 0:
                positive = True
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
                matched_ids.add(issue.issue_id)
            # Explanation bonus scales with the fraction of keywords mentioned.
            kw_score = _keyword_match_score(comment_text, issue.explanation_keywords)
            if kw_score > 0:
                explanation_credit[issue.issue_id] = max(
                    explanation_credit[issue.issue_id],
                    (0.05 * kw_score) / required_denominator if issue.required else (0.02 * kw_score),
                )
        if not positive:
            false_positives += 1
    base_score = sum(best_credit.values()) + sum(explanation_credit.values())
    # 0.05 per unmatched comment, capped at 0.25.
    false_positive_penalty = min(false_positives * 0.05, 0.25)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.15
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=base_score - false_positive_penalty + action_adjustment,
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "logic_score": sum(best_credit.values()),
            "explanation_score": sum(explanation_credit.values()),
            "false_positive_penalty": -false_positive_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _grade_hard(
    snippet: CodeReviewSnippet,
    comments: List[ReviewComment],
    history: List[ReviewComment],
    duplicate_comments: int,
) -> GradeResult:
    """Grade the hard (security) task.

    Four independent credit channels accumulate per gold issue, each keeping
    the best value any single comment earned:
      * ``best_credit``     - location/type credit, only within +/-2 lines
      * ``owasp_credit``    - comment text mentions the issue's OWASP
        keywords or category
      * ``fix_credit``      - comment text mentions the expected fix keywords
      * ``severity_credit`` - correct HIGH/CRITICAL severity label; can go
        negative when a matched CRITICAL issue is under-rated LOW/MEDIUM
    Penalties: each comment that earns no location credit costs 0.07
    (capped at 0.35), and every missed required CRITICAL issue costs a flat
    0.15.  The terminal action adjusts the score by +0.10 / -0.20 when the
    snippet must be rejected.
    """
    required_issues = [issue for issue in snippet.gold_issues if issue.required]
    required_denominator = max(len(required_issues), 1)
    best_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    owasp_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    fix_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    severity_credit: Dict[str, float] = {issue.issue_id: 0.0 for issue in snippet.gold_issues}
    matched_ids: set[str] = set()
    false_positives = 0
    for comment in comments:
        positive = False
        comment_text = f"{comment.comment or ''} {comment.suggestion or ''}"
        for issue in snippet.gold_issues:
            # Security vulnerabilities often span a few lines, so a +/-2 line
            # tolerance is allowed (an earlier version required exact lines).
            if comment.line_number is None:
                continue
            distance = abs(comment.line_number - issue.line)
            if distance > 2:
                continue
            credit = 0.0
            # Location credit requires both sides to be SECURITY issues;
            # exact line hits earn the full tier, +/-1-2 a reduced one.
            if comment.issue_type == IssueType.SECURITY and issue.issue_type == IssueType.SECURITY:
                if distance == 0:
                    credit = 0.20 / required_denominator if issue.required else 0.05
                else:
                    credit = 0.12 / required_denominator if issue.required else 0.03
            # Wrong issue type, but the text mentions the vulnerability or
            # its fix: small consolation credit within the same tolerance.
            elif _keyword_match(comment_text, getattr(issue, "owasp_keywords", []) + getattr(issue, "fix_keywords", [])):
                if distance <= 2:
                    credit = 0.06 / required_denominator if issue.required else 0.02
            if credit > 0:
                positive = True
                matched_ids.add(issue.issue_id)
                best_credit[issue.issue_id] = max(best_credit[issue.issue_id], credit)
            # OWASP bonus: naming the category or any of its keywords counts.
            owasp_kw = list(getattr(issue, "owasp_keywords", []))
            owasp_cat = [issue.owasp_category] if getattr(issue, "owasp_category", None) else []
            if _keyword_match(comment_text, owasp_kw + owasp_cat):
                owasp_credit[issue.issue_id] = max(
                    owasp_credit[issue.issue_id],
                    0.10 / required_denominator if issue.required else 0.02,
                )
            # Fix-suggestion bonus: the comment proposes the expected remediation.
            fix_kw = list(getattr(issue, "fix_keywords", []))
            if _keyword_match(comment_text, fix_kw):
                fix_credit[issue.issue_id] = max(
                    fix_credit[issue.issue_id],
                    0.05 / required_denominator if issue.required else 0.02,
                )
            # Severity: reward exact HIGH/CRITICAL matches (or over-rating a
            # HIGH issue as CRITICAL) ...
            if comment.severity in {Severity.HIGH, Severity.CRITICAL}:
                if comment.severity == issue.severity or (
                    issue.severity == Severity.HIGH and comment.severity == Severity.CRITICAL
                ):
                    severity_credit[issue.issue_id] = max(
                        severity_credit[issue.issue_id], 0.05 / required_denominator
                    )
            # ... and penalise under-rating a CRITICAL issue as LOW/MEDIUM.
            elif issue.severity == Severity.CRITICAL and comment.severity in {
                Severity.LOW,
                Severity.MEDIUM,
            }:
                # Only penalise issues that actually earned some location
                # credit; an unnoticed issue should not be double-punished.
                if best_credit[issue.issue_id] > 0:
                    severity_credit[issue.issue_id] = min(
                        severity_credit[issue.issue_id], -0.05 / required_denominator
                    )
        if not positive:
            false_positives += 1
    # Flat 0.15 penalty per required CRITICAL issue that earned no credit.
    missing_critical_penalty = 0.0
    for issue in required_issues:
        if issue.severity == Severity.CRITICAL and best_credit[issue.issue_id] == 0:
            missing_critical_penalty += 0.15
    # 0.07 per unmatched comment, capped at 0.35 (hard snippets are long, so
    # stray comments are penalised less than a naive per-comment rate would).
    false_positive_penalty = min(false_positives * 0.07, 0.35)
    final_action = _terminal_action(history)
    action_adjustment = 0.0
    if snippet.must_reject and final_action == ActionType.REQUEST_CHANGES:
        action_adjustment = 0.10
    elif snippet.must_reject and final_action == ActionType.APPROVE:
        action_adjustment = -0.20
    required_found = sum(1 for issue in required_issues if best_credit[issue.issue_id] > 0)
    bonus_found = sum(
        1
        for issue in snippet.gold_issues
        if not issue.required and best_credit[issue.issue_id] > 0
    )
    return _build_result(
        score=(
            sum(best_credit.values())
            + sum(owasp_credit.values())
            + sum(fix_credit.values())
            + sum(severity_credit.values())
            - false_positive_penalty
            - missing_critical_penalty
            + action_adjustment
        ),
        matched_issue_ids=sorted(matched_ids),
        false_positives=false_positives,
        required_found=required_found,
        required_total=len(required_issues),
        bonus_found=bonus_found,
        duplicate_comments=duplicate_comments,
        breakdown={
            "security_score": sum(best_credit.values()),
            "owasp_score": sum(owasp_credit.values()),
            "fix_score": sum(fix_credit.values()),
            "severity_score": sum(severity_credit.values()),
            "false_positive_penalty": -false_positive_penalty,
            "missing_critical_penalty": -missing_critical_penalty,
            "action_adjustment": action_adjustment,
        },
    )
def _build_result(
    *,
    score: float,
    matched_issue_ids: List[str],
    false_positives: int,
    required_found: int,
    required_total: int,
    bonus_found: int,
    duplicate_comments: int,
    breakdown: Dict[str, float],
) -> GradeResult:
    """Assemble the final :class:`GradeResult`.

    Clamps the raw score to [0.0, 1.0], derives precision / recall / F1
    from the match counts, and returns a breakdown extended with those
    metrics.  The caller's breakdown dict is not mutated.
    """
    clamped_score = max(0.0, min(score, 1.0))
    true_positives = len(matched_issue_ids)
    missed_issues = max(required_total - required_found, 0)
    # Denominators are floored at 1 so empty reviews yield 0.0, not an error.
    precision = true_positives / max(true_positives + false_positives, 1)
    recall = required_found / max(required_total, 1)
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    enriched = dict(breakdown)
    enriched["duplicate_comments"] = float(duplicate_comments)
    enriched["precision"] = precision
    enriched["recall"] = recall
    enriched["f1"] = f1
    enriched["score"] = clamped_score
    return GradeResult(
        score=clamped_score,
        precision=precision,
        recall=recall,
        f1=f1,
        true_positives=true_positives,
        false_positives=false_positives,
        missed_issues=missed_issues,
        required_found=required_found,
        required_total=required_total,
        bonus_found=bonus_found,
        matched_issue_ids=matched_issue_ids,
        breakdown=enriched,
    )