File size: 3,009 Bytes
6abc8c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be8eade
 
 
6abc8c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Multi-layer deterministic verifier for CyberSecurity_OWASP."""

from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any

try:
    from ..models import CyberSecurityOWASPAction, CyberSecurityOWASPState
    from ..validators import (
        patch_quality,
        run_hidden_regression_tests,
        run_hidden_security_tests,
        run_public_route_tests,
        run_visible_tests,
        verify_finding,
    )
    from .authz_oracle import AuthzOracle
except ImportError:  # pragma: no cover
    from models import CyberSecurityOWASPAction, CyberSecurityOWASPState
    from validators import (
        patch_quality,
        run_hidden_regression_tests,
        run_hidden_security_tests,
        run_public_route_tests,
        run_visible_tests,
        verify_finding,
    )
    from server.authz_oracle import AuthzOracle


@dataclass
class MultiLayerVerifier:
    """Aggregates visible, hidden, oracle, regression, and patch-quality checks."""

    oracle: AuthzOracle = AuthzOracle()

    def evaluate_action(
        self,
        state: CyberSecurityOWASPState,
        action: CyberSecurityOWASPAction,
        anti_cheat_flags: list[str] | None = None,
        *,
        invalid_action: bool = False,
    ) -> dict[str, Any]:
        verifier_result: dict[str, Any] = {
            "anti_cheat_flags": anti_cheat_flags or [],
            "invalid_action": invalid_action,
            "repeated_action": self._is_repeated_action(state, action),
        }
        if action.tool_name == "submit_diagnosis":
            verifier_result["diagnosis"] = verify_finding(state, action.arguments)
            verifier_result["finding"] = verifier_result["diagnosis"]
        elif action.tool_name == "run_visible_tests":
            verifier_result["visible"] = run_visible_tests(state)
        elif action.tool_name == "submit_fix":
            verifier_result.update(self.run_terminal_checks(state))
        return verifier_result

    def run_terminal_checks(self, state: CyberSecurityOWASPState) -> dict[str, Any]:
        security = run_hidden_security_tests(state)
        return {
            "visible": run_visible_tests(state),
            "hidden_tests": security,
            "security": security,
            "oracle_matrix": self.oracle.evaluate(state),
            "regression": run_hidden_regression_tests(state),
            "public_routes": run_public_route_tests(state),
            "patch_quality": patch_quality(state),
        }

    def public_summary(self, verifier_result: dict[str, Any]) -> dict[str, Any]:
        """Return verifier fields that are safe for state/debug summaries."""

        return json.loads(json.dumps(verifier_result))

    def _is_repeated_action(
        self, state: CyberSecurityOWASPState, action: CyberSecurityOWASPAction
    ) -> bool:
        current = {"tool_name": action.tool_name, "arguments": action.arguments}
        return sum(1 for item in state.action_history if item == current) > 1