File size: 4,545 Bytes
f577d1f
 
 
 
 
 
 
 
 
 
 
 
 
b44d7b0
f577d1f
 
9737348
 
f577d1f
 
 
 
 
9737348
 
 
 
 
 
f577d1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9737348
f577d1f
 
9737348
f577d1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b44d7b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f577d1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Grader for the Deceit environment.

Two-stage design:
  Stage 1 — exact match (normalized): handles ~80% of cases, zero cost.
  Stage 2 — GPT-4o-mini semantic match: only when exact fails, results cached.
"""

from __future__ import annotations

import hashlib
import json
import re
import pathlib
import time
from dataclasses import dataclass

import os

try:
    from openai import OpenAI
except ImportError:
    OpenAI = None  # type: ignore[assignment,misc]

def _default_cache_path() -> pathlib.Path:
    """Use DECEIT_GRADER_CACHE env var, falling back to /tmp."""
    env_path = os.environ.get("DECEIT_GRADER_CACHE")
    if env_path:
        return pathlib.Path(env_path)
    return pathlib.Path("/tmp/deceit_grader_cache.json")


@dataclass
class GraderResult:
    correct: bool
    method: str  # "exact" | "semantic" | "abstain"
    explanation: str


def _normalize(text: str) -> str:
    text = text.lower().strip()
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text


class Grader:
    """Checks agent answers against ground truth with caching."""

    def __init__(
        self,
        cache_path: str | pathlib.Path | None = None,
        openai_api_key: str | None = None,
    ) -> None:
        self._cache_path = pathlib.Path(cache_path) if cache_path is not None else _default_cache_path()
        self._openai_api_key = openai_api_key
        self._cache: dict[str, bool] = {}
        if self._cache_path.exists():
            try:
                self._cache = json.loads(self._cache_path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, OSError):
                self._cache = {}

    def check(self, answer: str, ground_truth: str) -> GraderResult:
        """Grade answer against ground_truth. Returns GraderResult."""
        if not answer:
            return GraderResult(correct=False, method="exact", explanation="empty answer")

        if _normalize(answer) == _normalize(ground_truth):
            return GraderResult(correct=True, method="exact", explanation="normalized exact match")

        return self._semantic_check(answer, ground_truth)

    def _semantic_check(self, answer: str, ground_truth: str) -> GraderResult:
        cache_key = hashlib.sha256(f"{answer}|{ground_truth}".encode()).hexdigest()
        if cache_key in self._cache:
            correct = self._cache[cache_key]
            return GraderResult(
                correct=correct,
                method="semantic",
                explanation="cached semantic match" if correct else "cached semantic mismatch",
            )

        if not self._openai_api_key:
            raise RuntimeError(
                "Semantic match required but no OpenAI API key configured. "
                "Pass openai_api_key to Grader() or set OPENAI_API_KEY env var."
            )

        if OpenAI is None:
            raise RuntimeError("openai package is not installed. Run: pip install openai")

        client = OpenAI(api_key=self._openai_api_key)
        prompt = (
            f"Is '{answer}' semantically equivalent to '{ground_truth}'? "
            "Reply YES or NO only."
        )

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=5,
                    temperature=0,
                )
                break
            except Exception as e:
                if "429" in str(e) or "RateLimitError" in type(e).__name__:
                    print(f"[grader] Rate limit hit (attempt {attempt + 1}/{max_retries}), waiting 25s...")
                    time.sleep(25)
                    if attempt == max_retries - 1:
                        raise
                else:
                    raise

        verdict = response.choices[0].message.content.strip().upper()
        correct = verdict.startswith("YES")

        self._cache[cache_key] = correct
        self._save_cache()

        return GraderResult(
            correct=correct,
            method="semantic",
            explanation="semantic match" if correct else "semantic mismatch",
        )

    def _save_cache(self) -> None:
        self._cache_path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._cache_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(self._cache, indent=2), encoding="utf-8")
        tmp.replace(self._cache_path)