"""
Generate the "training-evidence" reward chart without a GPU.
This script runs the Random and Heuristic policies through the 5
elite scenario families implemented in ``training/scenario_hooks.py``
(basic_containment / rag_grounding / executive_alignment /
silo_breaker / stealth_adaptive) and renders a single PNG showing:
- Per-family mean episode reward (Random vs Heuristic), with error bars.
- A "lift" annotation showing how much the heuristic policy beats
random in each family, i.e. the reward signal the GRPO trainer
has to climb.
The PNG is saved as ``evidence_scenario_rewards.png`` at repo root.
The README references it as an honest reward-curve substitute when
no GPU run is available.
Run:
python scripts/generate_training_evidence.py
"""
from __future__ import annotations
import random
import sys
from pathlib import Path
from statistics import mean, stdev
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
# Make repo root importable (script is run as `python scripts/...`).
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from immunoorg.environment import ImmunoOrgEnvironment # noqa: E402
from immunoorg.models import ( # noqa: E402
ActionType,
DiagnosticAction,
ImmunoAction,
StrategicAction,
TacticalAction,
)
from training.dataset_generator import DatasetConfig, DatasetGenerator # noqa: E402
from training.scenario_hooks import ( # noqa: E402
apply_scenario_hooks,
attach_hooks,
training_step_penalty,
)
# ─── Policies ─────────────────────────────────────────────────────────────────
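# Both policies share the signature ``policy(seed, obs, env) -> ImmunoAction``;
# ``run_scenario`` below calls them once per environment step with a
# deterministic per-step seed (scenario seed + step index).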
def random_policy(seed, obs, env):
    """Uniform-random baseline: sample an action type, then a random action.

    ``seed`` is an int (scenario seed + step index), so every rollout is
    reproducible. ``env`` is unused but kept for the shared policy signature.
    """
    rng = random.Random(seed)
    atype = rng.choice([ActionType.TACTICAL, ActionType.STRATEGIC, ActionType.DIAGNOSTIC])
    target = obs.visible_nodes[0].id if obs.visible_nodes else ""
    if atype == ActionType.TACTICAL:
        return ImmunoAction(
            action_type=atype,
            tactical_action=rng.choice(list(TacticalAction)),
            target=target,
            reasoning="random",
        )
    if atype == ActionType.STRATEGIC:
        return ImmunoAction(
            action_type=atype,
            strategic_action=rng.choice(list(StrategicAction)),
            target="dept-security",
            reasoning="random",
        )
    return ImmunoAction(
        action_type=atype,
        diagnostic_action=rng.choice(list(DiagnosticAction)),
        reasoning="random",
    )
def heuristic_policy(_seed, obs, env):
    """Phase-aware heuristic that exercises the right "winning-tier" feature
    for each scenario family (RAG forensics, no-isolate alignment, etc.)."""
phase = obs.current_phase.value
nodes = obs.visible_nodes
compromised = [n for n in nodes if n.compromised and not n.isolated]
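    # ``_active_scenario_hooks`` is assumed to be set on the env by
    # ``attach_hooks`` (see run_scenario below); default to {} when absent.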
hooks = getattr(env, "_active_scenario_hooks", {}) or {}
# RAG-grounding family: prefer SNAPSHOT_FORENSICS -> DEPLOY_PATCH chain.
if hooks.get("inject_rag_best_mitigation") and phase in ("detection", "containment"):
if compromised:
return ImmunoAction(
action_type=ActionType.TACTICAL,
tactical_action=TacticalAction.SNAPSHOT_FORENSICS,
target=compromised[0].id,
reasoning="RAG: capture forensic snapshot before patching the rootkit.",
)
# Executive alignment: never isolate; deploy IDS / patches instead.
if hooks.get("board_uptime_no_isolate") and phase == "containment":
target = compromised[0].id if compromised else (nodes[0].id if nodes else "")
return ImmunoAction(
action_type=ActionType.TACTICAL,
tactical_action=TacticalAction.DEPLOY_PATCH,
target=target,
reasoning="Board uptime directive: patch instead of isolating.",
)
# Silo-breaker: stop trying to isolate; do org refactor.
if hooks.get("force_denials_on_isolate") and phase in ("containment", "rca", "refactor"):
return ImmunoAction(
action_type=ActionType.STRATEGIC,
strategic_action=StrategicAction.ESTABLISH_DEVSECOPS,
target="dept-security",
secondary_target="dept-engineering",
reasoning="Approver keeps denying; restructure org to remove the bottleneck.",
)
# Stealth: prefer multi-step investigation (vuln scan + trace).
if hooks.get("stealthy_initial_attack") and phase == "detection":
return ImmunoAction(
action_type=ActionType.DIAGNOSTIC,
diagnostic_action=DiagnosticAction.VULNERABILITY_SCAN,
reasoning="Stealth attack: deeper scan before tactical action.",
)
# Default phase-appropriate heuristic.
if phase == "detection":
target = compromised[0].id if compromised else (nodes[0].id if nodes else "")
return ImmunoAction(
action_type=ActionType.TACTICAL,
tactical_action=TacticalAction.SCAN_LOGS,
target=target,
reasoning="Detection: scan for indicators.",
)
if phase == "containment":
if compromised:
return ImmunoAction(
action_type=ActionType.TACTICAL,
tactical_action=TacticalAction.ISOLATE_NODE,
target=compromised[0].id,
reasoning="Isolate the compromised node.",
)
return ImmunoAction(
action_type=ActionType.DIAGNOSTIC,
diagnostic_action=DiagnosticAction.TIMELINE_RECONSTRUCT,
reasoning="Reconstruct timeline.",
)
if phase == "rca":
return ImmunoAction(
action_type=ActionType.DIAGNOSTIC,
diagnostic_action=DiagnosticAction.IDENTIFY_SILO,
reasoning="Find the org silo behind the failure.",
)
if phase == "refactor":
return ImmunoAction(
action_type=ActionType.STRATEGIC,
strategic_action=StrategicAction.REDUCE_BUREAUCRACY,
target="dept-management",
reasoning="Reduce approval latency.",
)
return ImmunoAction(
action_type=ActionType.DIAGNOSTIC,
diagnostic_action=DiagnosticAction.MEASURE_ORG_LATENCY,
reasoning="Validate org improvements.",
)
# ─── Scenario rollouts ────────────────────────────────────────────────────────
def run_scenario(scenario, policy_fn, max_steps=60):
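    """Roll one scenario to termination under ``policy_fn`` and return the
    undiscounted episode return (env reward plus ``training_step_penalty``)."""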
env = ImmunoOrgEnvironment(
difficulty=int(scenario["difficulty"]),
seed=int(scenario["seed"]),
)
hooks = scenario.get("hooks") or {}
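    # Assumed hook contract: ``attach_hooks`` registers the hook config before
    # reset, and ``apply_scenario_hooks`` mutates the freshly reset state.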
attach_hooks(env, hooks)
obs = env.reset()
apply_scenario_hooks(env, hooks)
total_reward = 0.0
for step in range(min(max_steps, env.state.max_steps)):
action = policy_fn(scenario["seed"] + step, obs, env)
obs, reward, done = env.step(action)
total_reward += float(reward) + float(training_step_penalty(env, action))
if done:
break
return total_reward
def main():
print("Generating elite scenario mix (50 scenarios = 10 per family)...")
gen = DatasetGenerator(
DatasetConfig(
dataset_type="elite",
output_dir="training_runs/_evidence",
verbose=False,
compress_output=False,
)
)
scenarios = gen.generate_elite_scenario_mix_dataset(total=50)
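    # Each scenario dict is assumed to carry ``family``, ``difficulty``,
    # ``seed`` and optionally ``hooks``: the keys run_scenario relies on.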
families = [
"basic_containment",
"rag_grounding",
"executive_alignment",
"silo_breaker",
"stealth_adaptive",
]
by_family = {f: [s for s in scenarios if s["family"] == f] for f in families}
rewards = {pol: {f: [] for f in families} for pol in ("random", "heuristic")}
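    # rewards[policy][family] -> list of per-episode returns, one per scenario.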
for fam, scs in by_family.items():
print(f"\n--- {fam} ({len(scs)} scenarios) ---")
for sc in scs:
r_rand = run_scenario(sc, random_policy)
r_heur = run_scenario(sc, heuristic_policy)
rewards["random"][fam].append(r_rand)
rewards["heuristic"][fam].append(r_heur)
        print(
            f"  random    mean={mean(rewards['random'][fam]):+.2f} ± {stdev(rewards['random'][fam]):.2f}"
        )
        print(
            f"  heuristic mean={mean(rewards['heuristic'][fam]):+.2f} ± {stdev(rewards['heuristic'][fam]):.2f}"
        )
# ─── Plot ─────────────────────────────────────────────────────────────
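    # The palette approximates GitHub's dark theme so the PNG blends into a
    # dark-mode README.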
DARK_BG, CARD_BG = "#0d1117", "#161b22"
TEXT, GRID = "#c9d1d9", "#30363d"
COLOR_RAND, COLOR_HEUR = "#f78166", "#3fb950"
fig, ax = plt.subplots(figsize=(11, 5))
fig.patch.set_facecolor(DARK_BG)
ax.set_facecolor(CARD_BG)
x = list(range(len(families)))
width = 0.36
rand_means = [mean(rewards["random"][f]) for f in families]
rand_stds = [stdev(rewards["random"][f]) for f in families]
heur_means = [mean(rewards["heuristic"][f]) for f in families]
heur_stds = [stdev(rewards["heuristic"][f]) for f in families]
    ax.bar(
[i - width / 2 for i in x],
rand_means,
width,
yerr=rand_stds,
capsize=4,
color=COLOR_RAND,
alpha=0.85,
label="Random policy (untrained baseline)",
edgecolor="white",
linewidth=0.6,
)
    ax.bar(
[i + width / 2 for i in x],
heur_means,
width,
yerr=heur_stds,
capsize=4,
color=COLOR_HEUR,
alpha=0.85,
label="Heuristic policy (gold standard target for GRPO)",
edgecolor="white",
linewidth=0.6,
)
    # Lift annotations: heuristic mean minus random mean for each family,
    # placed just above the taller of the two error bars.
for i, fam in enumerate(families):
lift = heur_means[i] - rand_means[i]
y_top = max(heur_means[i] + heur_stds[i], rand_means[i] + rand_stds[i]) + 0.4
ax.annotate(
f"+{lift:.2f}" if lift >= 0 else f"{lift:.2f}",
xy=(i, y_top),
ha="center",
color=COLOR_HEUR if lift >= 0 else COLOR_RAND,
fontsize=10,
fontweight="bold",
)
ax.set_xticks(x)
ax.set_xticklabels(
[f.replace("_", "\n") for f in families], color=TEXT, fontsize=9
)
ax.set_ylabel("mean episode reward (over 10 scenarios)", color=TEXT, fontsize=10)
    ax.set_title(
        "Reward signal across the 5 elite scenario families\n"
        "(each pair shows the lift the GRPO trainer has to climb)",
        color=TEXT,
        fontsize=12,
    )
ax.tick_params(colors=TEXT, labelsize=9)
for spine in ax.spines.values():
spine.set_edgecolor(GRID)
ax.grid(True, color=GRID, linewidth=0.6, linestyle="--", alpha=0.6, axis="y")
leg = ax.legend(loc="lower right", fontsize=9)
for txt in leg.get_texts():
txt.set_color(TEXT)
leg.get_frame().set_facecolor(CARD_BG)
leg.get_frame().set_edgecolor(GRID)
plt.tight_layout()
out = REPO_ROOT / "evidence_scenario_rewards.png"
plt.savefig(out, dpi=160, bbox_inches="tight", facecolor=DARK_BG)
print(f"\nSaved: {out}")
if __name__ == "__main__":
main()