File size: 6,782 Bytes
8892a6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
Butterfly Effect Physics Engine
Implements the causal escalation system where every fix creates a new bottleneck.
Manages cluster state, telemetry ticks, budget burn, and SLA tracking.
"""

import copy
import logging
import time
from typing import Optional

logger = logging.getLogger("swarm-os.physics")

# ── Simulation constants ──
SLA_WINDOW_SECONDS = 10 * 60   # length of the SLA window: 10 minutes, in seconds
TICK_INTERVAL_SECONDS = 1      # simulated seconds added to the clock per step()
INITIAL_BUDGET = 50.0          # starting budget cap, in USD
HOURLY_BURN_RATE = 2.5         # initial hourly burn, in USD per hour


class PhysicsEngine:
    """Simulated cluster physics with causal ("butterfly effect") escalation.

    The engine keeps a mutable gauge dictionary (``self.state``) alongside the
    simulated clock, accrued cost, remaining budget and remaining SLA time.
    Each :meth:`step` advances the simulation by one tick and can apply the
    side effects of a named action, which typically relieve one gauge while
    stressing another.
    """

    def __init__(self):
        """Create an engine in its initial state (delegates to :meth:`reset`)."""
        self.reset()

    def reset(self):
        """Reset the physics engine to initial state.

        Rebuilds the gauge dictionary, zeroes the simulated clock and accrued
        cost, restores budget and SLA to their module-level starting values,
        and clears the escalation history.
        """
        self.state = {
            "ram_mb": 320,
            "vram_gb": 0.0,
            "network_pct": 25,
            "cpu_pct": 30,
            "container_status": "idle",
            "cluster_status": "healthy",
            "active_nodes": 4,
            "hourly_burn_usd": HOURLY_BURN_RATE,
        }
        # Wall-clock time of the reset; not read anywhere inside this class.
        self.start_time = time.time()
        self.elapsed_seconds = 0
        self.budget_cap = INITIAL_BUDGET
        self.budget_remaining = INITIAL_BUDGET
        self.sla_remaining = SLA_WINDOW_SECONDS
        self.cost_accrued = 0.0
        self.escalation_history = []
        logger.info("Physics engine reset. Budget=$%.2f, SLA=%ds", INITIAL_BUDGET, SLA_WINDOW_SECONDS)

    def step(self, action: Optional[str] = None) -> dict:
        """
        Advance the physics engine by one tick.
        If an action is provided, apply its causal effects.

        Order of operations: advance the clock and SLA countdown, burn budget
        at the current hourly rate, apply the periodic gauge pulse, then apply
        the action. Because the action runs last, any gauge it sets is what
        the returned dict reports for this tick.

        Args:
            action: Name of the action to apply, or ``None``/empty for a plain
                tick. Names without a registered effect leave the gauges
                untouched.

        Returns:
            A new dict containing every key of ``self.state`` plus
            ``elapsed_seconds``, ``sla_remaining``, and ``budget_remaining`` /
            ``cost_accrued`` rounded to two decimals.
        """
        self.elapsed_seconds += TICK_INTERVAL_SECONDS
        self.sla_remaining = max(0, SLA_WINDOW_SECONDS - self.elapsed_seconds)

        # Burn budget: scale the hourly rate by the tick length so the cost
        # stays correct if TICK_INTERVAL_SECONDS is ever changed from 1.
        tick_cost = self.state["hourly_burn_usd"] * TICK_INTERVAL_SECONDS / 3600
        self.cost_accrued += tick_cost
        self.budget_remaining = max(0.0, self.budget_cap - self.cost_accrued)

        # Keep the gauges visibly alive during real runs so the dashboard reflects
        # ongoing work even between major state transitions.
        self._apply_tick_pulse()

        # Apply causal effects from actions
        if action:
            self._apply_causal_effect(action)

        # "hourly_burn_usd" and "cluster_status" are already part of
        # self.state, so unpacking it covers them.
        return {
            **self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": round(self.budget_remaining, 2),
            "cost_accrued": round(self.cost_accrued, 2),
        }

    def _apply_tick_pulse(self):
        """Nudge RAM, VRAM, network and CPU gauges by a repeating 4-tick pattern.

        The delta applied is selected by ``elapsed_seconds % 4`` and each
        gauge is clamped to a fixed [min, max] range afterwards.
        """
        pulse_index = self.elapsed_seconds % 4
        # NOTE(review): each delta tuple sums to a positive value (+14 MB,
        # +0.12 GB, +4 %, +5 %), so without an action resetting a gauge the
        # values trend toward their upper clamps -- confirm this is intended.
        ram_deltas = (4, 9, -5, 6)
        vram_deltas = (0.05, 0.08, -0.04, 0.03)
        network_deltas = (3, -2, 4, -1)
        cpu_deltas = (2, 5, -3, 1)

        self.state["ram_mb"] = max(240, min(860, self.state["ram_mb"] + ram_deltas[pulse_index]))
        # VRAM is rounded so repeated float additions do not accumulate noise.
        self.state["vram_gb"] = round(max(0.2, min(11.9, self.state["vram_gb"] + vram_deltas[pulse_index])), 2)
        self.state["network_pct"] = max(8, min(99, self.state["network_pct"] + network_deltas[pulse_index]))
        self.state["cpu_pct"] = max(10, min(98, self.state["cpu_pct"] + cpu_deltas[pulse_index]))

    def _apply_causal_effect(self, action: str):
        """
        Apply the butterfly effect: every fix creates a downstream bottleneck.

        Causal chain:
        - FSDP fix → reduces VRAM, BUT spikes network (all-reduce traffic)
        - gradient_checkpointing → reduces VRAM and network, BUT increases CPU
        - restart_loop → wastes time, costs money, does not fix root cause
        - schema_drift → degrades the cluster (ingestion failure)

        Args:
            action: Action name. Names not listed above change nothing and
                are only reported at debug level.
        """
        if action == "fsdp_sharding":
            # FSDP reduces VRAM dramatically but spikes network
            self.state["vram_gb"] = 0.45
            self.state["network_pct"] = 95  # Butterfly effect: all-reduce traffic
            self.state["container_status"] = "running"
            self.state["cluster_status"] = "degraded"  # Network is now the bottleneck
            self.escalation_history.append({
                "cause": "fsdp_sharding",
                "effect": "network_spike_95pct",
                "detail": "FSDP all-reduce traffic saturated inter-node bandwidth",
            })

        elif action == "gradient_checkpointing":
            # Gradient checkpointing reduces VRAM dramatically but increases CPU
            self.state["vram_gb"] = 0.29
            self.state["network_pct"] = 52
            self.state["cpu_pct"] = 55
            self.state["container_status"] = "stable"
            self.state["cluster_status"] = "healthy"
            self.escalation_history.append({
                "cause": "gradient_checkpointing",
                "effect": "vram_reduction_and_cpu_increase",
                "detail": "Recomputing activations saves VRAM (now 0.29GB) but uses more CPU",
            })

        elif action == "restart_loop":
            # Naive restart — wastes time and money, does not fix the issue
            # NOTE(review): unlike the other branches, nothing is appended to
            # escalation_history here -- confirm that omission is intentional.
            self.state["vram_gb"] = 11.8  # OOM returns immediately
            self.state["container_status"] = "critical"
            self.state["cluster_status"] = "degraded"
            self.state["hourly_burn_usd"] += 1.0  # Cost escalates with each restart

        elif action == "schema_drift":
            # Schema changes break ingestion pipelines
            self.state["cluster_status"] = "degraded"
            self.escalation_history.append({
                "cause": "schema_drift",
                "effect": "ingestion_failure",
                "detail": "JSON schema mutated from flat to nested format",
            })

        else:
            # Previously dropped without a trace; debug level keeps normal
            # runs quiet while making a mistyped action name discoverable.
            logger.debug("No causal effect registered for action %r", action)

    def get_telemetry(self) -> dict:
        """Get current telemetry snapshot for AI agent context window.

        Returns:
            A flat dict of the current gauges plus budget/SLA figures. The
            monetary values are rounded to three decimals, and
            ``sla_breach_penalty`` is 0 while SLA time remains and 100.0 once
            it has run out.
        """
        return {
            "active_compute_nodes": self.state["active_nodes"],
            "hourly_burn_usd": self.state["hourly_burn_usd"],
            "sla_breach_penalty": 0 if self.sla_remaining > 0 else 100.0,
            "ram_mb": self.state["ram_mb"],
            "vram_gb": self.state["vram_gb"],
            "network_pct": self.state["network_pct"],
            "cpu_pct": self.state["cpu_pct"],
            "container_status": self.state["container_status"],
            "sla_remaining_seconds": self.sla_remaining,
            "budget_remaining_usd": round(self.budget_remaining, 3),
            "budget_cap_usd": round(self.budget_cap, 3),
            "cost_accrued_usd": round(self.cost_accrued, 3),
        }

    def get_state_snapshot(self) -> dict:
        """Get a deep copy of the current state for counterfactual forking.

        Returns:
            A dict, independent of the engine, holding the gauge dictionary,
            the clock, cost/budget/SLA figures and the escalation history.
            Mutating the result never affects the live engine. The wall-clock
            ``start_time`` is not part of the snapshot.
        """
        return copy.deepcopy({
            "state": self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "cost_accrued": self.cost_accrued,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": self.budget_remaining,
            "budget_cap": self.budget_cap,
            # Included so a fork carries the causal log recorded so far.
            "escalation_history": self.escalation_history,
        })