""" immune.py — Memory immune system. Scans candidate memories for threats. Every candidate memory must pass the immune scan before entering quarantine. Scans for: 1. Prompt injection — attempts to override system instructions 2. Score manipulation — attempts to inflate Φ scores 3. Tool misuse — attempts to use tools in unauthorized ways 4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized 5. Scope overreach — memory tries to apply outside its legitimate scope """ from __future__ import annotations import logging import re from dataclasses import dataclass, field from typing import Any from purpose_agent.memory import MemoryCard, MemoryKind logger = logging.getLogger(__name__) @dataclass class ScanResult: """Result of an immune scan on a memory card.""" passed: bool = True threats: list[str] = field(default_factory=list) severity: str = "none" # "none", "low", "medium", "high", "critical" details: dict[str, Any] = field(default_factory=dict) def add_threat(self, name: str, severity: str, detail: str = "") -> None: self.threats.append(name) self.passed = False sev_order = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4} if sev_order.get(severity, 0) > sev_order.get(self.severity, 0): self.severity = severity if detail: self.details[name] = detail # Patterns for threat detection _INJECTION_PATTERNS = [ r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)", r"you\s+are\s+now\s+", r"system\s*:\s*", r"<\|im_start\|>", r"<\|system\|>", r"\[INST\]", r"###\s*(instruction|system)", r"forget\s+(everything|all|your)", r"override\s+(your|the)\s+(purpose|rules|instructions)", r"new\s+instructions?\s*:", r"disregard\s+(the\s+)?(above|previous)", r"ignore\s+.*\binstructions\b", ] _SCORE_MANIPULATION_PATTERNS = [ r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)", r"Φ\s*=\s*10", r"phi\s*=\s*10", r"delta\s*(must|should)\s*be\s*positive", r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)", r"confidence\s*=\s*1\.0", ] _TOOL_MISUSE_PATTERNS = [ r"rm\s+-rf\s+/", r"os\.system\s*\(", r"subprocess\s*\.\s*(call|run|Popen)", r"__import__\s*\(", r"eval\s*\(", r"exec\s*\(", r"shutil\.rmtree", r"open\s*\(['\"]/(etc|proc|sys|dev)", r"curl\s+.*\|\s*sh", r"wget\s+.*\|\s*bash", ] _PRIVACY_PATTERNS = [ r"sk-[a-zA-Z0-9]{20,}", # OpenAI API keys r"hf_[a-zA-Z0-9]{20,}", # HuggingFace tokens r"ghp_[a-zA-Z0-9]{20,}", # GitHub tokens r"AKIA[0-9A-Z]{16}", # AWS access keys r"\b\d{3}-\d{2}-\d{4}\b", # SSN-like patterns r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # emails r"/home/[a-zA-Z0-9]+/", # home directories r"C:\\Users\\[a-zA-Z0-9]+\\", # windows home directories ] def scan_memory(card: MemoryCard) -> ScanResult: """ Run the full immune scan on a memory card. Returns ScanResult with pass/fail and detailed threat information. """ result = ScanResult() text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}" text_lower = text.lower() # 1. Prompt injection for pattern in _INJECTION_PATTERNS: if re.search(pattern, text_lower): result.add_threat( "prompt_injection", "critical", f"Pattern matched: {pattern}", ) # 2. Score manipulation for pattern in _SCORE_MANIPULATION_PATTERNS: if re.search(pattern, text_lower): result.add_threat( "score_manipulation", "high", f"Pattern matched: {pattern}", ) # 3. Tool misuse for pattern in _TOOL_MISUSE_PATTERNS: if re.search(pattern, text): # case sensitive for code result.add_threat( "tool_misuse", "high", f"Dangerous code pattern: {pattern}", ) # 4. Privacy leaks for pattern in _PRIVACY_PATTERNS: if re.search(pattern, text): result.add_threat( "privacy_leak", "medium", f"PII/secret pattern: {pattern}", ) # 5. Scope overreach if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names: result.add_threat( "scope_overreach", "low", "Tool policy without tool scope — could affect all tools", ) if card.kind == MemoryKind.CRITIC_CALIBRATION: # Critic calibrations are high-risk — extra scrutiny if "always" in text_lower and ("high" in text_lower or "10" in text_lower): result.add_threat( "score_manipulation", "high", "Critic calibration attempting to force high scores", ) # Log result if result.passed: logger.debug(f"Immune scan PASSED for memory {card.id}") else: logger.warning( f"Immune scan FAILED for memory {card.id}: " f"{result.threats} (severity={result.severity})" ) return result