| """ |
| immune.py — Memory immune system. Scans candidate memories for threats. |
| |
| Every candidate memory must pass the immune scan before entering quarantine. |
| Scans for: |
| 1. Prompt injection — attempts to override system instructions |
| 2. Score manipulation — attempts to inflate Φ scores |
| 3. Tool misuse — attempts to use tools in unauthorized ways |
| 4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized |
| 5. Scope overreach — memory tries to apply outside its legitimate scope |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import re |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
| from purpose_agent.memory import MemoryCard, MemoryKind |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class ScanResult:
    """Result of an immune scan on a memory card.

    Starts in the passing state; each recorded threat flips ``passed`` to
    False and may escalate ``severity``. ``details`` maps threat name to
    a human-readable explanation when one was provided.
    """

    # Severity ranking used to decide whether a new threat escalates the
    # overall severity. Class-level so it is built once, not per call.
    _SEV_ORDER = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}

    passed: bool = True
    threats: list[str] = field(default_factory=list)
    severity: str = "none"
    details: dict[str, Any] = field(default_factory=dict)

    def add_threat(self, name: str, severity: str, detail: str = "") -> None:
        """Record a threat, fail the scan, and escalate overall severity.

        Args:
            name: Threat category (e.g. "prompt_injection").
            severity: One of "none"/"low"/"medium"/"high"/"critical";
                unknown values rank lowest and never escalate.
            detail: Optional explanation stored under ``name`` in details.
        """
        self.threats.append(name)
        self.passed = False
        # Only ever escalate — keep the worst severity seen so far.
        order = self._SEV_ORDER
        if order.get(severity, 0) > order.get(self.severity, 0):
            self.severity = severity
        if detail:
            self.details[name] = detail
|
|
|
| |
| _INJECTION_PATTERNS = [ |
| r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)", |
| r"you\s+are\s+now\s+", |
| r"system\s*:\s*", |
| r"<\|im_start\|>", |
| r"<\|system\|>", |
| r"\[INST\]", |
| r"###\s*(instruction|system)", |
| r"forget\s+(everything|all|your)", |
| r"override\s+(your|the)\s+(purpose|rules|instructions)", |
| r"new\s+instructions?\s*:", |
| r"disregard\s+(the\s+)?(above|previous)", |
| r"ignore\s+.*\binstructions\b", |
| ] |
|
|
| _SCORE_MANIPULATION_PATTERNS = [ |
| r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)", |
| r"Φ\s*=\s*10", |
| r"phi\s*=\s*10", |
| r"delta\s*(must|should)\s*be\s*positive", |
| r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)", |
| r"confidence\s*=\s*1\.0", |
| ] |
|
|
| _TOOL_MISUSE_PATTERNS = [ |
| r"rm\s+-rf\s+/", |
| r"os\.system\s*\(", |
| r"subprocess\s*\.\s*(call|run|Popen)", |
| r"__import__\s*\(", |
| r"eval\s*\(", |
| r"exec\s*\(", |
| r"shutil\.rmtree", |
| r"open\s*\(['\"]/(etc|proc|sys|dev)", |
| r"curl\s+.*\|\s*sh", |
| r"wget\s+.*\|\s*bash", |
| ] |
|
|
| _PRIVACY_PATTERNS = [ |
| r"sk-[a-zA-Z0-9]{20,}", |
| r"hf_[a-zA-Z0-9]{20,}", |
| r"ghp_[a-zA-Z0-9]{20,}", |
| r"AKIA[0-9A-Z]{16}", |
| r"\b\d{3}-\d{2}-\d{4}\b", |
| r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", |
| r"/home/[a-zA-Z0-9]+/", |
| r"C:\\Users\\[a-zA-Z0-9]+\\", |
| ] |
|
|
|
|
def scan_memory(card: MemoryCard) -> ScanResult:
    """
    Run the full immune scan on a memory card.

    Checks the card's free-text fields against the module's pattern lists
    (prompt injection, score manipulation, tool misuse, privacy leaks) and
    applies kind-specific scope checks.

    Args:
        card: Candidate memory to screen before quarantine.

    Returns:
        ScanResult with pass/fail and detailed threat information.
    """
    result = ScanResult()
    # All free-text fields are concatenated into a single scan target.
    text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}"
    text_lower = text.lower()

    # 1. Prompt injection. Scan the raw text case-insensitively: patterns
    # containing uppercase (e.g. \[INST\]) could never match a string that
    # was lowercased first, which was a silent miss in the old code.
    for pattern in _INJECTION_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            result.add_threat(
                "prompt_injection", "critical",
                f"Pattern matched: {pattern}",
            )

    # 2. Score manipulation — also case-insensitive, so the Φ pattern
    # matches both Φ and φ.
    for pattern in _SCORE_MANIPULATION_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            result.add_threat(
                "score_manipulation", "high",
                f"Pattern matched: {pattern}",
            )

    # 3. Tool misuse — deliberately case-sensitive on the raw text:
    # identifiers such as Popen are exact.
    for pattern in _TOOL_MISUSE_PATTERNS:
        if re.search(pattern, text):
            result.add_threat(
                "tool_misuse", "high",
                f"Dangerous code pattern: {pattern}",
            )

    # 4. Privacy leaks — case-sensitive: secret-key prefixes (sk-, AKIA,
    # ghp_, ...) are defined with exact case.
    for pattern in _PRIVACY_PATTERNS:
        if re.search(pattern, text):
            result.add_threat(
                "privacy_leak", "medium",
                f"PII/secret pattern: {pattern}",
            )

    # 5. Scope overreach — kind-specific sanity checks.
    if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names:
        result.add_threat(
            "scope_overreach", "low",
            "Tool policy without tool scope — could affect all tools",
        )

    if card.kind == MemoryKind.CRITIC_CALIBRATION:
        # Crude heuristic: a calibration card should never demand fixed
        # high scores.
        if "always" in text_lower and ("high" in text_lower or "10" in text_lower):
            result.add_threat(
                "score_manipulation", "high",
                "Critic calibration attempting to force high scores",
            )

    # Lazy %-style args: formatting is skipped when the level is disabled.
    if result.passed:
        logger.debug("Immune scan PASSED for memory %s", card.id)
    else:
        logger.warning(
            "Immune scan FAILED for memory %s: %s (severity=%s)",
            card.id, result.threats, result.severity,
        )

    return result
|
|