File size: 2,861 Bytes
3aeaf3d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from __future__ import annotations

import json
from typing import Any

from .state import AttackStrategy, BlueAction, BlueActionType, Explanation, RedAction


class ActionParseError(Exception):
    """Raised when a raw action cannot be parsed into a red or blue action."""


def parse_action(raw: str | dict[str, Any]) -> RedAction | BlueAction:
    """Parse a raw action into a ``RedAction`` or ``BlueAction``.

    Args:
        raw: Either a JSON-encoded string or an already-decoded dict. The
            payload must be a JSON object whose ``"agent_type"`` entry is
            ``"red"`` or ``"blue"``.

    Returns:
        The action built by ``_parse_red`` or ``_parse_blue``, selected by
        the ``"agent_type"`` entry.

    Raises:
        ActionParseError: If ``raw`` is neither a string nor a dict, the
            string is not valid JSON, the decoded JSON is not an object, or
            ``"agent_type"`` is missing or not ``"red"``/``"blue"``.
    """
    if isinstance(raw, str):
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise ActionParseError(f"Invalid JSON: {exc}") from exc
    elif isinstance(raw, dict):
        data = raw
    else:
        raise ActionParseError("Action must be a JSON string or dict")

    # Valid JSON may decode to a list, number, string, bool or null. Without
    # this guard the .get() below would leak an AttributeError to callers
    # that only expect ActionParseError.
    if not isinstance(data, dict):
        raise ActionParseError(
            f"Action JSON must decode to an object, got: {type(data).__name__}"
        )

    agent_type = data.get("agent_type")
    if agent_type == "red":
        return _parse_red(data)
    if agent_type == "blue":
        return _parse_blue(data)
    raise ActionParseError(f"agent_type must be 'red' or 'blue', got: {agent_type}")


def _parse_red(data: dict[str, Any]) -> RedAction:
    """Build a ``RedAction`` from a decoded action dict.

    ``"strategy"`` is required and is converted with ``AttackStrategy``.
    Missing optional entries fall back to: ``sub_strategy="default"``,
    ``payload=""``, ``magnitude=0.5``; ``target_layer`` stays ``None`` when
    absent. ``direction_label`` and ``coalition_partner`` are passed through
    without conversion.

    Raises:
        ActionParseError: If a required key is missing or any conversion
            raises ``KeyError``, ``TypeError`` or ``ValueError``.
    """
    try:
        # Conversions run in the same order as the constructor arguments so
        # that the first failing field determines the reported error.
        strategy = AttackStrategy(data["strategy"])
        sub_strategy = str(data.get("sub_strategy", "default"))
        payload = str(data.get("payload", ""))
        target_layer = _optional_int(data.get("target_layer"))
        direction_label = data.get("direction_label")
        magnitude = float(data.get("magnitude", 0.5))
        coalition_partner = data.get("coalition_partner")
        return RedAction(
            strategy=strategy,
            sub_strategy=sub_strategy,
            payload=payload,
            target_layer=target_layer,
            direction_label=direction_label,
            magnitude=magnitude,
            coalition_partner=coalition_partner,
        )
    except (KeyError, TypeError, ValueError) as exc:
        raise ActionParseError(f"Red action parse error: {exc}") from exc


def _parse_blue(data: dict[str, Any]) -> BlueAction:
    """Build a ``BlueAction`` from a decoded action dict.

    ``"action_type"`` (converted with ``BlueActionType``) and
    ``"session_id"`` are required. ``layer`` stays ``None`` when absent and
    ``patch_reference`` defaults to ``"clean"``.

    When ``"explanation"`` is present and non-empty it must be a dict; it is
    turned into an ``Explanation`` with these fallbacks for missing entries:
    ``threat_level="low"``, ``detection_method="unknown"``,
    ``evidence_turns=[]``, ``anomaly_score=0.0``,
    ``recommended_action="warn"``. A missing or falsy explanation yields
    ``explanation=None``.

    Raises:
        ActionParseError: If a required key is missing, the explanation is
            a non-empty value that is not a dict, or any conversion raises
            ``KeyError``, ``TypeError`` or ``ValueError``.
    """
    try:
        explanation = None
        expl_data = data.get("explanation")
        if expl_data:
            # A truthy non-dict (e.g. a string or list) has no .get(); reject
            # it here so it is reported as ActionParseError instead of
            # escaping as an uncaught AttributeError.
            if not isinstance(expl_data, dict):
                raise TypeError(
                    "explanation must be an object, "
                    f"got: {type(expl_data).__name__}"
                )
            explanation = Explanation(
                threat_level=str(expl_data.get("threat_level", "low")),
                detection_method=str(expl_data.get("detection_method", "unknown")),
                layer_implicated=_optional_int(expl_data.get("layer_implicated")),
                direction_match=expl_data.get("direction_match"),
                evidence_turns=[int(x) for x in expl_data.get("evidence_turns", [])],
                anomaly_score=float(expl_data.get("anomaly_score", 0.0)),
                recommended_action=str(expl_data.get("recommended_action", "warn")),
                circuit_hypothesis=expl_data.get("circuit_hypothesis"),
            )
        return BlueAction(
            action_type=BlueActionType(data["action_type"]),
            session_id=str(data["session_id"]),
            layer=_optional_int(data.get("layer")),
            explanation=explanation,
            patch_reference=str(data.get("patch_reference", "clean")),
        )
    except (KeyError, TypeError, ValueError) as exc:
        raise ActionParseError(f"Blue action parse error: {exc}") from exc


def _optional_int(value: Any) -> int | None:
    if value is None:
        return None
    return int(value)