File size: 4,680 Bytes
eef94ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
memory_ci.py — Memory Continuous Integration pipeline.

Candidate memories go through a strict promotion pipeline:

    candidate → immune_scan → quarantine → replay_test → promote/reject

No memory reaches the agent's prompt without passing every gate.
"""
from __future__ import annotations

import logging
from typing import Any

from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore
from purpose_agent.immune import scan_memory, ScanResult

logger = logging.getLogger(__name__)


class MemoryCI:
    """
    Continuous integration pipeline for agent memories.

    A candidate is registered in the store, immune-scanned, and then either
    quarantined (awaiting a replay test) or rejected. Promotion is a
    separate, explicit step that is only allowed from the quarantined state.

    Usage:
        ci = MemoryCI(memory_store)

        # Submit a candidate memory
        card = MemoryCard(kind=MemoryKind.SKILL_CARD, pattern="...", strategy="...")
        result = ci.submit(card)

        # The store now records the card as QUARANTINED (passed scan)
        # or REJECTED (failed scan)

        # After replay testing confirms it helps:
        ci.promote(card.id)

        # Or if replay shows it hurts:
        ci.reject(card.id, reason="decreased success rate in replay test")
    """

    def __init__(self, store: MemoryStore):
        """
        Bind the pipeline to a memory store.

        Args:
            store: Backing store used for every add / get / status update.
        """
        self.store = store
        # One entry per submit() call; exposed (as a copy) via `scan_log`.
        self._scan_log: list[dict[str, Any]] = []

    def submit(self, card: MemoryCard) -> MemoryCard:
        """
        Submit a candidate memory for the CI pipeline.

        Steps:
        1. Add to store as CANDIDATE
        2. Run immune scan
        3. If scan passes → QUARANTINED (awaiting replay test)
        4. If scan fails → REJECTED (with reason)

        Args:
            card: The memory to submit. Its ``status`` is overwritten with
                CANDIDATE and its ``immune_scan_result`` is filled in.

        Returns:
            The same ``card`` object that was passed in.
            NOTE(review): whether ``card.status`` reflects the final
            QUARANTINED/REJECTED state depends on ``store.update_status``
            mutating this object — confirm against the store implementation.
        """
        card.status = MemoryStatus.CANDIDATE
        self.store.add(card)

        # Immune scan; the verdict is kept on the card itself for later audit.
        scan_result = scan_memory(card)
        card.immune_scan_result = {
            "passed": scan_result.passed,
            "threats": scan_result.threats,
            "severity": scan_result.severity,
        }

        self._scan_log.append({
            "card_id": card.id,
            "kind": card.kind.value,
            "scan_passed": scan_result.passed,
            "threats": scan_result.threats,
        })

        if scan_result.passed:
            self.store.update_status(card.id, MemoryStatus.QUARANTINED)
            logger.info("MemoryCI: %s → QUARANTINED (scan passed)", card.id)
        else:
            # The reason string is stored with the card as well as logged.
            reason = (
                f"Immune scan failed: {', '.join(scan_result.threats)} "
                f"(severity={scan_result.severity})"
            )
            self.store.update_status(card.id, MemoryStatus.REJECTED, reason=reason)
            logger.warning("MemoryCI: %s → REJECTED (%s)", card.id, reason)

        return card

    def promote(self, card_id: str) -> bool:
        """
        Promote a quarantined memory to active use.

        Only quarantined memories can be promoted.

        Args:
            card_id: Identifier of the card to promote.

        Returns:
            True if the card was promoted; False if it was not found or is
            not currently QUARANTINED (a warning is logged in both cases).
        """
        card = self.store.get(card_id)
        if not card:
            logger.warning("MemoryCI: card %s not found", card_id)
            return False

        if card.status != MemoryStatus.QUARANTINED:
            logger.warning(
                "MemoryCI: cannot promote %s — status is %s, expected quarantined",
                card_id,
                card.status.value,
            )
            return False

        self.store.update_status(card_id, MemoryStatus.PROMOTED)
        logger.info("MemoryCI: %s → PROMOTED", card_id)
        return True

    def reject(self, card_id: str, reason: str = "") -> bool:
        """
        Reject a memory, or demote it if it is already promoted.

        A PROMOTED card is moved to ARCHIVED instead of REJECTED so it is
        kept for audit; a card in any other status is moved to REJECTED.

        Args:
            card_id: Identifier of the card to reject.
            reason: Free-text explanation passed on to the store.

        Returns:
            True if a status update was issued; False if the card was not
            found (a warning is logged).
        """
        card = self.store.get(card_id)
        if not card:
            logger.warning("MemoryCI: card %s not found", card_id)
            return False

        if card.status == MemoryStatus.PROMOTED:
            # Demote: promoted → archived (don't delete, keep for audit)
            self.store.update_status(card_id, MemoryStatus.ARCHIVED, reason=reason)
            logger.info("MemoryCI: %s DEMOTED → ARCHIVED (%s)", card_id, reason)
        else:
            self.store.update_status(card_id, MemoryStatus.REJECTED, reason=reason)
            logger.info("MemoryCI: %s → REJECTED (%s)", card_id, reason)
        return True

    def archive(self, card_id: str) -> bool:
        """
        Archive a memory (soft delete — keeps for audit trail).

        Intended for promoted memories, but the current status is not
        checked: any card that exists in the store is moved to ARCHIVED.

        Args:
            card_id: Identifier of the card to archive.

        Returns:
            True if the card was archived; False if it was not found
            (a warning is logged).
        """
        card = self.store.get(card_id)
        if not card:
            logger.warning("MemoryCI: card %s not found", card_id)
            return False
        self.store.update_status(card_id, MemoryStatus.ARCHIVED)
        logger.info("MemoryCI: %s → ARCHIVED", card_id)
        return True

    def get_quarantined(self) -> list[MemoryCard]:
        """Get all memories awaiting replay testing."""
        return self.store.get_by_status(MemoryStatus.QUARANTINED)

    def get_rejected(self) -> list[MemoryCard]:
        """Get all rejected memories (for audit)."""
        return self.store.get_by_status(MemoryStatus.REJECTED)

    @property
    def scan_log(self) -> list[dict[str, Any]]:
        """
        Snapshot of the immune-scan log, one entry per ``submit()`` call.

        A new list is returned on every access so callers cannot add or
        remove entries in the pipeline's internal log. The entry dicts
        themselves are shared, not deep-copied.
        """
        return list(self._scan_log)