File size: 5,281 Bytes
5c598da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
immune.py — Memory immune system. Scans candidate memories for threats.

Every candidate memory must pass the immune scan before entering quarantine.
Scans for:
  1. Prompt injection — attempts to override system instructions
  2. Score manipulation — attempts to inflate Φ scores
  3. Tool misuse — attempts to use tools in unauthorized ways
  4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized
  5. Scope overreach — memory tries to apply outside its legitimate scope
"""
from __future__ import annotations

import logging
import re
from dataclasses import dataclass, field
from typing import Any

from purpose_agent.memory import MemoryCard, MemoryKind

logger = logging.getLogger(__name__)


# Ranking of severities from least to most severe; hoisted to module level so
# it is not rebuilt on every add_threat() call.
_SEVERITY_ORDER = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


@dataclass
class ScanResult:
    """Result of an immune scan on a memory card.

    Attributes:
        passed: True until the first threat is recorded.
        threats: Names of all threats found, in detection order (may repeat).
        severity: Worst severity recorded so far.
        details: Optional per-threat detail strings, keyed by threat name.
    """
    passed: bool = True
    threats: list[str] = field(default_factory=list)
    severity: str = "none"   # "none", "low", "medium", "high", "critical"
    details: dict[str, Any] = field(default_factory=dict)

    def add_threat(self, name: str, severity: str, detail: str = "") -> None:
        """Record a threat, mark the scan as failed, and escalate severity.

        Severity only ever escalates (monotonically non-decreasing). An
        unrecognized severity string ranks as 0 — the same as "none" — and
        therefore never raises the recorded severity.
        """
        self.threats.append(name)
        self.passed = False
        if _SEVERITY_ORDER.get(severity, 0) > _SEVERITY_ORDER.get(self.severity, 0):
            self.severity = severity
        if detail:
            # Last detail wins if the same threat name is added twice.
            self.details[name] = detail


# Patterns for threat detection
_INJECTION_PATTERNS = [
    r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)",
    r"you\s+are\s+now\s+",
    r"system\s*:\s*",
    r"<\|im_start\|>",
    r"<\|system\|>",
    r"\[INST\]",
    r"###\s*(instruction|system)",
    r"forget\s+(everything|all|your)",
    r"override\s+(your|the)\s+(purpose|rules|instructions)",
    r"new\s+instructions?\s*:",
    r"disregard\s+(the\s+)?(above|previous)",
    r"ignore\s+.*\binstructions\b",
]

_SCORE_MANIPULATION_PATTERNS = [
    r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)",
    r"Φ\s*=\s*10",
    r"phi\s*=\s*10",
    r"delta\s*(must|should)\s*be\s*positive",
    r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)",
    r"confidence\s*=\s*1\.0",
]

_TOOL_MISUSE_PATTERNS = [
    r"rm\s+-rf\s+/",
    r"os\.system\s*\(",
    r"subprocess\s*\.\s*(call|run|Popen)",
    r"__import__\s*\(",
    r"eval\s*\(",
    r"exec\s*\(",
    r"shutil\.rmtree",
    r"open\s*\(['\"]/(etc|proc|sys|dev)",
    r"curl\s+.*\|\s*sh",
    r"wget\s+.*\|\s*bash",
]

_PRIVACY_PATTERNS = [
    r"sk-[a-zA-Z0-9]{20,}",                  # OpenAI API keys
    r"hf_[a-zA-Z0-9]{20,}",                  # HuggingFace tokens
    r"ghp_[a-zA-Z0-9]{20,}",                 # GitHub tokens
    r"AKIA[0-9A-Z]{16}",                      # AWS access keys
    r"\b\d{3}-\d{2}-\d{4}\b",                 # SSN-like patterns
    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",  # emails
    r"/home/[a-zA-Z0-9]+/",                   # home directories
    r"C:\\Users\\[a-zA-Z0-9]+\\",             # windows home directories
]


def scan_memory(card: MemoryCard) -> ScanResult:
    """
    Run the full immune scan on a memory card.

    The card's content, pattern, strategy and joined steps are concatenated
    into one searchable text. Injection and score-manipulation patterns are
    matched against the lowercased text; tool-misuse and privacy patterns are
    matched against the raw text, since code and secrets are case sensitive.

    Returns ScanResult with pass/fail and detailed threat information.
    """
    result = ScanResult()
    text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}"
    text_lower = text.lower()

    # 1. Prompt injection — attempts to override system instructions.
    for pattern in _INJECTION_PATTERNS:
        if re.search(pattern, text_lower):
            result.add_threat(
                "prompt_injection", "critical",
                f"Pattern matched: {pattern}",
            )

    # 2. Score manipulation — attempts to inflate Φ scores.
    for pattern in _SCORE_MANIPULATION_PATTERNS:
        if re.search(pattern, text_lower):
            result.add_threat(
                "score_manipulation", "high",
                f"Pattern matched: {pattern}",
            )

    # 3. Tool misuse — dangerous shell/Python constructs.
    for pattern in _TOOL_MISUSE_PATTERNS:
        if re.search(pattern, text):  # case sensitive for code
            result.add_threat(
                "tool_misuse", "high",
                f"Dangerous code pattern: {pattern}",
            )

    # 4. Privacy leaks — PII and secrets that must not be memorized.
    for pattern in _PRIVACY_PATTERNS:
        if re.search(pattern, text):
            result.add_threat(
                "privacy_leak", "medium",
                f"PII/secret pattern: {pattern}",
            )

    # 5. Scope overreach — a tool policy with no tool scope applies to
    # every tool, which is never legitimate for a single memory.
    if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names:
        result.add_threat(
            "scope_overreach", "low",
            "Tool policy without tool scope — could affect all tools",
        )

    if card.kind == MemoryKind.CRITIC_CALIBRATION:
        # Critic calibrations are high-risk — extra scrutiny beyond the
        # regex patterns: "always ... high/10" is a score-forcing tell.
        if "always" in text_lower and ("high" in text_lower or "10" in text_lower):
            result.add_threat(
                "score_manipulation", "high",
                "Critic calibration attempting to force high scores",
            )

    # Log result. Lazy %-args so the message is only formatted when the
    # log level is actually enabled.
    if result.passed:
        logger.debug("Immune scan PASSED for memory %s", card.id)
    else:
        logger.warning(
            "Immune scan FAILED for memory %s: %s (severity=%s)",
            card.id, result.threats, result.severity,
        )

    return result