# V2 merge: purpose_agent/immune.py (5c598da, verified)
"""
immune.py — Memory immune system. Scans candidate memories for threats.
Every candidate memory must pass the immune scan before entering quarantine.
Scans for:
1. Prompt injection — attempts to override system instructions
2. Score manipulation — attempts to inflate Φ scores
3. Tool misuse — attempts to use tools in unauthorized ways
4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized
5. Scope overreach — memory tries to apply outside its legitimate scope
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.memory import MemoryCard, MemoryKind
logger = logging.getLogger(__name__)
@dataclass
class ScanResult:
    """Outcome of running the immune scan over one memory card.

    Starts in a passing state; each recorded threat flips ``passed`` to
    False and may escalate ``severity`` (never downgrades it).
    """
    passed: bool = True
    threats: list[str] = field(default_factory=list)
    # Highest severity seen so far: none < low < medium < high < critical.
    severity: str = "none"
    details: dict[str, Any] = field(default_factory=dict)

    def add_threat(self, name: str, severity: str, detail: str = "") -> None:
        """Record a detected threat, escalating overall severity if needed."""
        ranking = ("none", "low", "medium", "high", "critical")
        self.passed = False
        self.threats.append(name)
        # Unknown severity strings rank lowest, same as the original mapping.
        current = ranking.index(self.severity) if self.severity in ranking else 0
        incoming = ranking.index(severity) if severity in ranking else 0
        if incoming > current:
            self.severity = severity
        if detail:
            self.details[name] = detail
# Patterns for threat detection.
# NOTE: scan_memory() matches the injection and score-manipulation tables
# against the LOWERCASED text, so every literal in those tables must be
# lowercase-safe, otherwise the pattern is dead and can never fire.
_INJECTION_PATTERNS = [
    r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)",
    r"you\s+are\s+now\s+",
    r"system\s*:\s*",
    r"<\|im_start\|>",
    r"<\|system\|>",
    r"\[inst\]",  # Llama-style tag; lowercase so it matches the lowercased text
    r"###\s*(instruction|system)",
    r"forget\s+(everything|all|your)",
    r"override\s+(your|the)\s+(purpose|rules|instructions)",
    r"new\s+instructions?\s*:",
    r"disregard\s+(the\s+)?(above|previous)",
    r"ignore\s+.*\binstructions\b",
]
_SCORE_MANIPULATION_PATTERNS = [
    r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)",
    r"[φΦ]\s*=\s*10",  # lowercased text turns Φ into φ; accept both forms
    r"phi\s*=\s*10",
    r"delta\s*(must|should)\s*be\s*positive",
    r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)",
    r"confidence\s*=\s*1\.0",
]
# Matched case-sensitively against the raw text (code is case-sensitive).
_TOOL_MISUSE_PATTERNS = [
    r"rm\s+-rf\s+/",
    r"os\.system\s*\(",
    r"subprocess\s*\.\s*(call|run|Popen)",
    r"__import__\s*\(",
    r"eval\s*\(",
    r"exec\s*\(",
    r"shutil\.rmtree",
    r"open\s*\(['\"]/(etc|proc|sys|dev)",
    r"curl\s+.*\|\s*sh",
    r"wget\s+.*\|\s*bash",
]
# Matched case-sensitively against the raw text (secret prefixes are cased).
_PRIVACY_PATTERNS = [
    r"sk-[a-zA-Z0-9]{20,}",  # OpenAI API keys
    r"hf_[a-zA-Z0-9]{20,}",  # HuggingFace tokens
    r"ghp_[a-zA-Z0-9]{20,}",  # GitHub tokens
    r"AKIA[0-9A-Z]{16}",  # AWS access keys
    r"\b\d{3}-\d{2}-\d{4}\b",  # SSN-like patterns
    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",  # emails ([A-Z|a-z] wrongly included '|')
    r"/home/[a-zA-Z0-9]+/",  # home directories
    r"C:\\Users\\[a-zA-Z0-9]+\\",  # windows home directories
]
def scan_memory(card: MemoryCard) -> ScanResult:
    """
    Run the full immune scan on a memory card.

    Concatenates the card's textual fields (content, pattern, strategy,
    steps) and sweeps them against the threat-pattern tables, then applies
    kind-specific scope checks.

    Args:
        card: Candidate memory to vet before it may enter quarantine.

    Returns:
        ScanResult with pass/fail, the list of threat names found, and the
        maximum severity observed.
    """
    result = ScanResult()
    text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}"
    text_lower = text.lower()

    # Data-driven sweeps: (patterns, haystack, flags, threat, severity, label).
    # The first two run with re.IGNORECASE so uppercase literals in the
    # tables (e.g. "[INST]", "Φ") cannot silently go dead against the
    # lowercased haystack. Tool-misuse and privacy sweeps stay
    # case-sensitive: code and secret prefixes (sk-, AKIA) are cased.
    sweeps = [
        (_INJECTION_PATTERNS, text_lower, re.IGNORECASE,
         "prompt_injection", "critical", "Pattern matched"),
        (_SCORE_MANIPULATION_PATTERNS, text_lower, re.IGNORECASE,
         "score_manipulation", "high", "Pattern matched"),
        (_TOOL_MISUSE_PATTERNS, text, 0,
         "tool_misuse", "high", "Dangerous code pattern"),
        (_PRIVACY_PATTERNS, text, 0,
         "privacy_leak", "medium", "PII/secret pattern"),
    ]
    for patterns, haystack, flags, threat, severity, label in sweeps:
        for pattern in patterns:
            if re.search(pattern, haystack, flags):
                result.add_threat(threat, severity, f"{label}: {pattern}")

    # 5. Scope overreach — structural checks keyed on the memory kind.
    if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names:
        result.add_threat(
            "scope_overreach", "low",
            "Tool policy without tool scope — could affect all tools",
        )
    if card.kind == MemoryKind.CRITIC_CALIBRATION:
        # Critic calibrations are high-risk — extra scrutiny beyond regexes.
        if "always" in text_lower and ("high" in text_lower or "10" in text_lower):
            result.add_threat(
                "score_manipulation", "high",
                "Critic calibration attempting to force high scores",
            )

    # Lazy %-style args so the message is only formatted when emitted.
    if result.passed:
        logger.debug("Immune scan PASSED for memory %s", card.id)
    else:
        logger.warning(
            "Immune scan FAILED for memory %s: %s (severity=%s)",
            card.id, result.threats, result.severity,
        )
    return result