Spaces:
Sleeping
Sleeping
| """Deterministic graders for the vulnerability triage benchmark.""" | |
| from __future__ import annotations | |
| import re | |
| from typing import Dict, Iterable, List | |
| try: | |
| from ..models import TriageDraft | |
| from .cases import CASE_DEFINITIONS, CaseDefinition, get_case_definition | |
| except ImportError: | |
| from models import TriageDraft | |
| from server.cases import CASE_DEFINITIONS, CaseDefinition, get_case_definition | |
# Relative weight of each graded field in the overall score.
# The weights sum to 1.0, so the weighted total stays within [0, 1].
WEIGHTS: Dict[str, float] = {
    "validity": 0.20,
    "affected_package": 0.10,
    "affected_versions": 0.10,
    "severity": 0.20,
    "exploitability": 0.15,
    "next_action": 0.15,
    "missing_information": 0.10,
}
# Margin used to keep terminal scores strictly inside the open interval
# (0, 1) required by the downstream validator.
TERMINAL_SCORE_EPSILON = 0.0001
def normalize_text(value: str) -> str:
    """Lowercase *value* and collapse all runs of whitespace to single spaces."""
    tokens = value.strip().lower().split()
    return " ".join(tokens)
def normalize_list(values: Iterable[str]) -> List[str]:
    """Return the sorted, de-duplicated normalized forms of *values*.

    Entries that normalize to the empty string are dropped.

    The original implementation called :func:`normalize_text` twice per
    element (once in the filter, once for the set element); the walrus
    binding normalizes each value exactly once.
    """
    return sorted({text for value in values if (text := normalize_text(value))})
def set_similarity(actual: Iterable[str], expected: Iterable[str]) -> float:
    """Jaccard similarity between the normalized forms of two collections.

    Two empty collections count as a perfect match (1.0); exactly one
    empty collection scores 0.0.
    """
    left = set(normalize_list(actual))
    right = set(normalize_list(expected))
    if not left and not right:
        return 1.0
    if not left or not right:
        return 0.0
    overlap = left & right
    combined = left | right
    return len(overlap) / len(combined)
def field_match(actual: str, expected: str) -> float:
    """Binary score: 1.0 when the normalized strings are equal, else 0.0."""
    if normalize_text(actual) == normalize_text(expected):
        return 1.0
    return 0.0
def _normalize_version_range(value: str) -> str:
    """Canonicalize a version range string for flexible comparison.

    Two representations that are treated as equivalent:
    - A trivial lower bound ``>=0`` / ``>=0.0`` / ``>=0.0.0`` followed by a
      comma is stripped, so ``>=0,<0.1.5`` compares equal to ``<0.1.5``.
    - Semicolon-separated multi-branch segments are sorted so submission
      order does not matter.
    """
    cleaned: List[str] = []
    for branch in normalize_text(value).split(";"):
        branch = branch.strip()
        if not branch:
            continue
        # Drop a no-op lower bound (">=0,", ">=0.0,", ...) ahead of a comma.
        branch = re.sub(r">=\s*0(?:\.0)*\s*,\s*", "", branch)
        # Tighten whitespace around comparison operators.
        branch = re.sub(r"\s*([><=!]+)\s*", r"\1", branch).strip()
        if branch:
            cleaned.append(branch)
    return " ; ".join(sorted(cleaned))
def version_range_match(actual: str, expected: str) -> float:
    """Score 1.0 when version ranges are semantically equivalent."""
    equivalent = _normalize_version_range(actual) == _normalize_version_range(expected)
    return float(equivalent)
def normalize_terminal_score(score: float) -> float:
    """Clamp terminal task scores into the validator's required open interval.

    The result is rounded to 4 decimal places and nudged off the 0.0 / 1.0
    endpoints by ``TERMINAL_SCORE_EPSILON``.
    """
    clamped = max(0.0, min(1.0, float(score)))
    rounded = round(clamped, 4)
    if rounded <= 0.0:
        return TERMINAL_SCORE_EPSILON
    if rounded >= 1.0:
        return round(1.0 - TERMINAL_SCORE_EPSILON, 4)
    return rounded
def grade_case(case: CaseDefinition, draft: TriageDraft) -> Dict[str, float]:
    """Grade *draft* against the ground truth recorded on *case*.

    Returns a per-field score breakdown plus a weighted ``"total"``
    (rounded to 4 decimal places), with difficulty-based penalties
    applied to the total.
    """
    breakdown: Dict[str, float] = {
        "validity": field_match(draft.validity, case.truth.validity),
        "affected_package": field_match(
            draft.affected_package, case.truth.affected_package
        ),
        "affected_versions": version_range_match(
            draft.affected_versions, case.truth.affected_versions
        ),
        "severity": field_match(draft.severity, case.truth.severity),
        "exploitability": field_match(
            draft.exploitability, case.truth.exploitability
        ),
        "next_action": field_match(draft.next_action, case.truth.next_action),
        "missing_information": set_similarity(
            draft.missing_information, case.truth.missing_information
        ),
    }
    weighted = sum(breakdown[field] * weight for field, weight in WEIGHTS.items())
    breakdown["total"] = round(weighted, 4)

    # Cascading difficulty penalties.
    difficulty = getattr(case, "difficulty", "hard")
    hard_logic_failure = (
        difficulty == "hard"
        and case.truth.next_action == "request_info"
        and draft.next_action in {"publish_advisory", "patch"}
    )
    if hard_logic_failure:
        # HARD: the truth action is request_info (meaning a patch is missing);
        # publishing an advisory or patching anyway is a critical logic
        # failure, so the total is halved.
        breakdown["total"] = round(breakdown["total"] * 0.5, 4)
    elif (
        difficulty == "medium"
        and breakdown["severity"] == 0.0
        and breakdown["exploitability"] == 0.0
    ):
        # MEDIUM: these tasks test decoding raw intelligence (code diffs,
        # conflicting EPSS/honeypot numbers) against decoys; missing both
        # severity and exploitability fails the core challenge.
        breakdown["total"] = round(breakdown["total"] * 0.75, 4)
    return breakdown
def grade_task(task_id: str, draft: TriageDraft) -> Dict[str, float]:
    """Resolve the case definition for *task_id* and grade *draft* against it."""
    case = get_case_definition(task_id)
    return grade_case(case, draft)