| """
|
| Agent memory system using ChromaDB for vector storage.
|
|
|
| If ChromaDB (HTTP server) is not reachable, all operations become no-ops
|
| so the simulation continues without memory — never falls back to the
|
| embedded in-memory client which crashes on Windows (hnswlib access violation).
|
| """
|
| import logging
|
| import os
|
| import time
|
| from typing import List, Dict, Any, Optional
|
|
|
|
|
| os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
|
| import chromadb
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class AgentMemoryStore:
|
| """
|
| Vector database for storing and retrieving agent memories.
|
|
|
| Uses ChromaDB (HTTP mode only) for:
|
| - Agent profile embeddings
|
| - Past experiences and interactions
|
| - RAG (Retrieval Augmented Generation) context for decisions
|
|
|
| If ChromaDB is not available the store silently becomes a no-op so the
|
| simulation can run without memory rather than crashing.
|
| """
|
|
|
| def __init__(
|
| self,
|
| chroma_host: str = "localhost",
|
| chroma_port: int = 8000,
|
| collection_name: str = "agent_memories",
|
| ssl: bool = False,
|
| ):
|
| """
|
| Initialize connection to ChromaDB HTTP server.
|
|
|
| Falls back to disabled (no-op) mode instead of in-memory mode to
|
| avoid the Windows hnswlib C++ access violation crash.
|
| """
|
| self.collection = None
|
|
|
| try:
|
| from chromadb.config import Settings as ChromaSettings
|
| client = chromadb.HttpClient(
|
| host=chroma_host,
|
| port=chroma_port,
|
| ssl=ssl,
|
| settings=ChromaSettings(
|
| chroma_client_auth_provider=None,
|
| anonymized_telemetry=False,
|
| ),
|
| )
|
|
|
|
|
| client.heartbeat()
|
|
|
| self.collection = client.get_or_create_collection(
|
| name=collection_name,
|
| )
|
| logger.info(f"Connected to ChromaDB at {chroma_host}:{chroma_port}")
|
|
|
| except Exception as e:
|
|
|
|
|
| logger.info(
|
| f"ChromaDB not available at {chroma_host}:{chroma_port} "
|
| f"({type(e).__name__}: {e}). "
|
| f"Running without agent memory (simulation unaffected)."
|
| )
|
| self.collection = None
|
|
|
| @property
|
| def available(self) -> bool:
|
| """True if ChromaDB is connected and usable."""
|
| return self.collection is not None
|
|
|
| def create_agent_profile(self, agent_id: str, profile: Dict[str, Any]):
|
| """
|
| Store agent's demographic profile and values.
|
| No-op if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return
|
|
|
| values = profile.get('values', [])
|
| values_str = ', '.join(values) if values else 'Not specified'
|
|
|
| profile_text = f"""
|
| Demographics:
|
| - Age: {profile.get('age', 'Unknown')}
|
| - Gender: {profile.get('gender', 'Unknown')}
|
| - Location: {profile.get('location', 'Unknown')}
|
| - Education: {profile.get('education', 'Not specified')}
|
| - Occupation: {profile.get('occupation', 'Not specified')}
|
|
|
| Core values: {values_str}
|
| """
|
|
|
|
|
| sanitized_profile = {}
|
| for key, value in profile.items():
|
| if isinstance(value, (str, int, float, bool)):
|
| sanitized_profile[key] = value
|
| elif isinstance(value, list):
|
| sanitized_profile[key] = ', '.join(str(v) for v in value)
|
| elif value is None:
|
| sanitized_profile[key] = ''
|
| else:
|
| sanitized_profile[key] = str(value)
|
|
|
| metadata = {"agent_id": agent_id, "type": "profile", **sanitized_profile}
|
|
|
| try:
|
| existing = self.collection.get(ids=[f"{agent_id}_profile"])
|
| if existing['ids']:
|
| self.collection.update(
|
| ids=[f"{agent_id}_profile"],
|
| documents=[profile_text],
|
| metadatas=[metadata]
|
| )
|
| else:
|
| self.collection.add(
|
| ids=[f"{agent_id}_profile"],
|
| documents=[profile_text],
|
| metadatas=[metadata]
|
| )
|
| logger.debug(f"Stored profile for agent {agent_id}")
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to store profile for {agent_id}: {e}")
|
|
|
| def add_experience(
|
| self,
|
| agent_id: str,
|
| experience: str,
|
| experience_type: str = "interaction"
|
| ):
|
| """
|
| Store a past interaction or experience.
|
| No-op if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return
|
|
|
| experience_id = f"{agent_id}_exp_{int(time.time() * 1000)}"
|
|
|
| try:
|
| self.collection.add(
|
| ids=[experience_id],
|
| documents=[experience],
|
| metadatas=[{
|
| "agent_id": agent_id,
|
| "type": experience_type,
|
| "timestamp": time.time()
|
| }]
|
| )
|
| logger.debug(f"Added experience for {agent_id}: {experience[:50]}...")
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to add experience for {agent_id}: {e}")
|
|
|
| def query_relevant_context(
|
| self,
|
| agent_id: str,
|
| query: str,
|
| n_results: int = 3
|
| ) -> List[str]:
|
| """
|
| Retrieve relevant memories for decision-making (RAG).
|
| Returns empty list if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return []
|
|
|
| try:
|
| results = self.collection.query(
|
| query_texts=[query],
|
| where={"agent_id": agent_id},
|
| n_results=n_results
|
| )
|
|
|
| if results and results['documents']:
|
| return results['documents'][0]
|
| return []
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to query memories for {agent_id}: {e}")
|
| return []
|
|
|
| def get_agent_profile(self, agent_id: str) -> Optional[Dict[str, Any]]:
|
| """
|
| Get stored profile for an agent.
|
| Returns None if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return None
|
|
|
| try:
|
| results = self.collection.get(
|
| ids=[f"{agent_id}_profile"],
|
| include=["metadatas"]
|
| )
|
|
|
| if results and results['metadatas']:
|
| return results['metadatas'][0]
|
| return None
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to get profile for {agent_id}: {e}")
|
| return None
|
|
|
| def clear_agent_memories(self, agent_id: str):
|
| """
|
| Remove all memories for an agent.
|
| No-op if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return
|
|
|
| try:
|
| results = self.collection.get(
|
| where={"agent_id": agent_id},
|
| include=["metadatas"]
|
| )
|
|
|
| if results and results['ids']:
|
| self.collection.delete(ids=results['ids'])
|
| logger.info(f"Cleared {len(results['ids'])} memories for {agent_id}")
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to clear memories for {agent_id}: {e}")
|
|
|
| def clear_all(self):
|
| """
|
| Clear entire collection.
|
| No-op if ChromaDB is unavailable.
|
| """
|
| if not self.available:
|
| return
|
|
|
| try:
|
| self.collection.delete(where={})
|
| logger.info("Cleared all agent memories")
|
|
|
| except Exception as e:
|
| logger.error(f"Failed to clear all memories: {e}")
|
|
|