MnemoCore / tests /test_phase43_regressions.py
Granis87's picture
Initial upload of MnemoCore
dbb04e4 verified
import asyncio
import os
import uuid
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, Optional
from unittest.mock import AsyncMock
import pytest
import pytest_asyncio
from mnemocore.core.binary_hdv import BinaryHDV
from mnemocore.core.config import reset_config
from mnemocore.core.node import MemoryNode
try:
from mnemocore.core.engine import HAIMEngine
_ENGINE_IMPORT_ERROR = None
except (ModuleNotFoundError, ImportError) as exc:
HAIMEngine = None
_ENGINE_IMPORT_ERROR = exc
pytestmark = pytest.mark.skip(
reason=f"HAIMEngine unavailable in current branch state: {exc}"
)
@pytest_asyncio.fixture
async def isolated_engine():
root = Path(".tmp_phase43_tests") / str(uuid.uuid4())
data_dir = root / "data"
data_dir.mkdir(parents=True, exist_ok=True)
os.environ["HAIM_DATA_DIR"] = str(data_dir)
os.environ["HAIM_MEMORY_FILE"] = str(data_dir / "memory.jsonl")
os.environ["HAIM_CODEBOOK_FILE"] = str(data_dir / "codebook.json")
os.environ["HAIM_SYNAPSES_FILE"] = str(data_dir / "synapses.json")
os.environ["HAIM_WARM_MMAP_DIR"] = str(data_dir / "warm")
os.environ["HAIM_COLD_ARCHIVE_DIR"] = str(data_dir / "cold")
os.environ["HAIM_DIMENSIONALITY"] = "1024"
reset_config()
engine = HAIMEngine()
yield engine
for key in [
"HAIM_DATA_DIR",
"HAIM_MEMORY_FILE",
"HAIM_CODEBOOK_FILE",
"HAIM_SYNAPSES_FILE",
"HAIM_WARM_MMAP_DIR",
"HAIM_COLD_ARCHIVE_DIR",
"HAIM_DIMENSIONALITY",
]:
os.environ.pop(key, None)
reset_config()
@pytest.mark.asyncio
async def test_query_chrono_uses_batch_lookup(isolated_engine):
engine = isolated_engine
now = datetime.now(timezone.utc)
node1 = MemoryNode(id="n1", hdv=BinaryHDV.random(engine.dimension), content="c1", created_at=now)
node2 = MemoryNode(id="n2", hdv=BinaryHDV.random(engine.dimension), content="c2", created_at=now)
engine.tier_manager.search = AsyncMock(return_value=[("n1", 0.9), ("n2", 0.8)])
engine.tier_manager.get_memories_batch = AsyncMock(return_value=[node1, node2])
engine.tier_manager.get_memory = AsyncMock(
side_effect=AssertionError("Per-node get_memory() should not be used in chrono loop")
)
engine.tier_manager.get_hot_recent = AsyncMock(return_value=[])
results = await engine.query(
"chrono",
top_k=2,
associative_jump=False,
track_gaps=False,
chrono_weight=True,
include_neighbors=False,
)
assert len(results) <= 2
engine.tier_manager.get_memories_batch.assert_awaited_once()
engine.tier_manager.get_memory.assert_not_awaited()
@pytest.mark.asyncio
async def test_query_include_neighbors_preserves_top_k_contract(isolated_engine):
engine = isolated_engine
now = datetime.now(timezone.utc)
n1 = MemoryNode(
id="n1",
hdv=BinaryHDV.random(engine.dimension),
content="n1",
created_at=now,
previous_id="p1",
)
n2 = MemoryNode(
id="n2",
hdv=BinaryHDV.random(engine.dimension),
content="n2",
created_at=now,
previous_id="p2",
)
p1 = MemoryNode(id="p1", hdv=BinaryHDV.random(engine.dimension), content="p1", created_at=now)
p2 = MemoryNode(id="p2", hdv=BinaryHDV.random(engine.dimension), content="p2", created_at=now)
by_id: Dict[str, Optional[MemoryNode]] = {"n1": n1, "n2": n2, "p1": p1, "p2": p2}
async def _get_memory(node_id: str):
return by_id.get(node_id)
engine.tier_manager.search = AsyncMock(return_value=[("n1", 0.9), ("n2", 0.8), ("n3", 0.7)])
engine.tier_manager.get_hot_recent = AsyncMock(return_value=[])
engine.tier_manager.get_memory = AsyncMock(side_effect=_get_memory)
engine.tier_manager.use_qdrant = False
results = await engine.query(
"neighbors",
top_k=2,
associative_jump=False,
track_gaps=False,
chrono_weight=False,
include_neighbors=True,
)
assert len(results) == 2
@pytest.mark.asyncio
async def test_background_dream_uses_semaphore_locked_api(isolated_engine):
engine = isolated_engine
engine.subconscious_queue.append("x")
await engine._dream_sem.acquire()
try:
engine.tier_manager.get_memory = AsyncMock(
side_effect=AssertionError("Should return early while semaphore is locked")
)
await engine._background_dream()
engine.tier_manager.get_memory.assert_not_awaited()
finally:
engine._dream_sem.release()
def _assert_linear_chain(nodes):
ids = [n.id for n in nodes]
prev = {n.id: n.previous_id for n in nodes}
roots = [nid for nid, p in prev.items() if p is None]
assert len(roots) == 1
prev_non_none = [p for p in prev.values() if p is not None]
assert len(prev_non_none) == len(nodes) - 1
assert len(set(prev_non_none)) == len(prev_non_none)
assert all(p in ids for p in prev_non_none)
child_by_prev = {p: nid for nid, p in prev.items() if p is not None}
current = roots[0]
visited = {current}
for _ in range(len(nodes) - 1):
current = child_by_prev[current]
assert current not in visited
visited.add(current)
assert len(visited) == len(nodes)
@pytest.mark.asyncio
async def test_persist_memory_concurrent_stores_keep_linear_previous_chain(isolated_engine):
engine = isolated_engine
engine.tier_manager.add_memory = AsyncMock(return_value=None)
engine._append_persisted = AsyncMock(return_value=None)
vec_a = BinaryHDV.random(engine.dimension)
vec_b = BinaryHDV.random(engine.dimension)
vec_c = BinaryHDV.random(engine.dimension)
a, b, c = await asyncio.gather(
engine._persist_memory("a", vec_a, {"eig": 0.1}),
engine._persist_memory("b", vec_b, {"eig": 0.2}),
engine._persist_memory("c", vec_c, {"eig": 0.3}),
)
_assert_linear_chain([a, b, c])
@pytest.mark.asyncio
async def test_get_stats_reports_engine_version_45(isolated_engine):
engine = isolated_engine
engine.tier_manager.get_stats = AsyncMock(return_value={"hot_count": 0, "warm_count": 0})
stats = await engine.get_stats()
assert stats["engine_version"] == "4.5.0"
@pytest.mark.asyncio
async def test_tier_manager_search_applies_hot_time_range_filter(isolated_engine):
engine = isolated_engine
tm = engine.tier_manager
tm.use_qdrant = False
now = datetime.now(timezone.utc)
old_node = MemoryNode(
id="old",
hdv=BinaryHDV.random(engine.dimension),
content="old",
created_at=now - timedelta(days=2),
)
new_node = MemoryNode(
id="new",
hdv=BinaryHDV.random(engine.dimension),
content="new",
created_at=now,
)
tm.hot = {"old": old_node, "new": new_node}
tm.search_hot = lambda query_vec, top_k=5: [("old", 0.95), ("new", 0.90)]
query_vec = BinaryHDV.random(engine.dimension)
results = await tm.search(
query_vec,
top_k=5,
time_range=(now - timedelta(hours=1), now + timedelta(hours=1)),
)
assert [nid for nid, _ in results] == ["new"]
@pytest.mark.asyncio
async def test_orchestrate_orch_or_is_async_and_lock_guarded(isolated_engine):
engine = isolated_engine
node = MemoryNode(
id="orch",
hdv=BinaryHDV.random(engine.dimension),
content="orch content",
created_at=datetime.now(timezone.utc),
)
node.ltp_strength = 0.8
node.epistemic_value = 0.4
node.access_count = 5
engine.tier_manager.hot[node.id] = node
await engine.tier_manager.lock.acquire()
task = asyncio.create_task(engine.orchestrate_orch_or(max_collapse=1))
await asyncio.sleep(0.05)
assert not task.done()
engine.tier_manager.lock.release()
collapsed = await task
assert len(collapsed) == 1
assert collapsed[0].id == "orch"