| import json |
| import asyncio |
| from uuid import uuid4 |
| from pydantic import Field |
| from datetime import datetime |
| from typing import Optional, List, Tuple, Dict, Union |
|
|
| from evoagentx.agents import Agent |
| from evoagentx.core.parser import Parser |
| from evoagentx.models import BaseLLM |
| from evoagentx.core.logging import logger |
| from evoagentx.models import OpenAILLMConfig |
| from evoagentx.storages.base import StorageHandler |
| from evoagentx.core.message import Message, MessageType |
| from evoagentx.memory.memory_manager import MemoryManager |
| from evoagentx.memory.long_term_memory import LongTermMemory |
| from evoagentx.actions.action import Action, ActionInput, ActionOutput |
| from evoagentx.rag.rag_config import RAGConfig |
|
|
|
|
| class MemoryActionInput(ActionInput): |
| user_prompt: str = Field(description="The user's input prompt") |
| conversation_id: Optional[str] = Field(default=None, description="ID for tracking conversation") |
| top_k: Optional[int] = Field(default=5, description="Number of memory results to retrieve") |
| metadata_filters: Optional[Dict] = Field(default=None, description="Filters for memory retrieval") |
|
|
|
|
| class MemoryActionOutput(ActionOutput): |
| response: str = Field(description="The agent's response based on memory and prompt") |
|
|
|
|
| class MemoryAction(Action): |
| def __init__( |
| self, |
| name: str = "MemoryAction", |
| description: str = "Action that processes user input with long-term memory context", |
| prompt: str = "Based on the following context and user prompt, provide a relevant response:\n\nContext: {context}\n\nUser Prompt: {user_prompt}\n\n", |
| inputs_format: ActionInput = None, |
| outputs_format: ActionOutput = None, |
| **kwargs |
| ): |
| inputs_format = inputs_format or MemoryActionInput |
| outputs_format = outputs_format or MemoryActionOutput |
| super().__init__( |
| name=name, |
| description=description, |
| prompt=prompt, |
| inputs_format=inputs_format, |
| outputs_format=outputs_format, |
| **kwargs |
| ) |
|
|
| def execute(self, llm: BaseLLM | None = None, |
| inputs: Dict | None = None, |
| sys_msg: str | None = None, |
| return_prompt: bool = False, |
| memory_manager: Optional[MemoryManager] = None, |
| **kwargs |
| ) -> Parser | Tuple[Parser | str] | None: |
| return asyncio.run(self.async_execute(llm, inputs, sys_msg, return_prompt, memory_manager, **kwargs)) |
|
|
| async def async_execute( |
| self, |
| llm: Optional["BaseLLM"] = None, |
| inputs: Optional[Dict] = None, |
| sys_msg: Optional[str] = None, |
| return_prompt: bool = False, |
| memory_manager: Optional[MemoryManager] = None, |
| **kwargs |
| ) -> Union[MemoryActionOutput, tuple]: |
| if not memory_manager: |
| logger.error("MemoryManager is required for MemoryAction execution") |
| raise ValueError("MemoryManager is required for MemoryAction") |
|
|
| action_input = self.inputs_format(**inputs) |
| user_prompt = action_input.user_prompt |
| conversation_id = action_input.conversation_id |
| if not conversation_id: |
| conversation_id = str(uuid4()) |
| logger.warning("No conversation_id provided; generated a new UUID4 for this session") |
| top_k = action_input.top_k |
| metadata_filters = action_input.metadata_filters |
|
|
| message = await memory_manager.create_conversation_message( |
| user_prompt=user_prompt, |
| conversation_id=conversation_id, |
| top_k=top_k, |
| metadata_filters=metadata_filters |
| ) |
|
|
| action_input_attrs = self.inputs_format.get_attrs() |
| action_input_data = {attr: getattr(action_input, attr, "undefined") for attr in action_input_attrs} |
| action_input_data["context"] = message.content |
| prompt = self.prompt.format(**action_input_data) |
| logger.info(f"The New Created Message by LongTermMemory:\n\n{prompt}") |
|
|
| output = await llm.async_generate( |
| prompt=prompt, |
| system_message=sys_msg, |
| parser=self.outputs_format, |
| parse_mode='str' |
| ) |
| |
| response_message = Message( |
| content=output.content, |
| msg_type=MessageType.RESPONSE, |
| timestamp=datetime.now().isoformat(), |
| conversation_id=conversation_id, |
| memory_ids=message.memory_ids |
| ) |
| memory_ids = await memory_manager.handle_memory( |
| action="add", |
| data=response_message, |
| ) |
|
|
| |
| final_output = self.outputs_format( |
| response=output.content, |
| memory_ids=memory_ids |
| ) |
|
|
| if return_prompt: |
| return final_output, prompt |
| return final_output |
|
|
|
|
| class MemoryAgent(Agent): |
| memory_manager: Optional[MemoryManager] = Field(default=None, description="Manager for long-term memory operations") |
| inputs: List[Dict] = Field(default_factory=list, description="Input specifications for the memory action") |
| outputs: List[Dict] = Field(default_factory=list, description="Output specifications for the memory action") |
|
|
| def __init__( |
| self, |
| name: str = "MemoryAgent", |
| description: str = "An agent that uses long-term memory to provide context-aware responses", |
| inputs: Optional[List[Dict]] = None, |
| outputs: Optional[List[Dict]] = None, |
| llm_config: Optional[OpenAILLMConfig] = None, |
| storage_handler: Optional[StorageHandler] = None, |
| rag_config: Optional[RAGConfig] = None, |
| conversation_id: Optional[str] = None, |
| system_prompt: Optional[str] = None, |
| prompt: str = "Based on the following context and user prompt, provide a relevant response:\n\nContext: {context}\n\nUser Prompt: {user_prompt}", |
| **kwargs |
| ): |
| |
| inputs = inputs or [] |
| outputs = outputs or [] |
|
|
| |
| super().__init__( |
| name=name, |
| description=description, |
| llm_config=llm_config, |
| system_prompt=system_prompt, |
| storage_handler=storage_handler, |
| inputs=inputs, |
| outputs=outputs, |
| **kwargs |
| ) |
|
|
| self.long_term_memory = LongTermMemory( |
| storage_handler=storage_handler, |
| rag_config=rag_config, |
| default_corpus_id=conversation_id |
| ) |
| self.memory_manager = MemoryManager( |
| memory=self.long_term_memory, |
| llm=llm_config.get_llm() if llm_config else None, |
| use_llm_management=True |
| ) |
|
|
| |
| self.inputs = inputs |
| self.outputs = outputs |
|
|
| |
| self.actions = [] |
| self._action_map = {} |
| memory_action = MemoryAction( |
| name="MemoryAction", |
| description="Action that processes user input with long-term memory context", |
| prompt=prompt, |
| inputs_format=MemoryActionInput, |
| outputs_format=MemoryActionOutput |
| ) |
| self.add_action(memory_action) |
|
|
| def _create_output_message( |
| self, |
| action_output, |
| action_name: str, |
| action_input_data: Optional[Dict], |
| prompt: str, |
| return_msg_type: MessageType = MessageType.RESPONSE, |
| **kwargs |
| ) -> Message: |
| msg = super()._create_output_message( |
| action_output=action_output, |
| action_name=action_name, |
| action_input_data=action_input_data, |
| prompt=prompt, |
| return_msg_type=return_msg_type, |
| **kwargs |
| ) |
|
|
| if action_input_data and "user_prompt" in action_input_data: |
| user_msg = Message( |
| content=action_input_data["user_prompt"], |
| msg_type=MessageType.REQUEST, |
| conversation_id=msg.conversation_id |
| ) |
| asyncio.create_task(self.memory_manager.handle_memory(action="add", data=user_msg)) |
|
|
| response_msg = Message( |
| content=action_output.response if hasattr(action_output, "response") else str(action_output), |
| msg_type=MessageType.RESPONSE, |
| conversation_id=msg.conversation_id |
| ) |
| asyncio.create_task(self.memory_manager.handle_memory(action="add", data=response_msg)) |
|
|
| return msg |
|
|
| async def async_execute( |
| self, |
| action_name: str, |
| msgs: Optional[List[Message]] = None, |
| action_input_data: Optional[Dict] = None, |
| return_msg_type: Optional[MessageType] = MessageType.RESPONSE, |
| return_action_input_data: Optional[bool] = False, |
| **kwargs |
| ) -> Union[Message, Tuple[Message, Dict]]: |
| """ |
| Execute an action asynchronously with memory management. |
| |
| Args: |
| action_name: Name of the action to execute |
| msgs: Optional list of messages providing context |
| action_input_data: Optional input data for the action |
| return_msg_type: Message type for the return message |
| return_action_input_data: Whether to return the action input data |
| **kwargs: Additional parameters |
| |
| Returns: |
| Message or tuple: The execution result, optionally with input data |
| """ |
| action, action_input_data = self._prepare_execution( |
| action_name=action_name, |
| msgs=msgs, |
| action_input_data=action_input_data, |
| **kwargs |
| ) |
|
|
| |
| execution_results = await action.async_execute( |
| llm=self.llm, |
| inputs=action_input_data, |
| sys_msg=self.system_prompt, |
| return_prompt=True, |
| memory_manager=self.memory_manager, |
| **kwargs |
| ) |
| action_output, prompt = execution_results |
|
|
| message = self._create_output_message( |
| action_output=action_output, |
| prompt=prompt, |
| action_name=action_name, |
| return_msg_type=return_msg_type, |
| action_input_data=action_input_data, |
| **kwargs |
| ) |
| if return_action_input_data: |
| return message, action_input_data |
| return message |
|
|
| def execute( |
| self, |
| action_name: str, |
| msgs: Optional[List[Message]] = None, |
| action_input_data: Optional[Dict] = None, |
| return_msg_type: Optional[MessageType] = MessageType.RESPONSE, |
| return_action_input_data: Optional[bool] = False, |
| **kwargs |
| ) -> Union[Message, Tuple[Message, Dict]]: |
| """ |
| Execute an action synchronously with memory management. |
| |
| Args: |
| action_name: Name of the action to execute |
| msgs: Optional list of messages providing context |
| action_input_data: Optional input data for the action |
| return_msg_type: Message type for the return message |
| return_action_input_data: Whether to return the action input data |
| **kwargs: Additional parameters |
| |
| Returns: |
| Message or tuple: The execution result, optionally with input data |
| """ |
| action, action_input_data = self._prepare_execution( |
| action_name=action_name, |
| msgs=msgs, |
| action_input_data=action_input_data, |
| **kwargs |
| ) |
|
|
| |
| execution_results = action.execute( |
| llm=self.llm, |
| inputs=action_input_data, |
| sys_msg=self.system_prompt, |
| return_prompt=True, |
| memory_manager=self.memory_manager, |
| **kwargs |
| ) |
| action_output, prompt = execution_results |
|
|
| message = self._create_output_message( |
| action_output=action_output, |
| prompt=prompt, |
| action_name=action_name, |
| return_msg_type=return_msg_type, |
| action_input_data=action_input_data, |
| **kwargs |
| ) |
| if return_action_input_data: |
| return message, action_input_data |
| return message |
|
|
| def chat( |
| self, |
| user_prompt: str, |
| *, |
| conversation_id: Optional[str] = None, |
| top_k: Optional[int] = None, |
| metadata_filters: Optional[dict] = None, |
| return_message: bool = True, |
| **kwargs |
| ): |
| action_input_data = { |
| "user_prompt": user_prompt, |
| "conversation_id": conversation_id or self._default_conversation_id(), |
| "top_k": top_k if top_k is not None else 3, |
| "metadata_filters": metadata_filters or {}, |
| } |
| msg = self.execute( |
| action_name="MemoryAction", |
| action_input_data=action_input_data, |
| return_msg_type=MessageType.RESPONSE, |
| **kwargs |
| ) |
| return msg if return_message else (getattr(msg, "content", None) or str(msg)) |
|
|
|
|
| async def async_chat( |
| self, |
| user_prompt: str, |
| *, |
| conversation_id: Optional[str] = None, |
| top_k: Optional[int] = None, |
| metadata_filters: Optional[dict] = None, |
| return_message: bool = True, |
| **kwargs |
| ): |
| action_input_data = { |
| "user_prompt": user_prompt, |
| "conversation_id": conversation_id or self._default_conversation_id(), |
| "top_k": top_k if top_k is not None else 3, |
| "metadata_filters": metadata_filters or {}, |
| } |
| msg = await self.async_execute( |
| action_name="MemoryAction", |
| action_input_data=action_input_data, |
| return_msg_type=MessageType.RESPONSE, |
| **kwargs |
| ) |
| return msg if return_message else (getattr(msg, "content", None) or str(msg)) |
|
|
|
|
| def _default_conversation_id(self) -> str: |
| """ |
| Session scope: By default, a new uuid4() is returned (new session). |
| User/global scope: Reuse LongTermMemory.default_corpus_id (stable namespace). |
| Note: The final ID is still uniformly managed by MemoryAgent._prepare_execution() (which will override based on the scope). |
| """ |
| scope = getattr(self, "conversation_scope", "session") |
| if scope == "session": |
| return str(uuid4()) |
| return getattr(getattr(self, "long_term_memory", None), "default_corpus_id", None) or "global_corpus" |
| |
| async def interactive_chat( |
| self, |
| conversation_id: Optional[str] = None, |
| top_k: int = 3, |
| metadata_filters: Optional[dict] = None |
| ): |
| """ |
| In interactive chat, each round of input will: |
| 1. Retrieve from memory |
| 2. Generate a response based on historical context |
| 3. Write the input/output to long-term memory and refresh the index |
| """ |
| conversation_id = conversation_id or self._default_conversation_id() |
| metadata_filters = metadata_filters or {} |
|
|
| print("💬 MemoryAgent has been started (type 'exit' to quit)\n") |
|
|
| while True: |
| user_prompt = input("You: ").strip() |
| if user_prompt.lower() in ["exit", "quit"]: |
| print("🔚 Conversation ended") |
| break |
|
|
| |
| retrieved_memories = await self.memory_manager.handle_memory( |
| action="search", |
| user_prompt=user_prompt, |
| top_k=top_k, |
| metadata_filters=metadata_filters |
| ) |
|
|
| context_texts = [] |
| for msg, _ in retrieved_memories: |
| if hasattr(msg, "content") and msg.content: |
| context_texts.append(msg.content) |
| context_str = "\n".join(context_texts) |
|
|
| |
| |
|
|
| |
| full_prompt = f"Context:\n{context_str}\n\nUser: {user_prompt}" if context_str else user_prompt |
| msg = await self.async_chat( |
| user_prompt=full_prompt, |
| conversation_id=conversation_id, |
| top_k=top_k, |
| metadata_filters=metadata_filters |
| ) |
|
|
| print(f"Agent: {msg.content}\n") |
|
|
| |
| if hasattr(self.memory_manager, "handle_memory_flush"): |
| await self.memory_manager.handle_memory_flush() |
| else: |
| await asyncio.sleep(0.1) |
|
|
|
|
|
|
| def save_module(self, path: str, ignore: List[str] = ["llm", "llm_config", "memory_manager"], **kwargs) -> str: |
| """ |
| Save the agent's configuration to a JSON file, excluding memory_manager by default. |
| |
| Args: |
| path: File path to save the configuration |
| ignore: List of keys to exclude from the saved configuration |
| **kwargs: Additional parameters for saving |
| |
| Returns: |
| str: The path where the configuration was saved |
| """ |
| return super().save_module(path=path, ignore=ignore, **kwargs) |
|
|
| @classmethod |
| def from_file(cls, path: str, llm_config: OpenAILLMConfig, storage_handler: Optional[StorageHandler] = None, rag_config: Optional[RAGConfig] = None, **kwargs) -> "MemoryAgent": |
| """ |
| Load a MemoryAgent from a JSON configuration file. |
| |
| Args: |
| path: Path to the JSON configuration file |
| llm_config: LLM configuration |
| storage_handler: Optional storage handler |
| rag_config: Optional RAG configuration |
| **kwargs: Additional parameters |
| |
| Returns: |
| MemoryAgent: The loaded agent instance |
| """ |
| with open(path, 'r', encoding='utf-8') as f: |
| config = json.load(f) |
| return cls( |
| name=config.get("name", "MemoryAgent"), |
| description=config.get("description", "An agent that uses long-term memory"), |
| llm_config=llm_config, |
| storage_handler=storage_handler, |
| rag_config=rag_config, |
| system_prompt=config.get("system_prompt"), |
| prompt=config.get("prompt"), |
| use_long_term_memory=config.get("use_long_term_memory", True), |
| **kwargs |
| ) |