File size: 4,680 Bytes
eef94ae | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | """
memory_ci.py — Memory Continuous Integration pipeline.
Candidate memories go through a strict promotion pipeline:
candidate → immune_scan → quarantine → replay_test → promote/reject
No memory reaches the agent's prompt without passing every gate.
"""
from __future__ import annotations
import logging
from typing import Any
from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore
from purpose_agent.immune import scan_memory, ScanResult
logger = logging.getLogger(__name__)
class MemoryCI:
    """
    Continuous integration pipeline for agent memories.

    A submitted card is added to the store, immune-scanned, and then moved to
    QUARANTINED (scan passed) or REJECTED (scan failed). A quarantined card
    only becomes PROMOTED through an explicit ``promote`` call.

    Usage:
        ci = MemoryCI(memory_store)

        # Submit a candidate memory
        card = MemoryCard(kind=MemoryKind.SKILL_CARD, pattern="...", strategy="...")
        result = ci.submit(card)
        # result.status is now QUARANTINED (passed scan) or REJECTED (failed scan)

        # After replay testing confirms it helps:
        ci.promote(card.id)

        # Or if replay shows it hurts:
        ci.reject(card.id, reason="decreased success rate in replay test")
    """

    def __init__(self, store: MemoryStore):
        """
        Create a pipeline bound to ``store``.

        Args:
            store: Backing memory store. This class calls its ``add``,
                ``get``, ``update_status`` and ``get_by_status`` methods.
        """
        self.store = store
        # One entry per submit() call, appended in submission order.
        self._scan_log: list[dict[str, Any]] = []

    def submit(self, card: MemoryCard) -> MemoryCard:
        """
        Submit a candidate memory for the CI pipeline.

        Steps:
            1. Add to store as CANDIDATE
            2. Run immune scan
            3. If scan passes → QUARANTINED (awaiting replay test)
            4. If scan fails → REJECTED (with reason)

        Args:
            card: The memory to submit. Its ``status`` and
                ``immune_scan_result`` attributes are overwritten.

        Returns:
            The same ``card`` object that was passed in.
            NOTE(review): the final status is written through
            ``store.update_status``; whether ``card.status`` reflects it
            depends on the store updating this object in place — confirm.
        """
        card.status = MemoryStatus.CANDIDATE
        self.store.add(card)

        # Immune scan
        scan_result = scan_memory(card)
        card.immune_scan_result = {
            "passed": scan_result.passed,
            "threats": scan_result.threats,
            "severity": scan_result.severity,
        }
        self._scan_log.append({
            "card_id": card.id,
            "kind": card.kind.value,
            "scan_passed": scan_result.passed,
            "threats": scan_result.threats,
        })

        if scan_result.passed:
            self.store.update_status(card.id, MemoryStatus.QUARANTINED)
            logger.info("MemoryCI: %s → QUARANTINED (scan passed)", card.id)
        else:
            reason = (
                f"Immune scan failed: {', '.join(scan_result.threats)} "
                f"(severity={scan_result.severity})"
            )
            self.store.update_status(card.id, MemoryStatus.REJECTED, reason=reason)
            logger.warning("MemoryCI: %s → REJECTED (%s)", card.id, reason)
        return card

    def promote(self, card_id: str) -> bool:
        """
        Promote a quarantined memory to active use.

        Only quarantined memories can be promoted.

        Args:
            card_id: Identifier of the card to promote.

        Returns:
            True if the card was promoted; False if it was not found or is
            not currently QUARANTINED.
        """
        card = self.store.get(card_id)
        if not card:
            logger.warning("MemoryCI: card %s not found", card_id)
            return False
        if card.status != MemoryStatus.QUARANTINED:
            logger.warning(
                "MemoryCI: cannot promote %s — status is %s, expected quarantined",
                card_id,
                card.status.value,
            )
            return False
        self.store.update_status(card_id, MemoryStatus.PROMOTED)
        logger.info("MemoryCI: %s → PROMOTED", card_id)
        return True

    def reject(self, card_id: str, reason: str = "") -> bool:
        """
        Reject a memory, or demote it if it is already promoted.

        A PROMOTED card is moved to ARCHIVED; a card in any other status is
        moved to REJECTED.

        Args:
            card_id: Identifier of the card to reject.
            reason: Free-text explanation passed on to the store.

        Returns:
            True if the status was updated; False if the card was not found.
        """
        card = self.store.get(card_id)
        if not card:
            # Same diagnostic as promote(), so a bad id is never silent.
            logger.warning("MemoryCI: card %s not found", card_id)
            return False
        if card.status == MemoryStatus.PROMOTED:
            # Demote: promoted → archived (don't delete, keep for audit)
            self.store.update_status(card_id, MemoryStatus.ARCHIVED, reason=reason)
            logger.info("MemoryCI: %s DEMOTED → ARCHIVED (%s)", card_id, reason)
        else:
            self.store.update_status(card_id, MemoryStatus.REJECTED, reason=reason)
            logger.info("MemoryCI: %s → REJECTED (%s)", card_id, reason)
        return True

    def archive(self, card_id: str) -> bool:
        """
        Archive a memory (soft delete — keeps it for the audit trail).

        Intended for promoted memories, but the card's current status is not
        checked: any card that exists is moved to ARCHIVED.

        Args:
            card_id: Identifier of the card to archive.

        Returns:
            True if the status was updated; False if the card was not found.
        """
        card = self.store.get(card_id)
        if not card:
            # Same diagnostic as promote(), so a bad id is never silent.
            logger.warning("MemoryCI: card %s not found", card_id)
            return False
        self.store.update_status(card_id, MemoryStatus.ARCHIVED)
        logger.info("MemoryCI: %s → ARCHIVED", card_id)
        return True

    def get_quarantined(self) -> list[MemoryCard]:
        """Get all memories awaiting replay testing."""
        return self.store.get_by_status(MemoryStatus.QUARANTINED)

    def get_rejected(self) -> list[MemoryCard]:
        """Get all rejected memories (for audit)."""
        return self.store.get_by_status(MemoryStatus.REJECTED)

    @property
    def scan_log(self) -> list[dict[str, Any]]:
        """
        Scan records, one per ``submit`` call, in submission order.

        Returns a shallow copy so that callers cannot add, remove or reorder
        entries in the pipeline's own log.
        """
        return list(self._scan_log)
|