File size: 4,541 Bytes
c8e832f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""Deterministic grading helpers for PR-review tasks."""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence, Set

try:
    from models import ReviewFinding, TaskGrade
    from server.task_bank import RubricIssue, TaskSpec
except ModuleNotFoundError:  # pragma: no cover
    from ..models import ReviewFinding, TaskGrade
    from .task_bank import RubricIssue, TaskSpec


# Amount score_task subtracts from the raw score per counted false positive.
FALSE_POSITIVE_PENALTY = 0.10
# Amount score_task subtracts from the raw score per counted duplicate finding.
DUPLICATE_PENALTY = 0.05


@dataclass(frozen=True)
class FindingMatch:
    """Immutable outcome of comparing a single finding with the rubric.

    The two fields together encode three outcomes: a rubric hit
    (``issue_id`` set), a repeated finding (``duplicate`` true), or neither.
    """

    # Identifier of the rubric issue that was hit, or None when nothing matched.
    issue_id: Optional[str]
    # True when the finding's fingerprint had already been seen.
    duplicate: bool = False


def finding_fingerprint(finding: ReviewFinding) -> str:
    """Return an order-insensitive token signature used to spot duplicates.

    Every field of the finding is flattened into one string, tokenized, and
    the unique tokens are sorted and joined with ``|``. A missing line number
    is represented as ``0``.
    """

    line_text = str(finding.line or 0)
    fields = (
        finding.file_path,
        line_text,
        finding.category,
        finding.severity,
        finding.title,
        finding.explanation,
        finding.suggested_fix,
    )
    unique_tokens = tokens(" ".join(fields))
    # Sorting makes the signature independent of set iteration order.
    return "|".join(sorted(unique_tokens))


def match_finding(
    finding: ReviewFinding,
    task: TaskSpec,
    matched_issue_ids: Set[str],
    seen_fingerprints: Set[str],
) -> FindingMatch:
    """Resolve a finding to a still-unmatched rubric issue, if any.

    Returns a ``FindingMatch`` flagged as duplicate when the finding's
    fingerprint is already in ``seen_fingerprints``. Otherwise returns the id
    of the first rubric issue (in rubric order) that is not in
    ``matched_issue_ids`` and that the finding matches, or ``issue_id=None``
    when there is no such issue. Neither set is modified here.
    """

    # Duplicates short-circuit before any rubric comparison.
    if finding_fingerprint(finding) in seen_fingerprints:
        return FindingMatch(issue_id=None, duplicate=True)

    first_hit = next(
        (
            candidate.issue_id
            for candidate in task.rubric_issues
            if candidate.issue_id not in matched_issue_ids
            and finding_matches_issue(finding, candidate)
        ),
        None,
    )
    return FindingMatch(issue_id=first_hit)


# Maximum distance, in lines, between a finding and the rubric issue's line.
_LINE_TOLERANCE = 2


def finding_matches_issue(finding: ReviewFinding, issue: RubricIssue) -> bool:
    """Return True when a finding deterministically matches a rubric issue.

    A match requires all of the following:

    * identical ``file_path``, ``category`` and ``severity``;
    * a finding line that is set and within ``_LINE_TOLERANCE`` lines of the
      issue line;
    * at least ``issue.min_keyword_hits`` rubric keywords present in the
      finding's title, explanation and suggested fix.

    Keywords are normalized with ``tokens`` before comparison, the same way
    the finding text is. A keyword counts as a hit when every one of its
    tokens occurs in the finding text, so keywords containing uppercase
    letters, hyphens or spaces (e.g. ``"SQL"``, ``"off-by-one"``) can match.
    Keywords that are already a single lowercase token behave exactly as
    before. A keyword that normalizes to no tokens never counts as a hit.
    """

    if finding.file_path != issue.file_path:
        return False
    if finding.category != issue.category:
        return False
    if finding.severity != issue.severity:
        return False
    if finding.line is None or abs(finding.line - issue.line) > _LINE_TOLERANCE:
        return False

    finding_tokens = tokens(
        " ".join([finding.title, finding.explanation, finding.suggested_fix])
    )
    keyword_hits = 0
    for keyword in issue.keywords:
        # Comparing the raw keyword against normalized tokens would silently
        # never match anything that is not already one lowercase token.
        keyword_tokens = tokens(keyword)
        if keyword_tokens and keyword_tokens <= finding_tokens:
            keyword_hits += 1
    return keyword_hits >= issue.min_keyword_hits


def score_task(
    task: TaskSpec,
    matched_issue_ids: Iterable[str],
    false_positives: int = 0,
    duplicate_findings: int = 0,
) -> TaskGrade:
    """Turn cumulative episode counters into a ``TaskGrade``.

    The score is the summed weight of the matched rubric issues, minus a
    fixed penalty per false positive and per duplicate finding, rounded to
    six decimals and clamped to the range [0.0, 1.0].
    """

    matched = set(matched_issue_ids)
    earned_weight = sum(
        rubric_issue.weight
        for rubric_issue in task.rubric_issues
        if rubric_issue.issue_id in matched
    )

    # Penalties are subtracted one after the other, left to right.
    penalized = (
        earned_weight
        - false_positives * FALSE_POSITIVE_PENALTY
        - duplicate_findings * DUPLICATE_PENALTY
    )
    capped = min(1.0, round(penalized, 6))
    final_score = max(0.0, capped)

    return TaskGrade(
        score=final_score,
        matched_issue_ids=sorted(matched),
        false_positives=false_positives,
        duplicate_findings=duplicate_findings,
        matched_weight=min(1.0, round(earned_weight, 6)),
    )


def grade_findings(task: TaskSpec, findings: Sequence[ReviewFinding]) -> TaskGrade:
    """Grade a complete batch of findings for one task in a single pass.

    Findings are processed in order. Each one is classified as a duplicate,
    a false positive, or a match for a not-yet-matched rubric issue, and the
    resulting tallies are handed to ``score_task``.
    """

    hit_ids: Set[str] = set()
    known_fingerprints: Set[str] = set()
    miss_count = 0
    repeat_count = 0

    for candidate in findings:
        outcome = match_finding(
            finding=candidate,
            task=task,
            matched_issue_ids=hit_ids,
            seen_fingerprints=known_fingerprints,
        )
        if outcome.duplicate:
            repeat_count += 1
            continue

        # Only non-duplicate findings are remembered for later comparisons.
        known_fingerprints.add(finding_fingerprint(candidate))
        if outcome.issue_id is not None:
            hit_ids.add(outcome.issue_id)
        else:
            miss_count += 1

    return score_task(
        task=task,
        matched_issue_ids=hit_ids,
        false_positives=miss_count,
        duplicate_findings=repeat_count,
    )


def tokens(text: str) -> Set[str]:
    """Split text into a set of lowercase comparison tokens.

    The text is lower-cased, then every maximal run of ASCII letters, digits
    and underscores becomes one token. Everything else acts as a separator.
    """

    lowered = text.lower()
    return {found.group(0) for found in re.finditer(r"[a-z0-9_]+", lowered)}