| import asyncio
|
| import os
|
| import uuid
|
| from datetime import datetime, timezone, timedelta
|
| from pathlib import Path
|
| from typing import Dict, Optional
|
| from unittest.mock import AsyncMock
|
|
|
| import pytest
|
| import pytest_asyncio
|
|
|
| from mnemocore.core.binary_hdv import BinaryHDV
|
| from mnemocore.core.config import reset_config
|
| from mnemocore.core.node import MemoryNode
|
|
|
| try:
|
| from mnemocore.core.engine import HAIMEngine
|
| _ENGINE_IMPORT_ERROR = None
|
| except (ModuleNotFoundError, ImportError) as exc:
|
| HAIMEngine = None
|
| _ENGINE_IMPORT_ERROR = exc
|
| pytestmark = pytest.mark.skip(
|
| reason=f"HAIMEngine unavailable in current branch state: {exc}"
|
| )
|
|
|
|
|
| @pytest_asyncio.fixture
|
| async def isolated_engine():
|
| root = Path(".tmp_phase43_tests") / str(uuid.uuid4())
|
| data_dir = root / "data"
|
| data_dir.mkdir(parents=True, exist_ok=True)
|
|
|
| os.environ["HAIM_DATA_DIR"] = str(data_dir)
|
| os.environ["HAIM_MEMORY_FILE"] = str(data_dir / "memory.jsonl")
|
| os.environ["HAIM_CODEBOOK_FILE"] = str(data_dir / "codebook.json")
|
| os.environ["HAIM_SYNAPSES_FILE"] = str(data_dir / "synapses.json")
|
| os.environ["HAIM_WARM_MMAP_DIR"] = str(data_dir / "warm")
|
| os.environ["HAIM_COLD_ARCHIVE_DIR"] = str(data_dir / "cold")
|
| os.environ["HAIM_DIMENSIONALITY"] = "1024"
|
|
|
| reset_config()
|
| engine = HAIMEngine()
|
| yield engine
|
|
|
| for key in [
|
| "HAIM_DATA_DIR",
|
| "HAIM_MEMORY_FILE",
|
| "HAIM_CODEBOOK_FILE",
|
| "HAIM_SYNAPSES_FILE",
|
| "HAIM_WARM_MMAP_DIR",
|
| "HAIM_COLD_ARCHIVE_DIR",
|
| "HAIM_DIMENSIONALITY",
|
| ]:
|
| os.environ.pop(key, None)
|
| reset_config()
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_query_chrono_uses_batch_lookup(isolated_engine):
|
| engine = isolated_engine
|
| now = datetime.now(timezone.utc)
|
|
|
| node1 = MemoryNode(id="n1", hdv=BinaryHDV.random(engine.dimension), content="c1", created_at=now)
|
| node2 = MemoryNode(id="n2", hdv=BinaryHDV.random(engine.dimension), content="c2", created_at=now)
|
|
|
| engine.tier_manager.search = AsyncMock(return_value=[("n1", 0.9), ("n2", 0.8)])
|
| engine.tier_manager.get_memories_batch = AsyncMock(return_value=[node1, node2])
|
| engine.tier_manager.get_memory = AsyncMock(
|
| side_effect=AssertionError("Per-node get_memory() should not be used in chrono loop")
|
| )
|
| engine.tier_manager.get_hot_recent = AsyncMock(return_value=[])
|
|
|
| results = await engine.query(
|
| "chrono",
|
| top_k=2,
|
| associative_jump=False,
|
| track_gaps=False,
|
| chrono_weight=True,
|
| include_neighbors=False,
|
| )
|
|
|
| assert len(results) <= 2
|
| engine.tier_manager.get_memories_batch.assert_awaited_once()
|
| engine.tier_manager.get_memory.assert_not_awaited()
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_query_include_neighbors_preserves_top_k_contract(isolated_engine):
|
| engine = isolated_engine
|
| now = datetime.now(timezone.utc)
|
|
|
| n1 = MemoryNode(
|
| id="n1",
|
| hdv=BinaryHDV.random(engine.dimension),
|
| content="n1",
|
| created_at=now,
|
| previous_id="p1",
|
| )
|
| n2 = MemoryNode(
|
| id="n2",
|
| hdv=BinaryHDV.random(engine.dimension),
|
| content="n2",
|
| created_at=now,
|
| previous_id="p2",
|
| )
|
| p1 = MemoryNode(id="p1", hdv=BinaryHDV.random(engine.dimension), content="p1", created_at=now)
|
| p2 = MemoryNode(id="p2", hdv=BinaryHDV.random(engine.dimension), content="p2", created_at=now)
|
|
|
| by_id: Dict[str, Optional[MemoryNode]] = {"n1": n1, "n2": n2, "p1": p1, "p2": p2}
|
|
|
| async def _get_memory(node_id: str):
|
| return by_id.get(node_id)
|
|
|
| engine.tier_manager.search = AsyncMock(return_value=[("n1", 0.9), ("n2", 0.8), ("n3", 0.7)])
|
| engine.tier_manager.get_hot_recent = AsyncMock(return_value=[])
|
| engine.tier_manager.get_memory = AsyncMock(side_effect=_get_memory)
|
| engine.tier_manager.use_qdrant = False
|
|
|
| results = await engine.query(
|
| "neighbors",
|
| top_k=2,
|
| associative_jump=False,
|
| track_gaps=False,
|
| chrono_weight=False,
|
| include_neighbors=True,
|
| )
|
|
|
| assert len(results) == 2
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_background_dream_uses_semaphore_locked_api(isolated_engine):
|
| engine = isolated_engine
|
| engine.subconscious_queue.append("x")
|
| await engine._dream_sem.acquire()
|
| try:
|
| engine.tier_manager.get_memory = AsyncMock(
|
| side_effect=AssertionError("Should return early while semaphore is locked")
|
| )
|
| await engine._background_dream()
|
| engine.tier_manager.get_memory.assert_not_awaited()
|
| finally:
|
| engine._dream_sem.release()
|
|
|
|
|
| def _assert_linear_chain(nodes):
|
| ids = [n.id for n in nodes]
|
| prev = {n.id: n.previous_id for n in nodes}
|
|
|
| roots = [nid for nid, p in prev.items() if p is None]
|
| assert len(roots) == 1
|
|
|
| prev_non_none = [p for p in prev.values() if p is not None]
|
| assert len(prev_non_none) == len(nodes) - 1
|
| assert len(set(prev_non_none)) == len(prev_non_none)
|
| assert all(p in ids for p in prev_non_none)
|
|
|
| child_by_prev = {p: nid for nid, p in prev.items() if p is not None}
|
| current = roots[0]
|
| visited = {current}
|
| for _ in range(len(nodes) - 1):
|
| current = child_by_prev[current]
|
| assert current not in visited
|
| visited.add(current)
|
| assert len(visited) == len(nodes)
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_persist_memory_concurrent_stores_keep_linear_previous_chain(isolated_engine):
|
| engine = isolated_engine
|
| engine.tier_manager.add_memory = AsyncMock(return_value=None)
|
| engine._append_persisted = AsyncMock(return_value=None)
|
|
|
| vec_a = BinaryHDV.random(engine.dimension)
|
| vec_b = BinaryHDV.random(engine.dimension)
|
| vec_c = BinaryHDV.random(engine.dimension)
|
|
|
| a, b, c = await asyncio.gather(
|
| engine._persist_memory("a", vec_a, {"eig": 0.1}),
|
| engine._persist_memory("b", vec_b, {"eig": 0.2}),
|
| engine._persist_memory("c", vec_c, {"eig": 0.3}),
|
| )
|
|
|
| _assert_linear_chain([a, b, c])
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_get_stats_reports_engine_version_45(isolated_engine):
|
| engine = isolated_engine
|
| engine.tier_manager.get_stats = AsyncMock(return_value={"hot_count": 0, "warm_count": 0})
|
| stats = await engine.get_stats()
|
| assert stats["engine_version"] == "4.5.0"
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_tier_manager_search_applies_hot_time_range_filter(isolated_engine):
|
| engine = isolated_engine
|
| tm = engine.tier_manager
|
| tm.use_qdrant = False
|
|
|
| now = datetime.now(timezone.utc)
|
| old_node = MemoryNode(
|
| id="old",
|
| hdv=BinaryHDV.random(engine.dimension),
|
| content="old",
|
| created_at=now - timedelta(days=2),
|
| )
|
| new_node = MemoryNode(
|
| id="new",
|
| hdv=BinaryHDV.random(engine.dimension),
|
| content="new",
|
| created_at=now,
|
| )
|
|
|
| tm.hot = {"old": old_node, "new": new_node}
|
| tm.search_hot = lambda query_vec, top_k=5: [("old", 0.95), ("new", 0.90)]
|
|
|
| query_vec = BinaryHDV.random(engine.dimension)
|
| results = await tm.search(
|
| query_vec,
|
| top_k=5,
|
| time_range=(now - timedelta(hours=1), now + timedelta(hours=1)),
|
| )
|
|
|
| assert [nid for nid, _ in results] == ["new"]
|
|
|
|
|
| @pytest.mark.asyncio
|
| async def test_orchestrate_orch_or_is_async_and_lock_guarded(isolated_engine):
|
| engine = isolated_engine
|
| node = MemoryNode(
|
| id="orch",
|
| hdv=BinaryHDV.random(engine.dimension),
|
| content="orch content",
|
| created_at=datetime.now(timezone.utc),
|
| )
|
| node.ltp_strength = 0.8
|
| node.epistemic_value = 0.4
|
| node.access_count = 5
|
| engine.tier_manager.hot[node.id] = node
|
|
|
| await engine.tier_manager.lock.acquire()
|
| task = asyncio.create_task(engine.orchestrate_orch_or(max_collapse=1))
|
| await asyncio.sleep(0.05)
|
| assert not task.done()
|
| engine.tier_manager.lock.release()
|
|
|
| collapsed = await task
|
| assert len(collapsed) == 1
|
| assert collapsed[0].id == "orch"
|
|
|