Rohan03 commited on
Commit
5c598da
·
verified ·
1 Parent(s): 1fa0c29

V2 merge: purpose_agent/immune.py

Browse files
Files changed (1) hide show
  1. purpose_agent/immune.py +158 -0
purpose_agent/immune.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ immune.py — Memory immune system. Scans candidate memories for threats.
3
+
4
+ Every candidate memory must pass the immune scan before entering quarantine.
5
+ Scans for:
6
+ 1. Prompt injection — attempts to override system instructions
7
+ 2. Score manipulation — attempts to inflate Φ scores
8
+ 3. Tool misuse — attempts to use tools in unauthorized ways
9
+ 4. Privacy leaks — PII, API keys, file paths that shouldn't be memorized
10
+ 5. Scope overreach — memory tries to apply outside its legitimate scope
11
+ """
12
+ from __future__ import annotations
13
+
14
+ import logging
15
+ import re
16
+ from dataclasses import dataclass, field
17
+ from typing import Any
18
+
19
+ from purpose_agent.memory import MemoryCard, MemoryKind
20
+
21
+ logger = logging.getLogger(__name__)
22
+
23
+
24
@dataclass
class ScanResult:
    """Outcome of an immune scan over a single memory card."""
    passed: bool = True
    threats: list[str] = field(default_factory=list)
    # Escalates monotonically: none -> low -> medium -> high -> critical.
    severity: str = "none"
    details: dict[str, Any] = field(default_factory=dict)

    def add_threat(self, name: str, severity: str, detail: str = "") -> None:
        """Record a detected threat, mark the scan as failed, and escalate
        the overall severity if the new threat outranks the current one."""
        ranking = ("none", "low", "medium", "high", "critical")
        self.passed = False
        self.threats.append(name)
        # Unknown severity labels rank lowest (same as "none").
        current = ranking.index(self.severity) if self.severity in ranking else 0
        incoming = ranking.index(severity) if severity in ranking else 0
        if incoming > current:
            self.severity = severity
        if detail:
            self.details[name] = detail
40
+
41
+
42
+ # Patterns for threat detection
43
+ _INJECTION_PATTERNS = [
44
+ r"ignore\s+(previous|above|all)(\s+\w+)*\s*(instructions|prompts|rules)",
45
+ r"you\s+are\s+now\s+",
46
+ r"system\s*:\s*",
47
+ r"<\|im_start\|>",
48
+ r"<\|system\|>",
49
+ r"\[INST\]",
50
+ r"###\s*(instruction|system)",
51
+ r"forget\s+(everything|all|your)",
52
+ r"override\s+(your|the)\s+(purpose|rules|instructions)",
53
+ r"new\s+instructions?\s*:",
54
+ r"disregard\s+(the\s+)?(above|previous)",
55
+ r"ignore\s+.*\binstructions\b",
56
+ ]
57
+
58
+ _SCORE_MANIPULATION_PATTERNS = [
59
+ r"(always|must)\s+(score|rate|give)\s+(high|10|maximum|perfect)",
60
+ r"Φ\s*=\s*10",
61
+ r"phi\s*=\s*10",
62
+ r"delta\s*(must|should)\s*be\s*positive",
63
+ r"never\s+give\s+(negative|zero|low)\s+(score|delta|rating)",
64
+ r"confidence\s*=\s*1\.0",
65
+ ]
66
+
67
+ _TOOL_MISUSE_PATTERNS = [
68
+ r"rm\s+-rf\s+/",
69
+ r"os\.system\s*\(",
70
+ r"subprocess\s*\.\s*(call|run|Popen)",
71
+ r"__import__\s*\(",
72
+ r"eval\s*\(",
73
+ r"exec\s*\(",
74
+ r"shutil\.rmtree",
75
+ r"open\s*\(['\"]/(etc|proc|sys|dev)",
76
+ r"curl\s+.*\|\s*sh",
77
+ r"wget\s+.*\|\s*bash",
78
+ ]
79
+
80
+ _PRIVACY_PATTERNS = [
81
+ r"sk-[a-zA-Z0-9]{20,}", # OpenAI API keys
82
+ r"hf_[a-zA-Z0-9]{20,}", # HuggingFace tokens
83
+ r"ghp_[a-zA-Z0-9]{20,}", # GitHub tokens
84
+ r"AKIA[0-9A-Z]{16}", # AWS access keys
85
+ r"\b\d{3}-\d{2}-\d{4}\b", # SSN-like patterns
86
+ r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # emails
87
+ r"/home/[a-zA-Z0-9]+/", # home directories
88
+ r"C:\\Users\\[a-zA-Z0-9]+\\", # windows home directories
89
+ ]
90
+
91
+
92
def scan_memory(card: MemoryCard) -> ScanResult:
    """
    Run the full immune scan on a memory card.

    Concatenates the card's free-text fields (content, pattern, strategy,
    steps) and checks the combined text against each threat class:

    1. Prompt injection   -> critical
    2. Score manipulation -> high
    3. Tool misuse        -> high
    4. Privacy leaks      -> medium
    5. Scope overreach    -> low (plus a high-severity check on critic
       calibrations, which are extra-risky)

    Args:
        card: Candidate memory to vet before it may enter quarantine.

    Returns:
        ScanResult with pass/fail and detailed threat information.
    """
    result = ScanResult()
    text = f"{card.content} {card.pattern} {card.strategy} {' '.join(card.steps)}"
    text_lower = text.lower()

    # 1. Prompt injection — matched against the lowered text, so the
    # patterns are effectively case-insensitive.
    for pattern in _INJECTION_PATTERNS:
        if re.search(pattern, text_lower):
            result.add_threat(
                "prompt_injection", "critical",
                f"Pattern matched: {pattern}",
            )

    # 2. Score manipulation — also case-insensitive via the lowered text.
    for pattern in _SCORE_MANIPULATION_PATTERNS:
        if re.search(pattern, text_lower):
            result.add_threat(
                "score_manipulation", "high",
                f"Pattern matched: {pattern}",
            )

    # 3. Tool misuse — case sensitive, since code is case sensitive.
    for pattern in _TOOL_MISUSE_PATTERNS:
        if re.search(pattern, text):
            result.add_threat(
                "tool_misuse", "high",
                f"Dangerous code pattern: {pattern}",
            )

    # 4. Privacy leaks — case sensitive so key prefixes (sk-, AKIA, ...)
    # match exactly.
    for pattern in _PRIVACY_PATTERNS:
        if re.search(pattern, text):
            result.add_threat(
                "privacy_leak", "medium",
                f"PII/secret pattern: {pattern}",
            )

    # 5. Scope overreach — a tool policy with no tool scope would apply to
    # every tool.
    if card.kind == MemoryKind.TOOL_POLICY and not card.scope.tool_names:
        result.add_threat(
            "scope_overreach", "low",
            "Tool policy without tool scope — could affect all tools",
        )

    if card.kind == MemoryKind.CRITIC_CALIBRATION:
        # Critic calibrations are high-risk — extra scrutiny beyond the
        # generic score-manipulation patterns.
        if "always" in text_lower and ("high" in text_lower or "10" in text_lower):
            result.add_threat(
                "score_manipulation", "high",
                "Critic calibration attempting to force high scores",
            )

    # Log result.  Lazy %-args so formatting only happens when the record
    # is actually emitted.
    if result.passed:
        logger.debug("Immune scan PASSED for memory %s", card.id)
    else:
        logger.warning(
            "Immune scan FAILED for memory %s: %s (severity=%s)",
            card.id, result.threats, result.severity,
        )

    return result