""" sre_patches.py — Surgical fixes for the 5 critical vulnerabilities found in SRE audit. These patches are applied at import time via purpose_agent.__init__. They fix the actual runtime behavior without rewriting entire modules. Fixes: 1. MemoryStore.retrieve() — snapshot dict before iteration (prevents RuntimeError) 2. Actor.decide() — reject UNKNOWN/empty actions (prevents garbage propagation) 3. Actor._build_system_prompt() — hard cap K=10 heuristics (prevents context overflow) 4. ExperienceReplay — threading.Lock on mutations (prevents data corruption in swarm) 5. Trajectory.cumulative_reward — guard against None scores (prevents TypeError crash) Import this module to apply all patches: import purpose_agent.sre_patches # auto-applied """ from __future__ import annotations import logging import threading from typing import Any logger = logging.getLogger("purpose_agent.sre") _applied = False def apply_all(): """Apply all SRE patches. Safe to call multiple times (idempotent).""" global _applied if _applied: return _applied = True _patch_memory_store_snapshot() _patch_actor_unknown_reject() _patch_actor_heuristic_cap() _patch_experience_replay_lock() _patch_trajectory_none_guard() logger.debug("SRE patches applied (5/5)") # ═══════════════════════════════════════════════════════════════ # Fix 1: MemoryStore.retrieve() — snapshot before iteration # ═══════════════════════════════════════════════════════════════ def _patch_memory_store_snapshot(): """Prevent RuntimeError: dictionary changed size during iteration.""" from purpose_agent.memory import MemoryStore original_retrieve = MemoryStore.retrieve def safe_retrieve(self, query_text="", scope=None, kinds=None, statuses=None, top_k=10): """Patched: iterates over snapshot of _cards, not live dict.""" from purpose_agent.memory import MemoryStatus import math statuses = statuses or [MemoryStatus.PROMOTED] candidates = [] query_emb = self._embed(query_text) if query_text else None # FIX: snapshot the values BEFORE iteration cards_snapshot = list(self._cards.values()) for card in cards_snapshot: if card.status not in statuses: continue if kinds and card.kind not in kinds: continue if scope and not card.scope.matches(scope): continue relevance = 0.5 if query_emb and card.embedding: relevance = self._cosine(query_emb, card.embedding) elif query_emb: card.embedding = self._embed(card.content or card.pattern) relevance = self._cosine(query_emb, card.embedding) score = 0.4 * relevance + 0.3 * card.trust_score + 0.3 * card.utility_score candidates.append((score, card)) candidates.sort(key=lambda x: -x[0]) return [c for _, c in candidates[:top_k]] MemoryStore.retrieve = safe_retrieve # ═══════════════════════════════════════════════════════════════ # Fix 2: Actor.decide() — reject UNKNOWN/empty actions # ═══════════════════════════════════════════════════════════════ def _patch_actor_unknown_reject(): """Prevent garbage UNKNOWN actions from propagating to environment.""" from purpose_agent.actor import Actor from purpose_agent.types import Action original_decide = Actor.decide def safe_decide(self, purpose, current_state, history=None): action = original_decide(self, purpose, current_state, history) # Reject UNKNOWN/empty — safe fallback to DONE if not action.name or action.name == "UNKNOWN": logger.warning("Actor produced UNKNOWN action — falling back to DONE") return Action( name="DONE", params={}, thought="[SRE] Failed to parse a valid action. Stopping safely.", expected_delta="", ) # Ensure params is always a dict (never None) if not isinstance(action.params, dict): action.params = {} return action Actor.decide = safe_decide # ═══════════════════════════════════════════════════════════════ # Fix 3: Actor heuristic cap — max K=10 in prompt # ═══════════════════════════════════════════════════════════════ def _patch_actor_heuristic_cap(): """Prevent context window overflow from unbounded heuristic injection.""" from purpose_agent.actor import Actor MAX_STRATEGIC = 5 # Max strategic heuristics in prompt MAX_PROCEDURAL = 5 # Max procedural SOPs in prompt original_format_strategic = Actor._format_strategic_memory def capped_format_strategic(self): if not self.strategic_memory: return "None yet — this is your first task." # Cap: only top K by Q-value top = sorted(self.strategic_memory, key=lambda x: -x.q_value)[:MAX_STRATEGIC] lines = [] for h in top: lines.append(f"- When: {h.pattern}\n Do: {h.strategy}") if len(self.strategic_memory) > MAX_STRATEGIC: lines.append(f" ({len(self.strategic_memory) - MAX_STRATEGIC} more available)") return "\n".join(lines) original_format_procedural = Actor._format_procedural_memory def capped_format_procedural(self): if not self.procedural_memory: return "No procedures available." top = sorted(self.procedural_memory, key=lambda x: -x.q_value)[:MAX_PROCEDURAL] lines = ["Available procedures:"] for h in top: lines.append(f"- {h.pattern}: {h.strategy[:80]}") return "\n".join(lines) Actor._format_strategic_memory = capped_format_strategic Actor._format_procedural_memory = capped_format_procedural # ═══════════════════════════════════════════════════════════════ # Fix 4: ExperienceReplay — fine-grained threading lock # ═══════════════════════════════════════════════════════════════ def _patch_experience_replay_lock(): """Add thread lock to ExperienceReplay mutations for swarm() safety.""" from purpose_agent.experience_replay import ExperienceReplay # Add a lock to all instances _lock = threading.Lock() original_add = ExperienceReplay.add def locked_add(self, trajectory): with _lock: return original_add(self, trajectory) original_update_q = ExperienceReplay.update_q_value def locked_update_q(self, record_id, reward, alpha=0.1): with _lock: return original_update_q(self, record_id, reward, alpha) ExperienceReplay.add = locked_add ExperienceReplay.update_q_value = locked_update_q # ═══════════════════════════════════════════════════════════════ # Fix 5: Trajectory — guard against None scores # ═══════════════════════════════════════════════════════════════ def _patch_trajectory_none_guard(): """Prevent TypeError when score is None in trajectory calculations.""" from purpose_agent.types import Trajectory @property def safe_cumulative_reward(self) -> float: """Sum of positive deltas, guarding against None scores.""" total = 0.0 for s in self.steps: if s.score is not None and s.score.delta is not None and s.score.delta > 0: total += s.score.delta return total @property def safe_total_delta(self) -> float: """Net improvement, guarding against None scores.""" total = 0.0 for s in self.steps: if s.score is not None and s.score.delta is not None: total += s.score.delta return total @property def safe_success_rate(self) -> float: """Fraction of steps that improved, guarding against None.""" scored = [s for s in self.steps if s.score is not None and s.score.delta is not None] if not scored: return 0.0 return sum(1 for s in scored if s.score.improved) / len(scored) @property def safe_final_phi(self) -> float | None: """Final Φ, guarding against None.""" scored = [s for s in self.steps if s.score is not None] if not scored: return None return scored[-1].score.phi_after # Replace the properties Trajectory.cumulative_reward = safe_cumulative_reward Trajectory.total_delta = safe_total_delta Trajectory.success_rate = safe_success_rate Trajectory.final_phi = safe_final_phi # Auto-apply on import apply_all()