Cyber_analyst-round1 / server /episode_logger.py
Humanlearning's picture
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade
"""Episode artifact logging for training, debugging, and demos."""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
try:
from ..models import CyberSecurityOWASPState
except ImportError: # pragma: no cover
from models import CyberSecurityOWASPState
class EpisodeArtifactLogger:
"""Appends compact JSONL episode transcripts under outputs/rollouts."""
def __init__(self, output_path: str | Path | None = None):
configured = output_path or os.getenv("CYBERSECURITY_OWASP_EPISODE_LOG")
self.output_path = Path(configured) if configured else Path("outputs/rollouts/episodes.jsonl")
def log_episode(
self,
state: CyberSecurityOWASPState,
*,
final_observation: dict[str, Any] | None = None,
) -> Path:
self.output_path.parent.mkdir(parents=True, exist_ok=True)
record = {
"episode_id": state.episode_id,
"task_id": state.task_id,
"seed": state.seed,
"split": state.split,
"difficulty": state.difficulty,
"difficulty_tier": state.difficulty_tier,
"template_id": state.template_id,
"scenario_family": state.scenario_family,
"domain": state.domain,
"bug_family": state.bug_family,
"target_weakness": state.target_weakness,
"agent_actions": state.action_history,
"observations": state.observation_history,
"final_observation": final_observation or {},
"patch_diff": state.patch_diff,
"visible_test_result": self._verifier_layer(state, "visible"),
"hidden_test_result": self._verifier_layer(state, "hidden_tests"),
"oracle_result": self._verifier_layer(state, "oracle_matrix"),
"regression_result": self._verifier_layer(state, "regression"),
"reward_breakdown": state.reward_history[-1] if state.reward_history else {},
"reward_breakdown_by_step": state.reward_history,
"total_reward": state.accumulated_reward,
"final_reward_breakdown": state.reward_history[-1] if state.reward_history else {},
"progress_reward_total": state.progress_reward_total,
"completion_tokens": state.completion_tokens,
"diagnosis_submitted": state.diagnosis_submitted,
"diagnosis": state.diagnosis,
"request_trace": state.request_trace,
"final_status": "resolved" if state.success else "failed",
"failure_reason": state.failure_reason,
"safety_violations": [
flag for flag in state.anti_cheat_flags if "network" in flag or "unsafe" in flag
],
"anti_cheat_flags": state.anti_cheat_flags,
"metrics": state.metrics,
}
with self.output_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(record, sort_keys=True) + "\n")
state.episode_artifact_path = str(self.output_path)
return self.output_path
def _verifier_layer(self, state: CyberSecurityOWASPState, key: str) -> Any:
return (state.verification_summary or {}).get(key)