import glob import json import os import random import time from collections import deque from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Counter, Tuple, Deque from bone_types import Prisma, RealityLayer, ErrorLog, DecisionTrace, DecisionCrystal class BoneJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, set): return list(obj) if isinstance(obj, deque): return list(obj) if hasattr(obj, "to_dict"): return obj.to_dict() if hasattr(obj, "__dict__"): return obj.__dict__ return super().default(obj) class EventBus: def __init__(self, max_memory=1024): self.buffer = deque(maxlen=max_memory) self.subscribers = {} def subscribe(self, event_type, callback): if event_type not in self.subscribers: self.subscribers[event_type] = [] self.subscribers[event_type].append(callback) def publish(self, event_type, data=None): if event_type not in self.subscribers: return for callback in list(self.subscribers[event_type]): try: callback(data) except Exception as e: cb_name = getattr(callback, "__name__", str(callback)) error_msg = f"Error in '{cb_name}' for '{event_type}': {e}" print(f"{Prisma.RED}[BUS]: {error_msg}{Prisma.RST}") self.log(f"EVENT_FAILURE: {error_msg}", category="CRIT") def log(self, text: str, category: str = "SYSTEM"): entry = {"text": text, "category": category, "timestamp": time.time()} self.buffer.append(entry) def flush(self) -> List[Dict]: current_logs = list(self.buffer) self.buffer.clear() return current_logs def get_recent_logs(self, count=10): return list(self.buffer)[-count:] class LoreManifest: DATA_DIR = "lore" _instance = None def __init__(self, data_dir=None): self.DATA_DIR = data_dir or self.DATA_DIR self._cache = {} @classmethod def get_instance(cls): if cls._instance is None: cls._instance = LoreManifest() return cls._instance def get(self, category: str, sub_key: str = None) -> Any: if category not in self._cache: data = self._load_from_disk(category) self._cache[category] = data if data is not None else {} data = self._cache[category] if sub_key and isinstance(data, dict): return data.get(sub_key) return data def _load_from_disk(self, category: str) -> Optional[Dict]: filename = f"{category.lower()}.json" filepath = os.path.join(self.DATA_DIR, filename) if not os.path.exists(filepath): return None try: with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) print(f"{Prisma.GRY}[LORE]: Lazy-loaded '{category}'.{Prisma.RST}") return data except Exception as e: print(f"{Prisma.RED}[LORE]: Corrupt JSON in '{category}': {e}{Prisma.RST}") return None def inject(self, category: str, data: Any): if category not in self._cache: self._cache[category] = {} if isinstance(self._cache[category], dict) and isinstance(data, dict): self._cache[category].update(data) else: self._cache[category] = data def flush_cache(self, category: str = None): if category: if category in self._cache: del self._cache[category] print(f"{Prisma.CYN}[LORE]: Flushed '{category}'.{Prisma.RST}") else: print(f"{Prisma.GRY}[LORE]: Category '{category}' not in cache.{Prisma.RST}") else: self._cache = {} print(f"{Prisma.CYN}[LORE]: Flushed Lore cache.{Prisma.RST}") class TheObserver: def __init__(self): self.start_time = time.time() self.cycle_times = deque(maxlen=20) self.llm_latencies = deque(maxlen=20) self.memory_snapshots = deque(maxlen=20) self.error_counts = Counter() self.user_turns = 0 self.LATENCY_WARNING = 5.0 self.CYCLE_WARNING = 8.0 self.last_cycle_duration = 0.0 @staticmethod def clock_in(): return time.time() def clock_out(self, start_time, metric_type="cycle"): duration = time.time() - start_time if metric_type == "cycle": self.cycle_times.append(duration) self.last_cycle_duration = duration elif metric_type == "llm": self.llm_latencies.append(duration) return duration @property def uptime(self) -> float: return time.time() - self.start_time def calculate_efficiency(self, health: float, stamina: float) -> float: duration = max(0.01, self.last_cycle_duration) resource_sum = health + stamina return resource_sum / duration def log_error(self, module_name): self.error_counts[module_name] += 1 def record_memory(self, node_count): self.memory_snapshots.append(node_count) def pass_judgment(self, avg_cycle, avg_llm): if avg_cycle == 0.0 and avg_llm == 0.0: return "ASLEEP (WAKE UP)" if avg_cycle < 0.1 and avg_llm < 0.5: return "SUSPICIOUSLY EFFICIENT (Did we skip the math?)" if avg_llm > self.LATENCY_WARNING: jokes = [ "BRAIN FOG (The neural net is buffering)", "DEGRADED (Thinking... thinking...)", "PONDEROUS (Is the LLM on a coffee break?)", ] return random.choice(jokes) if avg_cycle > self.CYCLE_WARNING: return "SLUGGISH (The gears need oil)" return "NOMINAL (Boringly adequate)" def get_report(self): avg_cycle = sum(self.cycle_times) / max(1, len(self.cycle_times)) avg_llm = sum(self.llm_latencies) / max(1, len(self.llm_latencies)) uptime = time.time() - self.start_time status_msg = self.pass_judgment(avg_cycle, avg_llm) return { "uptime_sec": int(uptime), "turns": self.user_turns, "avg_cycle_sec": round(avg_cycle, 2), "avg_llm_sec": round(avg_llm, 2), "status": status_msg, "errors": dict(self.error_counts), "graph_size": self.memory_snapshots[-1] if self.memory_snapshots else 0, } @dataclass class SystemHealth: physics_online: bool = True bio_online: bool = True mind_online: bool = True cortex_online: bool = True errors: List[ErrorLog] = field(default_factory=list) warnings: List[str] = field(default_factory=list) hints: List[str] = field(default_factory=list) observer: Optional["TheObserver"] = None def link_observer(self, observer_ref): self.observer = observer_ref def report_failure(self, component: str, error: Exception, severity="ERROR"): msg = str(error) self.errors.append(ErrorLog(component, msg, severity=severity)) if self.observer: self.observer.log_error(component) attr_name = f"{component.lower()}_online" if hasattr(self, attr_name): setattr(self, attr_name, False) return f"[{component} OFFLINE]: {msg}" def report_warning(self, message: str): self.warnings.append(message) def report_hint(self, message: str): self.hints.append(message) def flush_feedback(self) -> Dict[str, List[str]]: feedback = {"warnings": list(self.warnings), "hints": list(self.hints)} self.warnings.clear() self.hints.clear() return feedback class RealityStack: def __init__(self): self._stack = [RealityLayer.SIMULATION] self._lock = False @property def current_depth(self) -> int: return self._stack[-1] def push_layer(self, layer: int, _context: Any = None) -> bool: if layer == self.current_depth: return True if layer == RealityLayer.DEBUG or layer == self.current_depth + 1: self._stack.append(layer) return True return False def pop_layer(self) -> int: if self._lock: return self.current_depth if len(self._stack) > 1: return self._stack.pop() return self._stack[0] def stabilize_at(self, layer: int): self._stack = [layer] def get_grammar_rules(self) -> Dict[str, bool]: depth = self.current_depth return { "allow_narrative": depth in [RealityLayer.SIMULATION, RealityLayer.DEEP_CX, RealityLayer.DEBUG], "allow_commands": depth >= RealityLayer.SIMULATION, "allow_meta": depth >= RealityLayer.DEBUG, "raw_output": depth == RealityLayer.DEEP_CX, "system_override": depth == RealityLayer.DEBUG, } class ArchetypeArbiter: @staticmethod def arbitrate( physics_lens: str, soul_archetype: str, council_mandates: List[Dict], trigram: Dict = None, ) -> Tuple[str, str, str]: for mandate in council_mandates: if mandate.get("type") == "LOCKDOWN": return ( "THE CENSOR", "COUNCIL", "Martial Law declared. Identity suppressed.", ) if mandate.get("type") == "FORCE_MODE": return "THE MACHINE", "COUNCIL", "Bureaucratic override active." if "/" in soul_archetype: return ( soul_archetype, "SOUL", f"The Diamond Soul refracts the physics ({soul_archetype}).", ) if trigram: trigram_name = trigram.get("name") mythos = LoreManifest.get_instance().get("MYTHOS") or {} rules = mythos.get("trigram_resonance", []) for rule in rules: if rule.get("trigram") == trigram_name: required_lens = rule.get("lens") required_soul = rule.get("soul") match_lens = ( (required_lens == physics_lens) if required_lens else True ) match_soul = ( (required_soul == soul_archetype) if required_soul else True ) if match_lens and match_soul: return ( rule["result"], rule.get("source", "COSMIC"), rule.get("msg", "Resonance detected."), ) if physics_lens in ["THE MANIC", "THE VOID"]: return ( physics_lens, "PHYSICS", f"Environment is too loud. You are {physics_lens}.", ) return soul_archetype, "SOUL", "The Soul guides the lens." class TelemetryService: log_dir = "logs/telemetry" _tracer_instance = None BUFFER_SIZE = 50 def __init__(self): self.trace_buffer: Deque[DecisionTrace] = deque(maxlen=50) self.write_buffer: List[str] = [] self.active_crystal = None self.disabled = False self.write_errors = 0 try: os.makedirs(self.log_dir, exist_ok=True) self.current_trace_file = os.path.join( self.log_dir, f"trace_{int(time.time())}.jsonl" ) except OSError: print( f"{Prisma.RED}[TELEMETRY]: Disk Access Denied. Telemetry Disabled.{Prisma.RST}" ) self.disabled = True self.current_trace_file = None @classmethod def get_instance(cls): if cls._tracer_instance is None: cls._tracer_instance = TelemetryService() return cls._tracer_instance def start_cycle(self, trace_id: str): if self.disabled: return self.active_crystal = DecisionCrystal(decision_id=trace_id) def log_decision( self, component: str, decision_type: str, inputs: Any, reasoning: str, outcome: str, ): if self.disabled or not self.active_crystal: return trace = DecisionTrace( trace_id=self.active_crystal.decision_id, timestamp=time.time(), component=component, decision_type=decision_type, inputs=inputs if isinstance(inputs, dict) else {"raw": str(inputs)}, reasoning=reasoning, outcome=outcome, ) self.trace_buffer.append(trace) self._buffer_line(trace.to_json()) def log_crystal(self, crystal: DecisionCrystal): if self.disabled: return self._buffer_line(crystal.crystallize()) def start_phase(self, phase_name: str, _context: Any): self.log_decision( phase_name, "PHASE_START", {"timestamp": time.time()}, "Phase execution initiated.", "RUNNING", ) def end_phase(self, phase_name: str, _ctx_before: Any, _ctx_after: Any): self.log_decision( phase_name, "PHASE_END", {"timestamp": time.time()}, "Phase execution completed.", "SUCCESS", ) def finalize_cycle(self): if self.active_crystal: self.log_crystal(self.active_crystal) self.active_crystal = None self.flush_to_disk() def _buffer_line(self, json_str: str): if self.disabled: return self.write_buffer.append(json_str) if len(self.write_buffer) >= self.BUFFER_SIZE: self.flush_to_disk() def flush_to_disk(self): if self.disabled or not self.current_trace_file or not self.write_buffer: return try: with open(self.current_trace_file, "a", encoding="utf-8") as f: f.write("\n".join(self.write_buffer) + "\n") self.write_buffer.clear() self.write_errors = 0 except IOError as e: self.write_errors += 1 if self.write_errors >= 5: print(f"{Prisma.RED}[TELEMETRY]: Critical write failure threshold reached. Telemetry disabled. {e}{Prisma.RST}") self.disabled = True self.write_buffer.clear() else: keep_count = self.BUFFER_SIZE // 2 self.write_buffer = self.write_buffer[-keep_count:] print(f"{Prisma.RED}[TELEMETRY]: Write error ({self.write_errors}). Retrying later. {e}{Prisma.RST}") def read_recent_history(self, limit=4) -> List[str]: if not os.path.exists(self.log_dir): return [] pattern = os.path.join(self.log_dir, "trace_*.jsonl") files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) history = [] for fpath in files: if len(history) >= limit: break try: with open(fpath, "r", encoding="utf-8") as f: lines = deque(f, maxlen=limit * 2) for line in reversed(lines): if len(history) >= limit: break try: data = json.loads(line) if ( data.get("_type") == "CRYSTAL" or "final_response" in data ): resp = data.get("final_response", "") if not resp: continue prompt = data.get("prompt_snapshot", "") user_text = "Unknown" if "User:" in prompt: parts = prompt.split("User:") if len(parts) > 1: user_text = parts[1].split("\n")[0].strip() entry = f"User: {user_text} | System: {resp}" history.insert(0, entry) except json.JSONDecodeError: continue except Exception: continue return history[-limit:] def get_last_thoughts(self, limit=3) -> List[str]: history = self.read_recent_history(limit) return [h.split("System: ")[-1] for h in history if "System: " in h] def get_last_fatal_error(self) -> Optional[str]: pattern = os.path.join(self.log_dir, "trace_*.jsonl") files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) if len(files) < 2: return None prev_file = files[1] try: with open(prev_file, "r") as f: lines = f.readlines() if not lines: return None last_line = json.loads(lines[-1]) if "outcome" in last_line and "CRITICAL" in str(last_line["outcome"]): return f"PREVIOUS SYSTEM CRASH: {last_line.get('reasoning', 'Unknown')}" except Exception: return None def generate_session_summary(self, _uptime: float = 0.0) -> str: self.flush_to_disk() count = len(self.trace_buffer) status = "DISABLED" if self.disabled else "ACTIVE" return ( f"\n[TELEMETRY] Session ended ({status}). {count} crystals crystallized.\n" f" Trace: {self.current_trace_file}" )