backend / simulation /agents /agent_memory.py
vish85521's picture
Upload 182 files
66f749a verified
"""
Agent memory system using ChromaDB for vector storage.
If ChromaDB (HTTP server) is not reachable, all operations become no-ops
so the simulation continues without memory — never falls back to the
embedded in-memory client which crashes on Windows (hnswlib access violation).
"""
import logging
import os
import time
from typing import List, Dict, Any, Optional
# Suppress PostHog telemetry noise (capture() argument mismatch)
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
import chromadb
logger = logging.getLogger(__name__)
class AgentMemoryStore:
"""
Vector database for storing and retrieving agent memories.
Uses ChromaDB (HTTP mode only) for:
- Agent profile embeddings
- Past experiences and interactions
- RAG (Retrieval Augmented Generation) context for decisions
If ChromaDB is not available the store silently becomes a no-op so the
simulation can run without memory rather than crashing.
"""
def __init__(
self,
chroma_host: str = "localhost",
chroma_port: int = 8000,
collection_name: str = "agent_memories",
ssl: bool = False,
):
"""
Initialize connection to ChromaDB HTTP server.
Falls back to disabled (no-op) mode instead of in-memory mode to
avoid the Windows hnswlib C++ access violation crash.
"""
self.collection = None # None means disabled / no-op
try:
from chromadb.config import Settings as ChromaSettings
client = chromadb.HttpClient(
host=chroma_host,
port=chroma_port,
ssl=ssl,
settings=ChromaSettings(
chroma_client_auth_provider=None,
anonymized_telemetry=False, # suppress telemetry noise
),
)
# Heartbeat check — raises immediately if server is down
client.heartbeat()
self.collection = client.get_or_create_collection(
name=collection_name,
)
logger.info(f"Connected to ChromaDB at {chroma_host}:{chroma_port}")
except Exception as e:
# Do NOT fall back to chromadb.Client() — the embedded hnswlib
# backend crashes on Windows with an access violation.
logger.info(
f"ChromaDB not available at {chroma_host}:{chroma_port} "
f"({type(e).__name__}: {e}). "
f"Running without agent memory (simulation unaffected)."
)
self.collection = None
@property
def available(self) -> bool:
"""True if ChromaDB is connected and usable."""
return self.collection is not None
def create_agent_profile(self, agent_id: str, profile: Dict[str, Any]):
"""
Store agent's demographic profile and values.
No-op if ChromaDB is unavailable.
"""
if not self.available:
return
values = profile.get('values', [])
values_str = ', '.join(values) if values else 'Not specified'
profile_text = f"""
Demographics:
- Age: {profile.get('age', 'Unknown')}
- Gender: {profile.get('gender', 'Unknown')}
- Location: {profile.get('location', 'Unknown')}
- Education: {profile.get('education', 'Not specified')}
- Occupation: {profile.get('occupation', 'Not specified')}
Core values: {values_str}
"""
# Sanitize metadata: ChromaDB only accepts str, int, float, bool
sanitized_profile = {}
for key, value in profile.items():
if isinstance(value, (str, int, float, bool)):
sanitized_profile[key] = value
elif isinstance(value, list):
sanitized_profile[key] = ', '.join(str(v) for v in value)
elif value is None:
sanitized_profile[key] = ''
else:
sanitized_profile[key] = str(value)
metadata = {"agent_id": agent_id, "type": "profile", **sanitized_profile}
try:
existing = self.collection.get(ids=[f"{agent_id}_profile"])
if existing['ids']:
self.collection.update(
ids=[f"{agent_id}_profile"],
documents=[profile_text],
metadatas=[metadata]
)
else:
self.collection.add(
ids=[f"{agent_id}_profile"],
documents=[profile_text],
metadatas=[metadata]
)
logger.debug(f"Stored profile for agent {agent_id}")
except Exception as e:
logger.error(f"Failed to store profile for {agent_id}: {e}")
def add_experience(
self,
agent_id: str,
experience: str,
experience_type: str = "interaction"
):
"""
Store a past interaction or experience.
No-op if ChromaDB is unavailable.
"""
if not self.available:
return
experience_id = f"{agent_id}_exp_{int(time.time() * 1000)}"
try:
self.collection.add(
ids=[experience_id],
documents=[experience],
metadatas=[{
"agent_id": agent_id,
"type": experience_type,
"timestamp": time.time()
}]
)
logger.debug(f"Added experience for {agent_id}: {experience[:50]}...")
except Exception as e:
logger.error(f"Failed to add experience for {agent_id}: {e}")
def query_relevant_context(
self,
agent_id: str,
query: str,
n_results: int = 3
) -> List[str]:
"""
Retrieve relevant memories for decision-making (RAG).
Returns empty list if ChromaDB is unavailable.
"""
if not self.available:
return []
try:
results = self.collection.query(
query_texts=[query],
where={"agent_id": agent_id},
n_results=n_results
)
if results and results['documents']:
return results['documents'][0]
return []
except Exception as e:
logger.error(f"Failed to query memories for {agent_id}: {e}")
return []
def get_agent_profile(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""
Get stored profile for an agent.
Returns None if ChromaDB is unavailable.
"""
if not self.available:
return None
try:
results = self.collection.get(
ids=[f"{agent_id}_profile"],
include=["metadatas"]
)
if results and results['metadatas']:
return results['metadatas'][0]
return None
except Exception as e:
logger.error(f"Failed to get profile for {agent_id}: {e}")
return None
def clear_agent_memories(self, agent_id: str):
"""
Remove all memories for an agent.
No-op if ChromaDB is unavailable.
"""
if not self.available:
return
try:
results = self.collection.get(
where={"agent_id": agent_id},
include=["metadatas"]
)
if results and results['ids']:
self.collection.delete(ids=results['ids'])
logger.info(f"Cleared {len(results['ids'])} memories for {agent_id}")
except Exception as e:
logger.error(f"Failed to clear memories for {agent_id}: {e}")
def clear_all(self):
"""
Clear entire collection.
No-op if ChromaDB is unavailable.
"""
if not self.available:
return
try:
self.collection.delete(where={})
logger.info("Cleared all agent memories")
except Exception as e:
logger.error(f"Failed to clear all memories: {e}")