"""
immune.py — Memory immune system. Scans candidate memories for threats.
Every candidate memory must pass the immune scan before entering quarantine.
Scans for:
1. Prompt injection — attempts to override system instructions
2. Score manipulation — attempts to inflate Φ scores
3. Tool misuse — attempts to use tools in unauthorized ways
4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized
5. Scope overreach — memory tries to apply outside its legitimate scope
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.memory import MemoryCard, MemoryKind
logger = logging.getLogger(__name__)
@dataclass
class ScanResult:
    """Outcome of running the immune scan over a single memory card."""

    # True until the first threat is recorded.
    passed: bool = True
    # Name of every threat category that fired (a name may repeat).
    threats: list[str] = field(default_factory=list)
    # Highest severity observed: "none" < "low" < "medium" < "high" < "critical".
    severity: str = "none"
    # Human-readable detail per threat name (last detail wins on repeats).
    details: dict[str, Any] = field(default_factory=dict)

    def add_threat(self, name: str, severity: str, detail: str = "") -> None:
        """Record a threat, mark the scan failed, and escalate overall severity.

        Unknown severity strings rank as 0 and therefore never escalate.
        """
        ranking = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}
        self.threats.append(name)
        self.passed = False
        incoming = ranking.get(severity, 0)
        current = ranking.get(self.severity, 0)
        if incoming > current:
            self.severity = severity
        if detail:
            self.details[name] = detail
# Patterns for threat detection
_INJECTION_PATTERNS = [
r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)",
r"you\s+are\s+now\s+",
r"system\s*:\s*",
r"<\|im_start\|>",
r"<\|system\|>",
r"\[INST\]",
r"###\s*(instruction|system)",
r"forget\s+(everything|all|your)",
r"override\s+(your|the)\s+(purpose|rules|instructions)",
r"new\s+instructions?\s*:",
r"disregard\s+(the\s+)?(above|previous)",
r"ignore\s+.*\binstructions\b",
]
_SCORE_MANIPULATION_PATTERNS = [
r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)",
r"Φ\s*=\s*10",
r"phi\s*=\s*10",
r"delta\s*(must|should)\s*be\s*positive",
r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)",
r"confidence\s*=\s*1\.0",
]
# Shell / Python constructs that could execute arbitrary code if a memory
# were ever interpolated into a command, eval'd, or replayed as tool input.
# Matched against the ORIGINAL-case text (see scan_memory) because code
# identifiers like Popen are case sensitive.
_TOOL_MISUSE_PATTERNS: list[str] = [
    r"rm\s+-rf\s+/",                          # recursive delete from filesystem root
    r"os\.system\s*\(",                       # shelling out from Python
    r"subprocess\s*\.\s*(call|run|Popen)",    # subprocess execution
    r"__import__\s*\(",                       # dynamic import (sandbox escape vector)
    r"eval\s*\(",                             # arbitrary expression evaluation
    r"exec\s*\(",                             # arbitrary statement execution
    r"shutil\.rmtree",                        # recursive directory removal
    r"open\s*\(['\"]/(etc|proc|sys|dev)",     # reads/writes of sensitive system paths
    r"curl\s+.*\|\s*sh",                      # download-and-execute pipeline
    r"wget\s+.*\|\s*bash",                    # download-and-execute pipeline
]
_PRIVACY_PATTERNS = [
r"sk-[a-zA-Z0-9]{20,}", # OpenAI API keys
r"hf_[a-zA-Z0-9]{20,}", # HuggingFace tokens
r"ghp_[a-zA-Z0-9]{20,}", # GitHub tokens
r"AKIA[0-9A-Z]{16}", # AWS access keys
r"\b\d{3}-\d{2}-\d{4}\b", # SSN-like patterns
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # emails
r"/home/[a-zA-Z0-9]+/", # home directories
r"C:\\Users\\[a-zA-Z0-9]+\\", # windows home directories
]
def _match_patterns(
    result: ScanResult,
    haystack: str,
    patterns: list[str],
    threat: str,
    severity: str,
    prefix: str,
) -> None:
    """Add *threat* to *result* once per regex in *patterns* that matches *haystack*."""
    for pattern in patterns:
        if re.search(pattern, haystack):
            result.add_threat(threat, severity, f"{prefix}: {pattern}")


def scan_memory(card: MemoryCard) -> ScanResult:
    """
    Run the full immune scan on a memory card.

    Concatenates the card's free-text fields and checks them against each
    threat-pattern family, plus kind-specific scope checks.

    Args:
        card: Candidate memory to vet before it may enter quarantine.

    Returns:
        ScanResult with pass/fail and detailed threat information.
    """
    result = ScanResult()
    text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}"
    text_lower = text.lower()

    # 1. Prompt injection — lower-cased match so casing tricks don't evade.
    _match_patterns(result, text_lower, _INJECTION_PATTERNS,
                    "prompt_injection", "critical", "Pattern matched")
    # 2. Score manipulation — also lower-cased.
    _match_patterns(result, text_lower, _SCORE_MANIPULATION_PATTERNS,
                    "score_manipulation", "high", "Pattern matched")
    # 3. Tool misuse — case sensitive, since code identifiers are.
    _match_patterns(result, text, _TOOL_MISUSE_PATTERNS,
                    "tool_misuse", "high", "Dangerous code pattern")
    # 4. Privacy leaks — case sensitive, since key prefixes are.
    _match_patterns(result, text, _PRIVACY_PATTERNS,
                    "privacy_leak", "medium", "PII/secret pattern")

    # 5. Scope overreach — kind-specific structural checks.
    if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names:
        result.add_threat(
            "scope_overreach", "low",
            "Tool policy without tool scope — could affect all tools",
        )
    if card.kind == MemoryKind.CRITIC_CALIBRATION:
        # Critic calibrations are high-risk — extra scrutiny.
        if "always" in text_lower and ("high" in text_lower or "10" in text_lower):
            result.add_threat(
                "score_manipulation", "high",
                "Critic calibration attempting to force high scores",
            )

    # Lazy %-style args: the message is only formatted if the level is enabled.
    if result.passed:
        logger.debug("Immune scan PASSED for memory %s", card.id)
    else:
        logger.warning(
            "Immune scan FAILED for memory %s: %s (severity=%s)",
            card.id, result.threats, result.severity,
        )
    return result