File size: 3,232 Bytes
6abc8c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be8eade
 
 
 
 
 
 
6abc8c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Episode artifact logging for training, debugging, and demos."""

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

try:
    from ..models import CyberSecurityOWASPState
except ImportError:  # pragma: no cover
    from models import CyberSecurityOWASPState


class EpisodeArtifactLogger:
    """Appends compact JSONL episode transcripts under outputs/rollouts."""

    def __init__(self, output_path: str | Path | None = None):
        configured = output_path or os.getenv("CYBERSECURITY_OWASP_EPISODE_LOG")
        self.output_path = Path(configured) if configured else Path("outputs/rollouts/episodes.jsonl")

    def log_episode(
        self,
        state: CyberSecurityOWASPState,
        *,
        final_observation: dict[str, Any] | None = None,
    ) -> Path:
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        record = {
            "episode_id": state.episode_id,
            "task_id": state.task_id,
            "seed": state.seed,
            "split": state.split,
            "difficulty": state.difficulty,
            "difficulty_tier": state.difficulty_tier,
            "template_id": state.template_id,
            "scenario_family": state.scenario_family,
            "domain": state.domain,
            "bug_family": state.bug_family,
            "target_weakness": state.target_weakness,
            "agent_actions": state.action_history,
            "observations": state.observation_history,
            "final_observation": final_observation or {},
            "patch_diff": state.patch_diff,
            "visible_test_result": self._verifier_layer(state, "visible"),
            "hidden_test_result": self._verifier_layer(state, "hidden_tests"),
            "oracle_result": self._verifier_layer(state, "oracle_matrix"),
            "regression_result": self._verifier_layer(state, "regression"),
            "reward_breakdown": state.reward_history[-1] if state.reward_history else {},
            "reward_breakdown_by_step": state.reward_history,
            "total_reward": state.accumulated_reward,
            "final_reward_breakdown": state.reward_history[-1] if state.reward_history else {},
            "progress_reward_total": state.progress_reward_total,
            "completion_tokens": state.completion_tokens,
            "diagnosis_submitted": state.diagnosis_submitted,
            "diagnosis": state.diagnosis,
            "request_trace": state.request_trace,
            "final_status": "resolved" if state.success else "failed",
            "failure_reason": state.failure_reason,
            "safety_violations": [
                flag for flag in state.anti_cheat_flags if "network" in flag or "unsafe" in flag
            ],
            "anti_cheat_flags": state.anti_cheat_flags,
            "metrics": state.metrics,
        }
        with self.output_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record, sort_keys=True) + "\n")
        state.episode_artifact_path = str(self.output_path)
        return self.output_path

    def _verifier_layer(self, state: CyberSecurityOWASPState, key: str) -> Any:
        return (state.verification_summary or {}).get(key)