File size: 8,388 Bytes
b3ee507
0e7f59c
 
b3ee507
 
0e7f59c
b3ee507
 
 
0e7f59c
b3ee507
 
2eada22
b3ee507
 
0e7f59c
 
b3ee507
 
 
 
 
 
be8eade
b3ee507
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b8ac6
 
 
 
b3ee507
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2eada22
 
 
 
 
 
 
 
 
 
632c145
 
2eada22
 
 
 
 
 
 
 
 
 
 
632c145
 
2eada22
 
 
 
 
632c145
 
2eada22
 
632c145
2eada22
632c145
0e7f59c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b8ac6
0e7f59c
 
 
 
 
f7b8ac6
0e7f59c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import json
import sys
import types

from CyberSecurity_OWASP.models import CyberSecurityOWASPAction
from CyberSecurity_OWASP.reward_config import load_reward_settings
from training.trackio_utils import (
    CANONICAL_TRACKIO_SIGNALS,
    DERIVED_TRACKIO_METRICS,
    REWARD_CONFIG_TABLE_COLUMNS,
    aggregate_episode_metrics,
    episode_record_from_state,
    episode_trace_fingerprint,
    episode_to_trace_row,
    episode_to_tracking_fields,
    log_reward_config,
    reward_config_scalar_metrics,
)

from .helpers import apply_secure_patch, make_env, secure_invoice_source, submit_valid_finding


def test_canonical_tracking_fields_exist_and_are_numeric_where_expected():
    """Check that a completed episode emits every tracked signal.

    The episode is driven through finding submission, the secure patch,
    the visible tests and ``submit_fix``. The resulting tracking fields
    must contain every canonical and derived key, a selection of rate
    fields must be plain floats, and the aggregated metrics must report
    1.0 for the hidden authz pass rate and for patch success.
    """
    assert len(CANONICAL_TRACKIO_SIGNALS) >= 57

    # Fields that are required to be ``float`` instances.
    float_valued_keys = (
        "reward/hidden_authz_pass_rate",
        "reward/normal_flow_pass_rate",
        "reward/public_hidden_gap",
        "reward/dense_to_terminal_ratio",
        "episode/time_to_first_patch",
        "episode/repeated_action_rate",
        "episode/patch_to_hidden_success_conversion_rate",
        "skill/exploit_to_patch_alignment",
    )

    env = make_env(70)
    try:
        submit_valid_finding(env)
        apply_secure_patch(env)
        env.step(CyberSecurityOWASPAction(tool_name="run_visible_tests"))
        submit_result = env.step(CyberSecurityOWASPAction(tool_name="submit_fix"))

        episode = episode_record_from_state(
            env.state,
            final_observation=submit_result.model_dump(),
        )
        tracked = episode_to_tracking_fields(episode)

        required_keys = [*CANONICAL_TRACKIO_SIGNALS, *DERIVED_TRACKIO_METRICS]
        for name in required_keys:
            assert name in tracked

        for name in float_valued_keys:
            assert isinstance(tracked[name], float)

        aggregated = aggregate_episode_metrics([episode])
        assert aggregated["reward/hidden_authz_pass_rate"] == 1.0
        assert aggregated["skill/patch_success"] == 1.0
    finally:
        env.close()


def test_tracking_cheat_score_flags_hardcoded_identifier():
    """Check that embedding the hidden owner invoice id in a patch is flagged.

    The patched file is the secure invoice source with a trailing comment
    that contains the hidden ``owner_invoice_id``. After ``submit_fix`` the
    tracking fields must set ``cheat/hardcoded_scenario_id`` to 1.0 and
    report a ``cheat/score`` of at least 50.0.
    """
    env = make_env(71)
    try:
        submit_valid_finding(env)

        hidden_facts = env.state.hidden_facts
        secure_source = secure_invoice_source(env)
        leak_comment = f"\n# hardcoded {hidden_facts['owner_invoice_id']}\n"
        tainted_source = secure_source + leak_comment

        patch_action = CyberSecurityOWASPAction(
            tool_name="patch_file",
            arguments={"path": "app/routes/invoices.py", "content": tainted_source},
        )
        env.step(patch_action)
        submit_result = env.step(CyberSecurityOWASPAction(tool_name="submit_fix"))

        episode = episode_record_from_state(
            env.state,
            final_observation=submit_result.model_dump(),
        )
        tracked = episode_to_tracking_fields(episode)

        assert tracked["cheat/hardcoded_scenario_id"] == 1.0
        assert tracked["cheat/score"] >= 50.0
    finally:
        env.close()


def test_trace_rows_redact_hidden_values_from_action_arguments():
    """Check that a serialized trace row contains no hidden scenario values.

    The hidden facts are snapshotted before any action is taken. After a
    full episode, the JSON dump of the trace row must not contain the
    string form of any non-empty hidden value for the listed keys.
    """
    sensitive_keys = (
        "owner_user_id",
        "intruder_user_id",
        "admin_user_id",
        "owner_invoice_id",
        "other_invoice_id",
        "foreign_invoice_id",
        "tenant_a",
        "tenant_b",
    )

    env = make_env(72)
    try:
        hidden_snapshot = dict(env.state.hidden_facts)
        submit_valid_finding(env)
        apply_secure_patch(env)
        env.step(CyberSecurityOWASPAction(tool_name="run_visible_tests"))
        submit_result = env.step(CyberSecurityOWASPAction(tool_name="submit_fix"))

        episode = episode_record_from_state(
            env.state,
            final_observation=submit_result.model_dump(),
        )
        trace_row = episode_to_trace_row(episode)
        serialized_row = json.dumps(trace_row, sort_keys=True)

        for name in sensitive_keys:
            secret = str(hidden_snapshot.get(name, ""))
            # Missing or empty hidden values have nothing to leak; skip them.
            if secret:
                assert secret not in serialized_row
    finally:
        env.close()


def test_trace_fingerprint_ignores_episode_id_but_tracks_action_changes():
    """Pin which record fields influence ``episode_trace_fingerprint``.

    Changing ``episode_id`` or adding a ``reward_total`` key must leave the
    fingerprint unchanged. Appending an action or changing ``scenario_hash``
    must change it.
    """
    read_action = {
        "tool_name": "read_file",
        "arguments": {"path": "app/routes/invoices.py"},
    }
    base_record = {
        "episode_id": "episode-a",
        "task_id": "task-1",
        "scenario/seed": 123,
        "scenario/split": "train",
        "scenario/difficulty": 0,
        "scenario/bug_type": "bola_idor",
        "scenario/template_id": "fastapi_basic",
        "scenario_hash": "scenario-a",
        "action_history": [read_action],
        "observation_history": [{"last_action_valid": True}],
        "reward_breakdown": {"total": 0.0},
    }

    # Variants are shallow copies of the base record with one field overridden or added.
    renamed_episode = {**base_record, "episode_id": "episode-b"}
    extra_reward_field = {**base_record, "reward_total": -0.25}
    extended_actions = {
        **base_record,
        "action_history": [
            *base_record["action_history"],
            {"tool_name": "submit_fix", "arguments": {}},
        ],
    }
    other_scenario = {**base_record, "scenario_hash": "scenario-b"}

    fingerprint = episode_trace_fingerprint

    assert fingerprint(base_record) == fingerprint(renamed_episode)
    assert fingerprint(base_record) == fingerprint(extra_reward_field)
    assert fingerprint(base_record) != fingerprint(extended_actions)
    assert fingerprint(base_record) != fingerprint(other_scenario)


def test_log_reward_config_emits_scalar_values_and_table(monkeypatch):
    """Check what ``log_reward_config`` writes to a stubbed ``trackio`` module.

    A fake ``trackio`` (config dict, ``Table`` class, ``log`` function) is
    installed in ``sys.modules`` and the reward mode, stage and variant are
    set through environment variables. The test then checks the config
    entries, the scalar payload and the table payload. It also checks that
    the combined logged text does not mention the hidden invoice id keys.
    """
    captured: list[tuple[dict, int | None]] = []

    class FakeTable:
        """Minimal stand-in that records its constructor arguments."""

        def __init__(self, *, columns, data=None, rows=None, allow_mixed_types=False):
            # Accept either keyword for the row payload; ``data`` wins when given.
            records = rows if data is None else data
            self.columns = columns
            self.rows = records
            self.data = records
            self.allow_mixed_types = allow_mixed_types

    def fake_log(payload, step=None):
        captured.append((payload, step))

    fake_trackio = types.SimpleNamespace(config={}, Table=FakeTable, log=fake_log)
    monkeypatch.setitem(sys.modules, "trackio", fake_trackio)
    for var_name, var_value in (
        ("CYBERSECURITY_OWASP_REWARD_MODE", "dense_train"),
        ("CYBERSECURITY_OWASP_REWARD_STAGE", "early"),
        ("CYBERSECURITY_OWASP_REWARD_VARIANT", "abl-test"),
    ):
        monkeypatch.setenv(var_name, var_value)

    settings = load_reward_settings()
    summary = log_reward_config(settings, step=0)

    config = fake_trackio.config
    assert config["reward_config_hash"] == summary["reward_config_hash"]
    assert config["reward_variant"] == "abl-test"
    assert config["reward_config_values"]["policy_inspected"]["value"] == 0.30
    assert config["reward_config__policy_inspected__value"] == 0.30

    # The scalar payload is the first logged dict carrying the policy_inspected metric.
    scalar_payload = next(
        entry
        for entry, _step in captured
        if "reward_config/policy_inspected/value" in entry
    )
    expected_scalars = {
        "reward_config/policy_inspected/value": 0.30,
        "reward_config/shaping_weight/resolved": 1.0,
        "reward_config/invalid_action/value": -0.20,
        "reward_config/progressive_cap/value": 5.0,
        "reward_config/oversized_patch/severe_value": -1.0,
    }
    for metric_name, expected_value in expected_scalars.items():
        assert scalar_payload[metric_name] == expected_value

    # The table payload is the first logged dict with a "reward_config" entry.
    table = next(
        entry["reward_config"]
        for entry, _step in captured
        if "reward_config" in entry
    )
    assert table.columns == list(REWARD_CONFIG_TABLE_COLUMNS)
    assert table.allow_mixed_types is True

    rows_by_name = {row[0]: row for row in table.rows}
    assert rows_by_name["policy_inspected"][1] == 0.30
    assert rows_by_name["shaping_weight"][2] == 1.0
    assert rows_by_name["hidden_file_probe"][6] is True

    logged_text = json.dumps(
        {
            "summary": summary,
            "scalar_payload": scalar_payload,
            "table_rows": table.rows,
        },
        sort_keys=True,
        default=str,
    )
    for forbidden_key in ("owner_invoice_id", "foreign_invoice_id"):
        assert forbidden_key not in logged_text


def test_reward_config_scalar_metrics_uses_stage_resolved_values(monkeypatch):
    """Check the scalar metrics produced with mode ``dense_train`` and stage ``late``.

    The mode and stage are set through environment variables before the
    settings are loaded. The expected shaping weight, step penalty and
    token target are then compared exactly.
    """
    monkeypatch.setenv("CYBERSECURITY_OWASP_REWARD_MODE", "dense_train")
    monkeypatch.setenv("CYBERSECURITY_OWASP_REWARD_STAGE", "late")

    settings = load_reward_settings()
    scalars = reward_config_scalar_metrics(settings)

    expected_scalars = {
        "reward_config/shaping_weight/resolved": 0.4,
        "reward_config/shaping_weight/stage_value": 0.4,
        "reward_config/step_penalty/stage_value": -0.02,
        "reward_config/token_penalty/target_tokens": 350.0,
    }
    for metric_name, expected_value in expected_scalars.items():
        assert scalars[metric_name] == expected_value