purpose-agent / purpose_agent /memory_ci.py
Rohan03's picture
V2 merge: purpose_agent/memory_ci.py
eef94ae verified
"""
memory_ci.py β€” Memory Continuous Integration pipeline.
Candidate memories go through a strict promotion pipeline:
candidate β†’ immune_scan β†’ quarantine β†’ replay_test β†’ promote/reject
No memory reaches the agent's prompt without passing every gate.
"""
from __future__ import annotations
import logging
from typing import Any
from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore
from purpose_agent.immune import scan_memory, ScanResult
logger = logging.getLogger(__name__)
class MemoryCI:
"""
Continuous integration pipeline for agent memories.
Usage:
ci = MemoryCI(memory_store)
# Submit a candidate memory
card = MemoryCard(kind=MemoryKind.SKILL_CARD, pattern="...", strategy="...")
result = ci.submit(card)
# result.status is now QUARANTINED (passed scan) or REJECTED (failed scan)
# After replay testing confirms it helps:
ci.promote(card.id)
# Or if replay shows it hurts:
ci.reject(card.id, reason="decreased success rate in replay test")
"""
def __init__(self, store: MemoryStore):
self.store = store
self._scan_log: list[dict[str, Any]] = []
def submit(self, card: MemoryCard) -> MemoryCard:
"""
Submit a candidate memory for the CI pipeline.
Steps:
1. Add to store as CANDIDATE
2. Run immune scan
3. If scan passes β†’ QUARANTINED (awaiting replay test)
4. If scan fails β†’ REJECTED (with reason)
"""
card.status = MemoryStatus.CANDIDATE
self.store.add(card)
# Immune scan
scan_result = scan_memory(card)
card.immune_scan_result = {
"passed": scan_result.passed,
"threats": scan_result.threats,
"severity": scan_result.severity,
}
self._scan_log.append({
"card_id": card.id,
"kind": card.kind.value,
"scan_passed": scan_result.passed,
"threats": scan_result.threats,
})
if scan_result.passed:
self.store.update_status(card.id, MemoryStatus.QUARANTINED)
logger.info(f"MemoryCI: {card.id} β†’ QUARANTINED (scan passed)")
else:
reason = f"Immune scan failed: {', '.join(scan_result.threats)} (severity={scan_result.severity})"
self.store.update_status(card.id, MemoryStatus.REJECTED, reason=reason)
logger.warning(f"MemoryCI: {card.id} β†’ REJECTED ({reason})")
return card
def promote(self, card_id: str) -> bool:
"""
Promote a quarantined memory to active use.
Only quarantined memories can be promoted.
"""
card = self.store.get(card_id)
if not card:
logger.warning(f"MemoryCI: card {card_id} not found")
return False
if card.status != MemoryStatus.QUARANTINED:
logger.warning(
f"MemoryCI: cannot promote {card_id} β€” "
f"status is {card.status.value}, expected quarantined"
)
return False
self.store.update_status(card_id, MemoryStatus.PROMOTED)
logger.info(f"MemoryCI: {card_id} β†’ PROMOTED")
return True
def reject(self, card_id: str, reason: str = "") -> bool:
"""Reject a memory (from any non-promoted status)."""
card = self.store.get(card_id)
if not card:
return False
if card.status == MemoryStatus.PROMOTED:
# Demote: promoted β†’ archived (don't delete, keep for audit)
self.store.update_status(card_id, MemoryStatus.ARCHIVED, reason=reason)
logger.info(f"MemoryCI: {card_id} DEMOTED β†’ ARCHIVED ({reason})")
else:
self.store.update_status(card_id, MemoryStatus.REJECTED, reason=reason)
logger.info(f"MemoryCI: {card_id} β†’ REJECTED ({reason})")
return True
def archive(self, card_id: str) -> bool:
"""Archive a promoted memory (soft delete β€” keeps for audit trail)."""
card = self.store.get(card_id)
if not card:
return False
self.store.update_status(card_id, MemoryStatus.ARCHIVED)
return True
def get_quarantined(self) -> list[MemoryCard]:
"""Get all memories awaiting replay testing."""
return self.store.get_by_status(MemoryStatus.QUARANTINED)
def get_rejected(self) -> list[MemoryCard]:
"""Get all rejected memories (for audit)."""
return self.store.get_by_status(MemoryStatus.REJECTED)
@property
def scan_log(self) -> list[dict]:
return self._scan_log