File size: 7,718 Bytes
be8eade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Configuration for scenario authoring, curriculum, and cache-backed reset."""

from __future__ import annotations

import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal


# Allowed values for ScenarioRuntimeConfig.cache_mode (validated in
# _validate_settings). NOTE(review): exact semantics of each mode are not
# visible in this module — confirm against the runtime that consumes it.
ScenarioCacheMode = Literal["fallback", "require", "disabled"]


# Bundled default config, resolved relative to this module's location.
DEFAULT_SCENARIO_CONFIG_PATH = (
    Path(__file__).resolve().parent / "configs" / "scenario_authoring.small.json"
)


@dataclass(frozen=True)
class ScenarioAuthorConfig:
    """Model and sampling settings for the scenario-authoring LLM."""

    provider: str = "huggingface"  # inference backend identifier
    model_id: str = "deepseek-ai/DeepSeek-V4-Pro"  # required; _validate_settings rejects ""
    thinking_mode: str = "thinking"  # NOTE(review): valid values not visible here — confirm
    reasoning_effort: str = "high"
    temperature: float = 1.0  # must be > 0 (enforced by _validate_settings)
    top_p: float = 1.0  # must be > 0 (enforced by _validate_settings)
    max_context_tokens: int = 131072  # must be >= 4096 (enforced by _validate_settings)


@dataclass(frozen=True)
class CurriculumCacheConfig:
    """Curriculum sizing, cache targets, and difficulty-calibration settings."""

    # Difficulty buckets and their display labels.
    difficulty_bucket_count: int = 4
    difficulty_labels: list[str] = field(
        default_factory=lambda: ["D0", "D1", "D2", "D3"]
    )
    # Required scenario counts per bucket, by dataset split.
    train_scenarios_per_bucket: int = 25
    validation_scenarios_per_bucket: int = 10
    heldout_eval_scenarios_per_bucket: int = 10
    # Operational targets for the cache-backed reset path.
    target_cache_hit_rate: float = 0.95
    target_reset_latency_ms: int = 200
    scenario_refresh_rate_per_epoch: float = 0.05
    difficulty_calibration_strategy: str = "baseline_agent_pass_rate"
    # Per-bucket (low, high) baseline pass-rate bands used for calibration.
    pass_rate_thresholds: dict[str, tuple[float, float]] = field(
        default_factory=lambda: {
            "D0": (0.8, 1.0),
            "D1": (0.6, 0.8),
            "D2": (0.4, 0.6),
            "D3": (0.2, 0.4),
        }
    )

    def minimum_for_split(self, split: str) -> int:
        """Return the per-bucket scenario minimum for *split*.

        Any split name other than "hidden_eval" or "validation" falls back
        to the training minimum.
        """
        per_split = {
            "hidden_eval": self.heldout_eval_scenarios_per_bucket,
            "validation": self.validation_scenarios_per_bucket,
        }
        return per_split.get(split, self.train_scenarios_per_bucket)


@dataclass(frozen=True)
class ScenarioRuntimeConfig:
    """Runtime cache behaviour and component-version pins for scenario reset."""

    cache_mode: ScenarioCacheMode = "fallback"  # validated against the Literal values
    cache_dir: str = "scenario_cache"  # directory for cached scenarios; relative path by default
    generator_version: str = "scenario_generator_v1"  # version tag for cache keying/compatibility
    verifier_version: str = "verifier_v1"


@dataclass(frozen=True)
class ScenarioAuthoringSettings:
    """Aggregate of all scenario-authoring configuration sections."""

    scenario_author: ScenarioAuthorConfig = field(default_factory=ScenarioAuthorConfig)
    curriculum: CurriculumCacheConfig = field(default_factory=CurriculumCacheConfig)
    runtime: ScenarioRuntimeConfig = field(default_factory=ScenarioRuntimeConfig)
    source_path: str = ""  # JSON file these settings were loaded from; "" when defaulted


def load_scenario_authoring_config(path: str | Path | None = None) -> ScenarioAuthoringSettings:
    """Load, override, and validate the scenario-authoring configuration.

    The config file is resolved in priority order: the explicit *path*
    argument, then the CYBERSECURITY_OWASP_SCENARIO_CONFIG environment
    variable, then the bundled default. Environment-variable overrides are
    applied on top of the file contents, and the resulting settings are
    validated before being returned.

    Raises:
        ValueError: when any loaded value fails validation.
    """
    env_path = os.getenv("CYBERSECURITY_OWASP_SCENARIO_CONFIG", "")
    configured_path = Path(path or env_path or DEFAULT_SCENARIO_CONFIG_PATH)

    file_raw = json.loads(configured_path.read_text(encoding="utf-8"))
    raw = _apply_env_overrides(file_raw)

    settings = ScenarioAuthoringSettings(
        scenario_author=ScenarioAuthorConfig(**raw.get("scenario_author", {})),
        curriculum=_curriculum_from_raw(raw.get("curriculum", {})),
        runtime=ScenarioRuntimeConfig(**raw.get("runtime", {})),
        source_path=str(configured_path),
    )
    _validate_settings(settings)
    return settings


def _apply_env_overrides(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *raw* with recognised environment overrides applied."""

    # JSON round-trip: a cheap deep copy for plain JSON-style config data.
    data = json.loads(json.dumps(raw))
    author = data.setdefault("scenario_author", {})
    curriculum = data.setdefault("curriculum", {})
    runtime = data.setdefault("runtime", {})

    # (section, key, environment variable, caster) — caster None keeps the raw string.
    override_table: tuple[tuple[dict[str, Any], str, str, type | None], ...] = (
        (author, "model_id", "CYBERSECURITY_OWASP_SCENARIO_AUTHOR_MODEL", None),
        (author, "provider", "CYBERSECURITY_OWASP_SCENARIO_AUTHOR_PROVIDER", None),
        (author, "thinking_mode", "CYBERSECURITY_OWASP_SCENARIO_THINKING_MODE", None),
        (author, "reasoning_effort", "CYBERSECURITY_OWASP_SCENARIO_REASONING_EFFORT", None),
        (author, "temperature", "CYBERSECURITY_OWASP_SCENARIO_TEMPERATURE", float),
        (author, "top_p", "CYBERSECURITY_OWASP_SCENARIO_TOP_P", float),
        (author, "max_context_tokens", "CYBERSECURITY_OWASP_SCENARIO_MAX_CONTEXT", int),
        (curriculum, "difficulty_bucket_count", "CYBERSECURITY_OWASP_DIFFICULTY_BUCKETS", int),
        (curriculum, "train_scenarios_per_bucket", "CYBERSECURITY_OWASP_TRAIN_SCENARIOS_PER_BUCKET", int),
        (curriculum, "validation_scenarios_per_bucket", "CYBERSECURITY_OWASP_VALIDATION_SCENARIOS_PER_BUCKET", int),
        (curriculum, "heldout_eval_scenarios_per_bucket", "CYBERSECURITY_OWASP_HELDOUT_SCENARIOS_PER_BUCKET", int),
        (curriculum, "target_cache_hit_rate", "CYBERSECURITY_OWASP_TARGET_CACHE_HIT_RATE", float),
        (curriculum, "target_reset_latency_ms", "CYBERSECURITY_OWASP_TARGET_RESET_LATENCY_MS", int),
        (curriculum, "scenario_refresh_rate_per_epoch", "CYBERSECURITY_OWASP_SCENARIO_REFRESH_RATE", float),
        (curriculum, "difficulty_calibration_strategy", "CYBERSECURITY_OWASP_DIFFICULTY_CALIBRATION", None),
        (runtime, "cache_dir", "CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", None),
        (runtime, "cache_mode", "CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", None),
        (runtime, "generator_version", "CYBERSECURITY_OWASP_SCENARIO_GENERATOR_VERSION", None),
        (runtime, "verifier_version", "CYBERSECURITY_OWASP_SCENARIO_VERIFIER_VERSION", None),
    )
    for section, key, env_name, caster in override_table:
        _set_if_present(section, key, env_name, caster)
    return data


def _set_if_present(
    target: dict[str, Any],
    key: str,
    env_name: str,
    caster: type | None = None,
) -> None:
    """Copy environment variable *env_name* into ``target[key]`` when it is set.

    An unset variable leaves *target* untouched. When *caster* is given
    (e.g. ``int`` or ``float``) the raw string is converted before assignment.
    """
    if (value := os.getenv(env_name)) is None:
        return
    target[key] = value if caster is None else caster(value)


def _curriculum_from_raw(raw: dict[str, Any]) -> CurriculumCacheConfig:
    """Build a CurriculumCacheConfig from a raw JSON mapping.

    Pads ``difficulty_labels`` with generated ``"D<i>"`` names up to the
    configured bucket count, and normalises ``pass_rate_thresholds`` into a
    ``str -> tuple[float, ...]`` mapping (JSON arrays arrive as lists).
    """
    values = dict(raw)

    bucket_count = int(values.get("difficulty_bucket_count", 4))
    labels = list(values.get("difficulty_labels") or [])
    while len(labels) < bucket_count:
        labels.append(f"D{len(labels)}")
    values["difficulty_labels"] = labels

    normalised: dict[str, tuple[float, ...]] = {}
    for name, bounds in (values.get("pass_rate_thresholds") or {}).items():
        normalised[str(name)] = tuple(float(bound) for bound in bounds)
    values["pass_rate_thresholds"] = normalised

    return CurriculumCacheConfig(**values)


def _validate_settings(settings: ScenarioAuthoringSettings) -> None:
    """Raise ``ValueError`` when any loaded setting is out of its allowed range.

    Checks run in a fixed order, so the first violated constraint determines
    the error message.
    """
    author = settings.scenario_author
    curriculum = settings.curriculum
    runtime = settings.runtime

    if not author.model_id:
        raise ValueError("scenario_author.model_id is required")
    # Both sampling knobs must be strictly positive.
    if min(author.temperature, author.top_p) <= 0.0:
        raise ValueError("scenario author sampling values must be positive")
    if author.max_context_tokens < 4096:
        raise ValueError("scenario author max_context_tokens is too small")
    if curriculum.difficulty_bucket_count <= 0:
        raise ValueError("difficulty_bucket_count must be positive")
    if len(curriculum.difficulty_labels) < curriculum.difficulty_bucket_count:
        raise ValueError("difficulty_labels must cover every configured bucket")
    positive_count_attrs = (
        "train_scenarios_per_bucket",
        "validation_scenarios_per_bucket",
        "heldout_eval_scenarios_per_bucket",
        "target_reset_latency_ms",
    )
    for name in positive_count_attrs:
        if int(getattr(curriculum, name)) <= 0:
            raise ValueError(f"{name} must be positive")
    hit_rate = curriculum.target_cache_hit_rate
    if hit_rate <= 0.0 or hit_rate > 1.0:
        raise ValueError("target_cache_hit_rate must be in (0, 1]")
    refresh = curriculum.scenario_refresh_rate_per_epoch
    if refresh < 0.0 or refresh > 1.0:
        raise ValueError("scenario_refresh_rate_per_epoch must be in [0, 1]")
    if runtime.cache_mode not in ("fallback", "require", "disabled"):
        raise ValueError("runtime.cache_mode must be fallback, require, or disabled")