| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from __future__ import annotations |
| import asyncio |
| import numpy as np |
| from typing import Dict, Optional, Tuple |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
| from .anchors import CHAMBER_NAMES, COUPLING_MATRIX, CHAMBER_NAMES_EXTENDED, COUPLING_MATRIX_EXTENDED |
|
|
| |
| |
| DECAY_RATES = { |
| "FEAR": 0.90, |
| "LOVE": 0.93, |
| "RAGE": 0.85, |
| "VOID": 0.97, |
| "FLOW": 0.88, |
| "COMPLEX": 0.94, |
| } |
|
|
|
|
| def swish(x: np.ndarray) -> np.ndarray: |
| """Swish activation: x * sigmoid(x)""" |
| return x / (1.0 + np.exp(-x)) |
|
|
|
|
| def swish_deriv(x: np.ndarray) -> np.ndarray: |
| """Derivative of swish for backprop""" |
| sig = 1.0 / (1.0 + np.exp(-x)) |
| return sig + x * sig * (1 - sig) |
|
|
|
|
| @dataclass |
| class ChamberMLP: |
| """ |
| Single chamber MLP: 100→128→64→32→1 (deeper for 200K model) |
| |
| Takes 100D resonance vector, outputs single activation value. |
| |
| Params: |
| - W1: (100, 128) = 12,800 |
| - b1: (128,) = 128 |
| - W2: (128, 64) = 8,192 |
| - b2: (64,) = 64 |
| - W3: (64, 32) = 2,048 |
| - b3: (32,) = 32 |
| - W4: (32, 1) = 32 |
| - b4: (1,) = 1 |
| Total: ~23K params per chamber |
| """ |
|
|
| W1: np.ndarray |
| b1: np.ndarray |
| W2: np.ndarray |
| b2: np.ndarray |
| W3: np.ndarray |
| b3: np.ndarray |
| W4: np.ndarray |
| b4: np.ndarray |
|
|
| @classmethod |
| def random_init(cls, seed: Optional[int] = None) -> "ChamberMLP": |
| """Initialize with random weights (Xavier initialization).""" |
| if seed is not None: |
| np.random.seed(seed) |
|
|
| |
| W1 = np.random.randn(100, 128) * np.sqrt(2.0 / 100) |
| b1 = np.zeros(128) |
|
|
| W2 = np.random.randn(128, 64) * np.sqrt(2.0 / 128) |
| b2 = np.zeros(64) |
|
|
| W3 = np.random.randn(64, 32) * np.sqrt(2.0 / 64) |
| b3 = np.zeros(32) |
|
|
| W4 = np.random.randn(32, 1) * np.sqrt(2.0 / 32) |
| b4 = np.zeros(1) |
|
|
| return cls(W1=W1, b1=b1, W2=W2, b2=b2, W3=W3, b3=b3, W4=W4, b4=b4) |
|
|
| def forward(self, x: np.ndarray) -> float: |
| """ |
| Forward pass: 100D resonances → scalar activation. |
| |
| Args: |
| x: (100,) resonance vector |
| |
| Returns: |
| scalar activation [0, 1] |
| """ |
| |
| h1 = x @ self.W1 + self.b1 |
| a1 = swish(h1) |
|
|
| |
| h2 = a1 @ self.W2 + self.b2 |
| a2 = swish(h2) |
|
|
| |
| h3 = a2 @ self.W3 + self.b3 |
| a3 = swish(h3) |
|
|
| |
| h4 = a3 @ self.W4 + self.b4 |
|
|
| |
| activation = 1.0 / (1.0 + np.exp(-h4[0])) |
|
|
| return float(activation) |
|
|
| def param_count(self) -> int: |
| """Count total parameters in this MLP.""" |
| return ( |
| self.W1.size + self.b1.size + |
| self.W2.size + self.b2.size + |
| self.W3.size + self.b3.size + |
| self.W4.size + self.b4.size |
| ) |
|
|
| def save(self, path: Path) -> None: |
| """Save weights to .npz file.""" |
| np.savez( |
| path, |
| W1=self.W1, |
| b1=self.b1, |
| W2=self.W2, |
| b2=self.b2, |
| W3=self.W3, |
| b3=self.b3, |
| W4=self.W4, |
| b4=self.b4, |
| ) |
|
|
| @classmethod |
| def load(cls, path: Path) -> "ChamberMLP": |
| """Load weights from .npz file.""" |
| data = np.load(path) |
| |
| if "W4" in data: |
| return cls( |
| W1=data["W1"], |
| b1=data["b1"], |
| W2=data["W2"], |
| b2=data["b2"], |
| W3=data["W3"], |
| b3=data["b3"], |
| W4=data["W4"], |
| b4=data["b4"], |
| ) |
| else: |
| |
| print(f"[chambers] old format detected in {path}, reinitializing with 4-layer architecture") |
| return cls.random_init() |
|
|
|
|
| @dataclass |
| class CrossFireSystem: |
| """ |
| Six chambers with cross-fire stabilization (200K model). |
| |
| Chambers: |
| - FEAR, LOVE, RAGE, VOID (original) |
| - FLOW, COMPLEX (extended for richer emotion detection) |
| |
| Cross-fire loop: |
| 1. Each chamber computes activation from resonances |
| 2. Chambers influence each other via coupling matrix |
| 3. Iterate until convergence (max 10 iterations) |
| 4. Return final activations + iteration count |
| """ |
|
|
| fear: ChamberMLP |
| love: ChamberMLP |
| rage: ChamberMLP |
| void: ChamberMLP |
| flow: ChamberMLP |
| complex: ChamberMLP |
| coupling: np.ndarray |
|
|
| @classmethod |
| def random_init(cls, seed: Optional[int] = None) -> "CrossFireSystem": |
| """Initialize all chambers with random weights.""" |
| if seed is not None: |
| base_seed = seed |
| else: |
| base_seed = np.random.randint(0, 10000) |
|
|
| fear = ChamberMLP.random_init(seed=base_seed + 0) |
| love = ChamberMLP.random_init(seed=base_seed + 1) |
| rage = ChamberMLP.random_init(seed=base_seed + 2) |
| void = ChamberMLP.random_init(seed=base_seed + 3) |
| flow = ChamberMLP.random_init(seed=base_seed + 4) |
| complex_chamber = ChamberMLP.random_init(seed=base_seed + 5) |
|
|
| coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) |
|
|
| return cls( |
| fear=fear, |
| love=love, |
| rage=rage, |
| void=void, |
| flow=flow, |
| complex=complex_chamber, |
| coupling=coupling, |
| ) |
|
|
| def stabilize( |
| self, |
| resonances: np.ndarray, |
| max_iter: int = 10, |
| threshold: float = 0.01, |
| momentum: float = 0.7, |
| ) -> Tuple[Dict[str, float], int]: |
| """ |
| Run cross-fire stabilization loop. |
| |
| Args: |
| resonances: (100,) initial resonance vector |
| max_iter: max iterations before forced stop |
| threshold: convergence threshold (sum of absolute changes) |
| momentum: blend factor (0.7 = 70% old, 30% new) |
| |
| Returns: |
| (chamber_activations, iterations_count) |
| |
| Example: |
| activations, iters = system.stabilize(resonances) |
| # → {"FEAR": 0.8, "LOVE": 0.2, ...}, 5 |
| """ |
| |
| chambers = [self.fear, self.love, self.rage, self.void, self.flow, self.complex] |
| activations = np.array([ |
| chamber.forward(resonances) |
| for chamber in chambers |
| ], dtype=np.float32) |
|
|
| |
| decay_array = np.array([ |
| DECAY_RATES["FEAR"], |
| DECAY_RATES["LOVE"], |
| DECAY_RATES["RAGE"], |
| DECAY_RATES["VOID"], |
| DECAY_RATES["FLOW"], |
| DECAY_RATES["COMPLEX"], |
| ], dtype=np.float32) |
|
|
| |
| for iteration in range(max_iter): |
| |
| activations = activations * decay_array |
|
|
| |
| influence = self.coupling @ activations |
|
|
| |
| new_activations = momentum * activations + (1 - momentum) * influence |
|
|
| |
| new_activations = np.clip(new_activations, 0.0, 1.0) |
|
|
| |
| delta = np.abs(new_activations - activations).sum() |
| activations = new_activations |
|
|
| if delta < threshold: |
| |
| result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) |
| return result, iteration + 1 |
|
|
| |
| result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) |
| return result, max_iter |
|
|
| def param_count(self) -> int: |
| """Total parameters in all chambers.""" |
| return sum([ |
| self.fear.param_count(), |
| self.love.param_count(), |
| self.rage.param_count(), |
| self.void.param_count(), |
| self.flow.param_count(), |
| self.complex.param_count(), |
| ]) |
|
|
| def save(self, models_dir: Path) -> None: |
| """Save all chamber weights to models/ directory.""" |
| models_dir.mkdir(parents=True, exist_ok=True) |
| self.fear.save(models_dir / "chamber_fear.npz") |
| self.love.save(models_dir / "chamber_love.npz") |
| self.rage.save(models_dir / "chamber_rage.npz") |
| self.void.save(models_dir / "chamber_void.npz") |
| self.flow.save(models_dir / "chamber_flow.npz") |
| self.complex.save(models_dir / "chamber_complex.npz") |
| print(f"[chambers] saved to {models_dir}") |
|
|
| @classmethod |
| def load(cls, models_dir: Path) -> "CrossFireSystem": |
| """Load all chamber weights from models/ directory.""" |
| fear = ChamberMLP.load(models_dir / "chamber_fear.npz") |
| love = ChamberMLP.load(models_dir / "chamber_love.npz") |
| rage = ChamberMLP.load(models_dir / "chamber_rage.npz") |
| void = ChamberMLP.load(models_dir / "chamber_void.npz") |
| |
| |
| flow_path = models_dir / "chamber_flow.npz" |
| complex_path = models_dir / "chamber_complex.npz" |
| |
| if flow_path.exists(): |
| flow = ChamberMLP.load(flow_path) |
| else: |
| print("[chambers] flow chamber not found, initializing random") |
| flow = ChamberMLP.random_init(seed=4) |
| |
| if complex_path.exists(): |
| complex_chamber = ChamberMLP.load(complex_path) |
| else: |
| print("[chambers] complex chamber not found, initializing random") |
| complex_chamber = ChamberMLP.random_init(seed=5) |
|
|
| coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) |
|
|
| print(f"[chambers] loaded from {models_dir}") |
| return cls( |
| fear=fear, |
| love=love, |
| rage=rage, |
| void=void, |
| flow=flow, |
| complex=complex_chamber, |
| coupling=coupling, |
| ) |
|
|
|
|
| class AsyncCrossFireSystem: |
| """ |
| Async wrapper for CrossFireSystem with field lock discipline. |
| |
| Based on HAZE's async pattern - achieves coherence through |
| explicit operation ordering and atomicity. |
| |
| "The asyncio.Lock doesn't add information—it adds discipline." |
| """ |
| |
| def __init__(self, system: CrossFireSystem): |
| self._sync = system |
| self._lock = asyncio.Lock() |
| |
| @classmethod |
| def random_init(cls, seed: Optional[int] = None) -> "AsyncCrossFireSystem": |
| """Initialize with random weights.""" |
| system = CrossFireSystem.random_init(seed=seed) |
| return cls(system) |
| |
| @classmethod |
| def load(cls, models_dir: Path) -> "AsyncCrossFireSystem": |
| """Load from models directory.""" |
| system = CrossFireSystem.load(models_dir) |
| return cls(system) |
| |
| async def stabilize( |
| self, |
| resonances: np.ndarray, |
| max_iter: int = 10, |
| threshold: float = 0.01, |
| momentum: float = 0.7, |
| ) -> Tuple[Dict[str, float], int]: |
| """ |
| Async cross-fire stabilization with field lock. |
| |
| Atomic operation - prevents field corruption during stabilization. |
| """ |
| async with self._lock: |
| return self._sync.stabilize(resonances, max_iter, threshold, momentum) |
| |
| async def save(self, models_dir: Path) -> None: |
| """Save with lock protection.""" |
| async with self._lock: |
| self._sync.save(models_dir) |
| |
| def param_count(self) -> int: |
| """Total parameters (read-only, no lock needed).""" |
| return self._sync.param_count() |
| |
| @property |
| def coupling(self) -> np.ndarray: |
| """Access coupling matrix.""" |
| return self._sync.coupling |
| |
| @coupling.setter |
| def coupling(self, value: np.ndarray) -> None: |
| """Set coupling matrix (for feedback learning).""" |
| self._sync.coupling = value |
|
|
|
|
| if __name__ == "__main__": |
| print("=" * 60) |
| print(" CLOUD v4.0 — Chamber Cross-Fire System (200K model)") |
| print("=" * 60) |
| print() |
|
|
| |
| system = CrossFireSystem.random_init(seed=42) |
| print(f"Initialized cross-fire system (6 chambers)") |
| print(f"Total params: {system.param_count():,}") |
| print() |
|
|
| |
| print("Testing stabilization with random resonances:") |
| resonances = np.random.rand(100).astype(np.float32) |
| print(f" Input: 100D resonance vector (mean={resonances.mean():.3f})") |
| print() |
|
|
| activations, iterations = system.stabilize(resonances) |
|
|
| print(" Chamber activations after cross-fire:") |
| for chamber, value in activations.items(): |
| bar = "█" * int(value * 40) |
| print(f" {chamber:8s}: {value:.3f} {bar}") |
| print(f"\n Converged in {iterations} iterations") |
| print() |
|
|
| |
| print("Testing convergence speed:") |
| test_cases = [ |
| ("random uniform", np.random.rand(100)), |
| ("all high", np.ones(100) * 0.9), |
| ("all low", np.ones(100) * 0.1), |
| ("sparse", np.random.rand(100) * 0.1), |
| ] |
|
|
| for name, resonances in test_cases: |
| _, iters = system.stabilize(resonances) |
| print(f" {name:15s}: {iters:2d} iterations") |
| print() |
|
|
| |
| print("Testing save/load:") |
| models_dir = Path("./models") |
| system.save(models_dir) |
|
|
| system2 = CrossFireSystem.load(models_dir) |
| activations2, _ = system2.stabilize(test_cases[0][1]) |
|
|
| match = all( |
| abs(activations[k] - activations2[k]) < 1e-6 |
| for k in CHAMBER_NAMES_EXTENDED |
| ) |
| print(f" Save/load {'✓' if match else '✗'}") |
| print() |
|
|
| print("=" * 60) |
| print(" Cross-fire system operational. 6 chambers. 200K params.") |
| print("=" * 60) |
|
|