Spaces:
Sleeping
Sleeping
File size: 2,861 Bytes
3aeaf3d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | from __future__ import annotations
import json
from typing import Any
from .state import AttackStrategy, BlueAction, BlueActionType, Explanation, RedAction
class ActionParseError(Exception):
    """Error raised by this module when a raw action cannot be parsed."""
def parse_action(raw: str | dict[str, Any]) -> RedAction | BlueAction:
    """Decode a raw agent action and dispatch on its ``agent_type`` field.

    Args:
        raw: Either a JSON document given as a string, or an
            already-decoded dict.

    Returns:
        The result of ``_parse_red`` when ``agent_type`` is ``"red"``, or
        of ``_parse_blue`` when it is ``"blue"``.

    Raises:
        ActionParseError: If ``raw`` is neither a string nor a dict, if the
            string is not valid JSON, if the JSON does not decode to an
            object, or if ``agent_type`` is anything other than ``"red"``
            or ``"blue"``.
    """
    if isinstance(raw, str):
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise ActionParseError(f"Invalid JSON: {exc}") from exc
        # Valid JSON can also be a list, number, string, bool or null. Those
        # have no ``.get`` and would otherwise escape as AttributeError.
        if not isinstance(data, dict):
            raise ActionParseError(
                "Action JSON must decode to an object, "
                f"got: {type(data).__name__}"
            )
    elif isinstance(raw, dict):
        data = raw
    else:
        raise ActionParseError("Action must be a JSON string or dict")

    agent_type = data.get("agent_type")
    if agent_type == "red":
        return _parse_red(data)
    if agent_type == "blue":
        return _parse_blue(data)
    raise ActionParseError(
        f"agent_type must be 'red' or 'blue', got: {agent_type}"
    )
def _parse_red(data: dict[str, Any]) -> RedAction:
    """Build a ``RedAction`` from a decoded action dict.

    ``"strategy"`` is the only required key. The other fields fall back to
    a default (``sub_strategy="default"``, ``payload=""``,
    ``magnitude=0.5``) or to ``None`` when the key is absent.

    Raises:
        ActionParseError: If any ``KeyError``, ``TypeError`` or
            ``ValueError`` occurs while reading or converting the fields.
    """
    try:
        # Fields are converted in the same order as the keyword arguments,
        # so the first failing field determines the reported error.
        strategy = AttackStrategy(data["strategy"])
        sub_strategy = str(data.get("sub_strategy", "default"))
        payload = str(data.get("payload", ""))
        target_layer = _optional_int(data.get("target_layer"))
        direction_label = data.get("direction_label")
        magnitude = float(data.get("magnitude", 0.5))
        coalition_partner = data.get("coalition_partner")
        action = RedAction(
            strategy=strategy,
            sub_strategy=sub_strategy,
            payload=payload,
            target_layer=target_layer,
            direction_label=direction_label,
            magnitude=magnitude,
            coalition_partner=coalition_partner,
        )
    except (KeyError, TypeError, ValueError) as exc:
        raise ActionParseError(f"Red action parse error: {exc}") from exc
    return action
def _parse_blue(data: dict[str, Any]) -> BlueAction:
    """Build a ``BlueAction`` from a decoded action dict.

    ``"action_type"`` and ``"session_id"`` are required keys. ``"layer"``
    is optional and ``"patch_reference"`` defaults to ``"clean"``. A falsy
    ``"explanation"`` (missing, ``None`` or an empty dict) yields
    ``explanation=None``. Otherwise it must be a dict, from which an
    ``Explanation`` is built with per-field defaults.

    Raises:
        ActionParseError: If a required key is missing, a field cannot be
            converted, or ``"explanation"`` is present but is not a dict.
    """
    try:
        explanation = None
        expl_data = data.get("explanation")
        if expl_data:
            # A truthy non-dict (e.g. a string or list) has no ``.get``.
            # Without this guard it would raise AttributeError, which the
            # handler below does not catch.
            if not isinstance(expl_data, dict):
                raise TypeError(
                    "explanation must be an object, "
                    f"got: {type(expl_data).__name__}"
                )
            explanation = Explanation(
                threat_level=str(expl_data.get("threat_level", "low")),
                detection_method=str(
                    expl_data.get("detection_method", "unknown")
                ),
                layer_implicated=_optional_int(
                    expl_data.get("layer_implicated")
                ),
                direction_match=expl_data.get("direction_match"),
                evidence_turns=[
                    int(x) for x in expl_data.get("evidence_turns", [])
                ],
                anomaly_score=float(expl_data.get("anomaly_score", 0.0)),
                recommended_action=str(
                    expl_data.get("recommended_action", "warn")
                ),
                circuit_hypothesis=expl_data.get("circuit_hypothesis"),
            )
        return BlueAction(
            action_type=BlueActionType(data["action_type"]),
            session_id=str(data["session_id"]),
            layer=_optional_int(data.get("layer")),
            explanation=explanation,
            patch_reference=str(data.get("patch_reference", "clean")),
        )
    except (KeyError, TypeError, ValueError) as exc:
        raise ActionParseError(f"Blue action parse error: {exc}") from exc
def _optional_int(value: Any) -> int | None:
if value is None:
return None
return int(value)
|