| """ |
| memory_ci.py β Memory Continuous Integration pipeline. |
| |
| Candidate memories go through a strict promotion pipeline: |
| |
| candidate β immune_scan β quarantine β replay_test β promote/reject |
| |
| No memory reaches the agent's prompt without passing every gate. |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any |
|
|
| from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore |
| from purpose_agent.immune import scan_memory, ScanResult |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class MemoryCI: |
| """ |
| Continuous integration pipeline for agent memories. |
| |
| Usage: |
| ci = MemoryCI(memory_store) |
| |
| # Submit a candidate memory |
| card = MemoryCard(kind=MemoryKind.SKILL_CARD, pattern="...", strategy="...") |
| result = ci.submit(card) |
| |
| # result.status is now QUARANTINED (passed scan) or REJECTED (failed scan) |
| |
| # After replay testing confirms it helps: |
| ci.promote(card.id) |
| |
| # Or if replay shows it hurts: |
| ci.reject(card.id, reason="decreased success rate in replay test") |
| """ |
|
|
| def __init__(self, store: MemoryStore): |
| self.store = store |
| self._scan_log: list[dict[str, Any]] = [] |
|
|
| def submit(self, card: MemoryCard) -> MemoryCard: |
| """ |
| Submit a candidate memory for the CI pipeline. |
| |
| Steps: |
| 1. Add to store as CANDIDATE |
| 2. Run immune scan |
| 3. If scan passes β QUARANTINED (awaiting replay test) |
| 4. If scan fails β REJECTED (with reason) |
| """ |
| card.status = MemoryStatus.CANDIDATE |
| self.store.add(card) |
|
|
| |
| scan_result = scan_memory(card) |
| card.immune_scan_result = { |
| "passed": scan_result.passed, |
| "threats": scan_result.threats, |
| "severity": scan_result.severity, |
| } |
|
|
| self._scan_log.append({ |
| "card_id": card.id, |
| "kind": card.kind.value, |
| "scan_passed": scan_result.passed, |
| "threats": scan_result.threats, |
| }) |
|
|
| if scan_result.passed: |
| self.store.update_status(card.id, MemoryStatus.QUARANTINED) |
| logger.info(f"MemoryCI: {card.id} β QUARANTINED (scan passed)") |
| else: |
| reason = f"Immune scan failed: {', '.join(scan_result.threats)} (severity={scan_result.severity})" |
| self.store.update_status(card.id, MemoryStatus.REJECTED, reason=reason) |
| logger.warning(f"MemoryCI: {card.id} β REJECTED ({reason})") |
|
|
| return card |
|
|
| def promote(self, card_id: str) -> bool: |
| """ |
| Promote a quarantined memory to active use. |
| Only quarantined memories can be promoted. |
| """ |
| card = self.store.get(card_id) |
| if not card: |
| logger.warning(f"MemoryCI: card {card_id} not found") |
| return False |
|
|
| if card.status != MemoryStatus.QUARANTINED: |
| logger.warning( |
| f"MemoryCI: cannot promote {card_id} β " |
| f"status is {card.status.value}, expected quarantined" |
| ) |
| return False |
|
|
| self.store.update_status(card_id, MemoryStatus.PROMOTED) |
| logger.info(f"MemoryCI: {card_id} β PROMOTED") |
| return True |
|
|
| def reject(self, card_id: str, reason: str = "") -> bool: |
| """Reject a memory (from any non-promoted status).""" |
| card = self.store.get(card_id) |
| if not card: |
| return False |
|
|
| if card.status == MemoryStatus.PROMOTED: |
| |
| self.store.update_status(card_id, MemoryStatus.ARCHIVED, reason=reason) |
| logger.info(f"MemoryCI: {card_id} DEMOTED β ARCHIVED ({reason})") |
| else: |
| self.store.update_status(card_id, MemoryStatus.REJECTED, reason=reason) |
| logger.info(f"MemoryCI: {card_id} β REJECTED ({reason})") |
| return True |
|
|
| def archive(self, card_id: str) -> bool: |
| """Archive a promoted memory (soft delete β keeps for audit trail).""" |
| card = self.store.get(card_id) |
| if not card: |
| return False |
| self.store.update_status(card_id, MemoryStatus.ARCHIVED) |
| return True |
|
|
| def get_quarantined(self) -> list[MemoryCard]: |
| """Get all memories awaiting replay testing.""" |
| return self.store.get_by_status(MemoryStatus.QUARANTINED) |
|
|
| def get_rejected(self) -> list[MemoryCard]: |
| """Get all rejected memories (for audit).""" |
| return self.store.get_by_status(MemoryStatus.REJECTED) |
|
|
| @property |
| def scan_log(self) -> list[dict]: |
| return self._scan_log |
|
|