Spaces:
Paused
Paused
| """ | |
| Generate the "training-evidence" reward chart without a GPU. | |
| This script runs the Random and Heuristic policies through the 5 | |
| elite scenario families implemented in ``training/scenario_hooks.py`` | |
| (basic_containment / rag_grounding / executive_alignment / | |
| silo_breaker / stealth_adaptive) and renders a single PNG showing: | |
| - Per-family mean episode reward (Random vs Heuristic), with error bars. | |
| - A "lift" annotation showing how much the heuristic policy beats | |
| random in each family — i.e. the reward signal the GRPO trainer | |
| has to climb. | |
| The PNG is saved as ``evidence_scenario_rewards.png`` at repo root. | |
| The README references it as an honest reward-curve substitute when | |
| no GPU run is available. | |
| Run: | |
| python scripts/generate_training_evidence.py | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from statistics import mean, stdev | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| # Make repo root importable (script is run as `python scripts/...`). | |
| REPO_ROOT = Path(__file__).resolve().parent.parent | |
| sys.path.insert(0, str(REPO_ROOT)) | |
| from immunoorg.environment import ImmunoOrgEnvironment # noqa: E402 | |
| from immunoorg.models import ( # noqa: E402 | |
| ActionType, | |
| DiagnosticAction, | |
| ImmunoAction, | |
| StrategicAction, | |
| TacticalAction, | |
| ) | |
| from training.dataset_generator import DatasetConfig, DatasetGenerator # noqa: E402 | |
| from training.scenario_hooks import ( # noqa: E402 | |
| apply_scenario_hooks, | |
| attach_hooks, | |
| training_step_penalty, | |
| ) | |
| # ─── Policies ──────────────────────────────────────────────────────────────── | |
def random_policy(rng, obs, env):
    """Uniformly sample one action (tactical / strategic / diagnostic).

    ``rng`` is an integer seed (scenario seed + step index from the rollout
    loop), so the choice is deterministic per step.  ``env`` is unused but
    kept so the signature matches ``heuristic_policy``.
    """
    import random as _r

    chooser = _r.Random(rng)
    first_visible = obs.visible_nodes[0].id if obs.visible_nodes else ""
    kind = chooser.choice(
        [ActionType.TACTICAL, ActionType.STRATEGIC, ActionType.DIAGNOSTIC]
    )
    if kind == ActionType.TACTICAL:
        return ImmunoAction(
            action_type=kind,
            tactical_action=chooser.choice(list(TacticalAction)),
            target=first_visible,
            reasoning="random",
        )
    if kind == ActionType.STRATEGIC:
        return ImmunoAction(
            action_type=kind,
            strategic_action=chooser.choice(list(StrategicAction)),
            target="dept-security",
            reasoning="random",
        )
    return ImmunoAction(
        action_type=kind,
        diagnostic_action=chooser.choice(list(DiagnosticAction)),
        reasoning="random",
    )
def heuristic_policy(_rng, obs, env):
    """Phase-aware heuristic that exercises the right "winning-tier" feature
    for each scenario family (RAG forensics, no-isolate alignment, etc.).

    Scenario-family specials are checked first, in a fixed priority order;
    if none fires, a generic phase-appropriate default action is returned.
    """
    phase = obs.current_phase.value
    nodes = obs.visible_nodes
    live_compromised = [n for n in nodes if n.compromised and not n.isolated]
    active_hooks = getattr(env, "_active_scenario_hooks", {}) or {}

    def _best_target():
        # First un-isolated compromised node, else any visible node, else "".
        if live_compromised:
            return live_compromised[0].id
        return nodes[0].id if nodes else ""

    # RAG-grounding family: prefer SNAPSHOT_FORENSICS -> DEPLOY_PATCH chain.
    if (
        active_hooks.get("inject_rag_best_mitigation")
        and phase in ("detection", "containment")
        and live_compromised
    ):
        return ImmunoAction(
            action_type=ActionType.TACTICAL,
            tactical_action=TacticalAction.SNAPSHOT_FORENSICS,
            target=live_compromised[0].id,
            reasoning="RAG: capture forensic snapshot before patching the rootkit.",
        )

    # Executive alignment: never isolate; deploy IDS / patches instead.
    if active_hooks.get("board_uptime_no_isolate") and phase == "containment":
        return ImmunoAction(
            action_type=ActionType.TACTICAL,
            tactical_action=TacticalAction.DEPLOY_PATCH,
            target=_best_target(),
            reasoning="Board uptime directive: patch instead of isolating.",
        )

    # Silo-breaker: stop trying to isolate; do org refactor.
    if active_hooks.get("force_denials_on_isolate") and phase in (
        "containment",
        "rca",
        "refactor",
    ):
        return ImmunoAction(
            action_type=ActionType.STRATEGIC,
            strategic_action=StrategicAction.ESTABLISH_DEVSECOPS,
            target="dept-security",
            secondary_target="dept-engineering",
            reasoning="Approver keeps denying; restructure org to remove the bottleneck.",
        )

    # Stealth: prefer multi-step investigation (vuln scan + trace).
    if active_hooks.get("stealthy_initial_attack") and phase == "detection":
        return ImmunoAction(
            action_type=ActionType.DIAGNOSTIC,
            diagnostic_action=DiagnosticAction.VULNERABILITY_SCAN,
            reasoning="Stealth attack: deeper scan before tactical action.",
        )

    # Default phase-appropriate heuristics.
    if phase == "detection":
        return ImmunoAction(
            action_type=ActionType.TACTICAL,
            tactical_action=TacticalAction.SCAN_LOGS,
            target=_best_target(),
            reasoning="Detection: scan for indicators.",
        )
    if phase == "containment":
        if live_compromised:
            return ImmunoAction(
                action_type=ActionType.TACTICAL,
                tactical_action=TacticalAction.ISOLATE_NODE,
                target=live_compromised[0].id,
                reasoning="Isolate the compromised node.",
            )
        return ImmunoAction(
            action_type=ActionType.DIAGNOSTIC,
            diagnostic_action=DiagnosticAction.TIMELINE_RECONSTRUCT,
            reasoning="Reconstruct timeline.",
        )
    if phase == "rca":
        return ImmunoAction(
            action_type=ActionType.DIAGNOSTIC,
            diagnostic_action=DiagnosticAction.IDENTIFY_SILO,
            reasoning="Find the org silo behind the failure.",
        )
    if phase == "refactor":
        return ImmunoAction(
            action_type=ActionType.STRATEGIC,
            strategic_action=StrategicAction.REDUCE_BUREAUCRACY,
            target="dept-management",
            reasoning="Reduce approval latency.",
        )
    return ImmunoAction(
        action_type=ActionType.DIAGNOSTIC,
        diagnostic_action=DiagnosticAction.MEASURE_ORG_LATENCY,
        reasoning="Validate org improvements.",
    )
| # ─── Scenario rollouts ─────────────────────────────────────────────────────── | |
def run_scenario(scenario, policy_fn, max_steps=60):
    """Roll out one scenario with ``policy_fn`` and return total episode reward.

    The reward per step is the environment reward plus the scenario-hook
    training penalty, mirroring the signal the GRPO trainer optimizes.
    """
    env = ImmunoOrgEnvironment(
        difficulty=int(scenario["difficulty"]),
        seed=int(scenario["seed"]),
    )
    scenario_hooks = scenario.get("hooks") or {}
    attach_hooks(env, scenario_hooks)
    obs = env.reset()
    apply_scenario_hooks(env, scenario_hooks)

    episode_reward = 0.0
    base_seed = scenario["seed"]
    step_budget = min(max_steps, env.state.max_steps)
    for step_idx in range(step_budget):
        action = policy_fn(base_seed + step_idx, obs, env)
        obs, reward, done = env.step(action)
        episode_reward += float(reward) + float(training_step_penalty(env, action))
        if done:
            break
    return episode_reward
| def main(): | |
| print("Generating elite scenario mix (50 scenarios = 10 per family)...") | |
| gen = DatasetGenerator( | |
| DatasetConfig( | |
| dataset_type="elite", | |
| output_dir="training_runs/_evidence", | |
| verbose=False, | |
| compress_output=False, | |
| ) | |
| ) | |
| scenarios = gen.generate_elite_scenario_mix_dataset(total=50) | |
| families = [ | |
| "basic_containment", | |
| "rag_grounding", | |
| "executive_alignment", | |
| "silo_breaker", | |
| "stealth_adaptive", | |
| ] | |
| by_family = {f: [s for s in scenarios if s["family"] == f] for f in families} | |
| rewards = {pol: {f: [] for f in families} for pol in ("random", "heuristic")} | |
| for fam, scs in by_family.items(): | |
| print(f"\n--- {fam} ({len(scs)} scenarios) ---") | |
| for sc in scs: | |
| r_rand = run_scenario(sc, random_policy) | |
| r_heur = run_scenario(sc, heuristic_policy) | |
| rewards["random"][fam].append(r_rand) | |
| rewards["heuristic"][fam].append(r_heur) | |
| print( | |
| f" random mean={mean(rewards['random'][fam]):+.2f} Β± {stdev(rewards['random'][fam]):.2f}" | |
| ) | |
| print( | |
| f" heuristic mean={mean(rewards['heuristic'][fam]):+.2f} Β± {stdev(rewards['heuristic'][fam]):.2f}" | |
| ) | |
| # βββ Plot βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DARK_BG, CARD_BG = "#0d1117", "#161b22" | |
| TEXT, GRID = "#c9d1d9", "#30363d" | |
| COLOR_RAND, COLOR_HEUR = "#f78166", "#3fb950" | |
| fig, ax = plt.subplots(figsize=(11, 5)) | |
| fig.patch.set_facecolor(DARK_BG) | |
| ax.set_facecolor(CARD_BG) | |
| x = list(range(len(families))) | |
| width = 0.36 | |
| rand_means = [mean(rewards["random"][f]) for f in families] | |
| rand_stds = [stdev(rewards["random"][f]) for f in families] | |
| heur_means = [mean(rewards["heuristic"][f]) for f in families] | |
| heur_stds = [stdev(rewards["heuristic"][f]) for f in families] | |
| bars1 = ax.bar( | |
| [i - width / 2 for i in x], | |
| rand_means, | |
| width, | |
| yerr=rand_stds, | |
| capsize=4, | |
| color=COLOR_RAND, | |
| alpha=0.85, | |
| label="Random policy (untrained baseline)", | |
| edgecolor="white", | |
| linewidth=0.6, | |
| ) | |
| bars2 = ax.bar( | |
| [i + width / 2 for i in x], | |
| heur_means, | |
| width, | |
| yerr=heur_stds, | |
| capsize=4, | |
| color=COLOR_HEUR, | |
| alpha=0.85, | |
| label="Heuristic policy (gold standard target for GRPO)", | |
| edgecolor="white", | |
| linewidth=0.6, | |
| ) | |
| # Lift annotations | |
| for i, fam in enumerate(families): | |
| lift = heur_means[i] - rand_means[i] | |
| y_top = max(heur_means[i] + heur_stds[i], rand_means[i] + rand_stds[i]) + 0.4 | |
| ax.annotate( | |
| f"+{lift:.2f}" if lift >= 0 else f"{lift:.2f}", | |
| xy=(i, y_top), | |
| ha="center", | |
| color=COLOR_HEUR if lift >= 0 else COLOR_RAND, | |
| fontsize=10, | |
| fontweight="bold", | |
| ) | |
| ax.set_xticks(x) | |
| ax.set_xticklabels( | |
| [f.replace("_", "\n") for f in families], color=TEXT, fontsize=9 | |
| ) | |
| ax.set_ylabel("mean episode reward (over 10 scenarios)", color=TEXT, fontsize=10) | |
| ax.set_title( | |
| "Reward signal across the 5 elite scenarios\n" | |
| "(each pair shows the lift the GRPO trainer has to climb)", | |
| color=TEXT, | |
| fontsize=12, | |
| ) | |
| ax.tick_params(colors=TEXT, labelsize=9) | |
| for spine in ax.spines.values(): | |
| spine.set_edgecolor(GRID) | |
| ax.grid(True, color=GRID, linewidth=0.6, linestyle="--", alpha=0.6, axis="y") | |
| leg = ax.legend(loc="lower right", fontsize=9) | |
| for txt in leg.get_texts(): | |
| txt.set_color(TEXT) | |
| leg.get_frame().set_facecolor(CARD_BG) | |
| leg.get_frame().set_edgecolor(GRID) | |
| plt.tight_layout() | |
| out = REPO_ROOT / "evidence_scenario_rewards.png" | |
| plt.savefig(out, dpi=160, bbox_inches="tight", facecolor=DARK_BG) | |
| print(f"\nSaved: {out}") | |
# Entry point guard: run only when executed as a script, not on import.
if __name__ == "__main__":
    main()