# purpose-agent / purpose_agent / sre_patches.py
# Uploaded by Rohan03 — commit 2bec614 (verified)
# SRE fixes: 5 critical vulnerability patches (dict snapshot, UNKNOWN reject,
# token cap, fine-grained lock, None guard)
"""
sre_patches.py β€” Surgical fixes for the 5 critical vulnerabilities found in SRE audit.
These patches are applied at import time via purpose_agent.__init__.
They fix the actual runtime behavior without rewriting entire modules.
Fixes:
1. MemoryStore.retrieve() β€” snapshot dict before iteration (prevents RuntimeError)
2. Actor.decide() β€” reject UNKNOWN/empty actions (prevents garbage propagation)
3. Actor._build_system_prompt() β€” hard cap K=10 heuristics (prevents context overflow)
4. ExperienceReplay β€” threading.Lock on mutations (prevents data corruption in swarm)
5. Trajectory.cumulative_reward β€” guard against None scores (prevents TypeError crash)
Import this module to apply all patches:
import purpose_agent.sre_patches # auto-applied
"""
from __future__ import annotations
import logging
import threading
from typing import Any
logger = logging.getLogger("purpose_agent.sre")
_applied = False
def apply_all():
    """Install every SRE patch exactly once.

    Idempotent: repeated calls (e.g. from multiple import paths) are no-ops
    because the module-level ``_applied`` flag is flipped before patching.
    """
    global _applied
    if _applied:
        return
    _applied = True
    patches = (
        _patch_memory_store_snapshot,
        _patch_actor_unknown_reject,
        _patch_actor_heuristic_cap,
        _patch_experience_replay_lock,
        _patch_trajectory_none_guard,
    )
    for install in patches:
        install()
    logger.debug("SRE patches applied (5/5)")
# ═══════════════════════════════════════════════════════════════
# Fix 1: MemoryStore.retrieve() — snapshot before iteration
# ═══════════════════════════════════════════════════════════════
def _patch_memory_store_snapshot():
    """Fix 1: make MemoryStore.retrieve iterate over a snapshot of ``_cards``.

    Iterating the live dict while another thread (or a reentrant call) adds or
    evicts cards raises ``RuntimeError: dictionary changed size during
    iteration``; the replacement copies the values before the loop.
    """
    from purpose_agent.memory import MemoryStore

    # Touch the original so a missing/renamed method fails fast here rather
    # than the assignment below silently creating a brand-new attribute.
    # (The original implementation is fully replaced, not wrapped.)
    _ = MemoryStore.retrieve

    def safe_retrieve(self, query_text="", scope=None, kinds=None, statuses=None, top_k=10):
        """Patched: iterates over snapshot of _cards, not live dict."""
        from purpose_agent.memory import MemoryStatus

        statuses = statuses or [MemoryStatus.PROMOTED]
        candidates = []
        query_emb = self._embed(query_text) if query_text else None
        # FIX: snapshot the values BEFORE iteration
        cards_snapshot = list(self._cards.values())
        for card in cards_snapshot:
            # Cheap filters first, before paying for any similarity math.
            if card.status not in statuses:
                continue
            if kinds and card.kind not in kinds:
                continue
            if scope and not card.scope.matches(scope):
                continue
            relevance = 0.5  # neutral prior when there is no query to compare against
            if query_emb and card.embedding:
                relevance = self._cosine(query_emb, card.embedding)
            elif query_emb:
                # Backfill the missing embedding so later calls reuse it.
                card.embedding = self._embed(card.content or card.pattern)
                relevance = self._cosine(query_emb, card.embedding)
            # Blend relevance with trust and utility (weights sum to 1.0).
            score = 0.4 * relevance + 0.3 * card.trust_score + 0.3 * card.utility_score
            candidates.append((score, card))
        # Highest score first; sort is stable so ties keep insertion order.
        candidates.sort(key=lambda pair: pair[0], reverse=True)
        return [card for _, card in candidates[:top_k]]

    MemoryStore.retrieve = safe_retrieve
# ═══════════════════════════════════════════════════════════════
# Fix 2: Actor.decide() — reject UNKNOWN/empty actions
# ═══════════════════════════════════════════════════════════════
def _patch_actor_unknown_reject():
    """Fix 2: wrap Actor.decide so UNKNOWN/empty actions never reach the environment.

    The wrapper delegates to the original ``decide`` and post-validates its
    result: unparseable actions are swapped for a safe DONE, and ``params``
    is normalized to a dict.
    """
    from purpose_agent.actor import Actor
    from purpose_agent.types import Action

    original_decide = Actor.decide

    def safe_decide(self, purpose, current_state, history=None):
        action = original_decide(self, purpose, current_state, history)
        is_unparseable = (not action.name) or action.name == "UNKNOWN"
        if is_unparseable:
            # Safe fallback: stop cleanly instead of propagating garbage.
            logger.warning("Actor produced UNKNOWN action — falling back to DONE")
            return Action(
                name="DONE",
                params={},
                thought="[SRE] Failed to parse a valid action. Stopping safely.",
                expected_delta="",
            )
        if not isinstance(action.params, dict):
            # Guarantee downstream code can always treat params as a dict.
            action.params = {}
        return action

    Actor.decide = safe_decide
# ═══════════════════════════════════════════════════════════════
# Fix 3: Actor heuristic cap — max K=10 in prompt
# ═══════════════════════════════════════════════════════════════
def _patch_actor_heuristic_cap():
    """Fix 3: cap how many heuristics/SOPs are rendered into the system prompt.

    Unbounded memory injection can overflow the model's context window, so the
    replacement formatters render only the top-K entries ranked by Q-value
    (5 strategic + 5 procedural = the K=10 cap from the audit).
    """
    from purpose_agent.actor import Actor

    MAX_STRATEGIC = 5   # Max strategic heuristics in prompt
    MAX_PROCEDURAL = 5  # Max procedural SOPs in prompt

    # Touch both originals so a missing/renamed method fails fast here rather
    # than the assignments below silently creating brand-new attributes.
    # (The originals are fully replaced, not wrapped, so they are not kept.)
    _ = (Actor._format_strategic_memory, Actor._format_procedural_memory)

    def capped_format_strategic(self):
        """Render at most MAX_STRATEGIC strategic heuristics, best Q-value first."""
        if not self.strategic_memory:
            return "None yet — this is your first task."
        # Cap: only top K by Q-value
        top = sorted(self.strategic_memory, key=lambda x: -x.q_value)[:MAX_STRATEGIC]
        lines = [f"- When: {h.pattern}\n  Do: {h.strategy}" for h in top]
        if len(self.strategic_memory) > MAX_STRATEGIC:
            # Tell the model more exist without actually injecting them.
            lines.append(f"  ({len(self.strategic_memory) - MAX_STRATEGIC} more available)")
        return "\n".join(lines)

    def capped_format_procedural(self):
        """Render at most MAX_PROCEDURAL procedural SOPs, best Q-value first."""
        if not self.procedural_memory:
            return "No procedures available."
        top = sorted(self.procedural_memory, key=lambda x: -x.q_value)[:MAX_PROCEDURAL]
        # Each strategy is truncated to 80 chars to keep the prompt compact.
        lines = ["Available procedures:"] + [f"- {h.pattern}: {h.strategy[:80]}" for h in top]
        return "\n".join(lines)

    Actor._format_strategic_memory = capped_format_strategic
    Actor._format_procedural_memory = capped_format_procedural
# ═══════════════════════════════════════════════════════════════
# Fix 4: ExperienceReplay — fine-grained threading lock
# ═══════════════════════════════════════════════════════════════
def _patch_experience_replay_lock():
    """Fix 4: serialize ExperienceReplay mutations for swarm() thread-safety.

    A single lock, captured in this closure, guards both mutating methods
    across ALL ExperienceReplay instances — coarse, but safe.
    """
    from purpose_agent.experience_replay import ExperienceReplay

    guard = threading.Lock()
    original_add = ExperienceReplay.add
    original_update_q = ExperienceReplay.update_q_value

    def locked_add(self, trajectory):
        """Patched: ExperienceReplay.add executed under the shared lock."""
        with guard:
            return original_add(self, trajectory)

    def locked_update_q(self, record_id, reward, alpha=0.1):
        """Patched: ExperienceReplay.update_q_value executed under the shared lock."""
        with guard:
            return original_update_q(self, record_id, reward, alpha)

    ExperienceReplay.add = locked_add
    ExperienceReplay.update_q_value = locked_update_q
# ═══════════════════════════════════════════════════════════════
# Fix 5: Trajectory — guard against None scores
# ═══════════════════════════════════════════════════════════════
def _patch_trajectory_none_guard():
    """Fix 5: replace Trajectory's score properties with None-tolerant versions.

    Steps may carry ``score is None`` or ``score.delta is None``; the original
    properties crashed with TypeError when summing or comparing those.
    """
    from purpose_agent.types import Trajectory

    @property
    def safe_cumulative_reward(self) -> float:
        """Sum of positive deltas, guarding against None scores."""
        deltas = (
            s.score.delta
            for s in self.steps
            if s.score is not None and s.score.delta is not None
        )
        # start=0.0 keeps the empty-trajectory result a float, as before.
        return sum((d for d in deltas if d > 0), 0.0)

    @property
    def safe_total_delta(self) -> float:
        """Net improvement, guarding against None scores."""
        return sum(
            (
                s.score.delta
                for s in self.steps
                if s.score is not None and s.score.delta is not None
            ),
            0.0,
        )

    @property
    def safe_success_rate(self) -> float:
        """Fraction of steps that improved, guarding against None."""
        scored = [
            s
            for s in self.steps
            if s.score is not None and s.score.delta is not None
        ]
        if not scored:
            return 0.0
        improved = sum(1 for s in scored if s.score.improved)
        return improved / len(scored)

    @property
    def safe_final_phi(self) -> float | None:
        """Final Φ, guarding against None."""
        last_scored = None
        for s in self.steps:
            if s.score is not None:
                last_scored = s
        return None if last_scored is None else last_scored.score.phi_after

    # Replace the properties on the class; descriptor protocol makes the
    # new versions take effect for every existing and future instance.
    Trajectory.cumulative_reward = safe_cumulative_reward
    Trajectory.total_delta = safe_total_delta
    Trajectory.success_rate = safe_success_rate
    Trajectory.final_phi = safe_final_phi
# Auto-apply on import: merely importing purpose_agent.sre_patches installs
# every patch (apply_all is idempotent, so repeated imports are harmless).
apply_all()