| """ |
| Business logic for agents, workflows, and executions. |
| """ |
| import logging |
| |
| from datetime import datetime |
| from typing import List, Dict, Any, Optional, Tuple |
| from bson import ObjectId |
|
|
| from evoagentx.app.db import ( |
| Database, |
| AgentStatus, WorkflowStatus, ExecutionStatus |
| ) |
| from evoagentx.app.schemas import ( |
| AgentCreate, AgentUpdate, WorkflowCreate, WorkflowUpdate, |
| ExecutionCreate, PaginationParams, SearchParams |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| class AgentService: |
| @staticmethod |
| async def create_agent(agent_data: AgentCreate, user_id: Optional[str] = None) -> Dict[str, Any]: |
| """Create a new agent.""" |
| agent_dict = agent_data.dict() |
| agent_dict["created_by"] = user_id |
| agent_dict["created_at"] = datetime.utcnow() |
| agent_dict["updated_at"] = agent_dict["created_at"] |
| agent_dict["status"] = AgentStatus.CREATED |
| |
| |
| existing_agent = await Database.agents.find_one({"name": agent_dict["name"]}) |
| if existing_agent: |
| raise ValueError(f"Agent with name '{agent_dict['name']}' already exists") |
| |
| result = await Database.agents.insert_one(agent_dict) |
| agent_dict["_id"] = result.inserted_id |
| |
| logger.info(f"Created agent {agent_dict['name']} with ID {result.inserted_id}") |
| |
| return agent_dict |
| |
| @staticmethod |
| async def get_agent(agent_id: str) -> Optional[Dict[str, Any]]: |
| """Get an agent by ID.""" |
| if not ObjectId.is_valid(agent_id): |
| raise ValueError(f"Invalid agent ID: {agent_id}") |
| |
| agent = await Database.agents.find_one({"_id": ObjectId(agent_id)}) |
| return agent |
| |
| @staticmethod |
| async def get_agent_by_name(name: str) -> Optional[Dict[str, Any]]: |
| """Get an agent by name.""" |
| return await Database.agents.find_one({"name": name}) |
| |
| @staticmethod |
| async def update_agent(agent_id: str, agent_data: AgentUpdate) -> Optional[Dict[str, Any]]: |
| """Update an agent.""" |
| if not ObjectId.is_valid(agent_id): |
| raise ValueError(f"Invalid agent ID: {agent_id}") |
| |
| agent = await Database.agents.find_one({"_id": ObjectId(agent_id)}) |
| if not agent: |
| return None |
| |
| update_data = agent_data.dict(exclude_unset=True) |
| update_data["updated_at"] = datetime.utcnow() |
| |
| if "name" in update_data: |
| |
| existing = await Database.agents.find_one({ |
| "name": update_data["name"], |
| "_id": {"$ne": ObjectId(agent_id)} |
| }) |
| if existing: |
| raise ValueError(f"Agent with name '{update_data['name']}' already exists") |
| |
| await Database.agents.update_one( |
| {"_id": ObjectId(agent_id)}, |
| {"$set": update_data} |
| ) |
| |
| updated_agent = await Database.agents.find_one({"_id": ObjectId(agent_id)}) |
| logger.info(f"Updated agent {agent_id}") |
| |
| return updated_agent |
| |
| @staticmethod |
| async def delete_agent(agent_id: str) -> bool: |
| """Delete an agent.""" |
| if not ObjectId.is_valid(agent_id): |
| raise ValueError(f"Invalid agent ID: {agent_id}") |
| |
| |
| workflow_count = await Database.workflows.count_documents({"agent_ids": agent_id}) |
| if workflow_count > 0: |
| raise ValueError(f"Cannot delete agent {agent_id} as it is used in {workflow_count} workflows") |
| |
| result = await Database.agents.delete_one({"_id": ObjectId(agent_id)}) |
| if result.deleted_count: |
| logger.info(f"Deleted agent {agent_id}") |
| return True |
| return False |
| |
| @staticmethod |
| async def list_agents( |
| params: PaginationParams, |
| search: Optional[SearchParams] = None |
| ) -> Tuple[List[Dict[str, Any]], int]: |
| """List agents with pagination and search.""" |
| query = {} |
| |
| if search: |
| if search.query: |
| query["$text"] = {"$search": search.query} |
| |
| if search.tags: |
| query["tags"] = {"$all": search.tags} |
| |
| if search.status: |
| query["status"] = search.status |
| |
| if search.start_date and search.end_date: |
| query["created_at"] = { |
| "$gte": search.start_date, |
| "$lte": search.end_date |
| } |
| elif search.start_date: |
| query["created_at"] = {"$gte": search.start_date} |
| elif search.end_date: |
| query["created_at"] = {"$lte": search.end_date} |
| |
| total = await Database.agents.count_documents(query) |
| |
| cursor = Database.agents.find(query)\ |
| .sort("created_at", -1)\ |
| .skip(params.skip)\ |
| .limit(params.limit) |
| |
| agents = await cursor.to_list(length=params.limit) |
| return agents, total |
|
|
| |
| class WorkflowService: |
| @staticmethod |
| async def create_workflow(workflow_data: WorkflowCreate, user_id: Optional[str] = None) -> Dict[str, Any]: |
| """Create a new workflow.""" |
| workflow_dict = workflow_data.dict() |
| workflow_dict["created_by"] = user_id |
| workflow_dict["created_at"] = datetime.utcnow() |
| workflow_dict["updated_at"] = workflow_dict["created_at"] |
| workflow_dict["status"] = WorkflowStatus.CREATED |
| workflow_dict["version"] = 1 |
| |
| |
| agent_ids = set() |
| |
| |
| steps = workflow_dict["definition"].get("steps", []) |
| for step in steps: |
| if "agent_id" in step: |
| agent_id = step["agent_id"] |
| |
| agent = await AgentService.get_agent(agent_id) |
| if not agent: |
| raise ValueError(f"Agent with ID {agent_id} does not exist") |
| agent_ids.add(agent_id) |
| |
| workflow_dict["agent_ids"] = list(agent_ids) |
| |
| |
| existing = await Database.workflows.find_one({"name": workflow_dict["name"]}) |
| if existing: |
| raise ValueError(f"Workflow with name '{workflow_dict['name']}' already exists") |
| |
| result = await Database.workflows.insert_one(workflow_dict) |
| workflow_dict["_id"] = result.inserted_id |
| |
| logger.info(f"Created workflow {workflow_dict['name']} with ID {result.inserted_id}") |
| |
| return workflow_dict |
| |
| @staticmethod |
| async def get_workflow(workflow_id: str) -> Optional[Dict[str, Any]]: |
| """Get a workflow by ID.""" |
| if not ObjectId.is_valid(workflow_id): |
| raise ValueError(f"Invalid workflow ID: {workflow_id}") |
| workflow = await Database.workflows.find_one({"_id": ObjectId(workflow_id)}) |
| return workflow |
| |
| @staticmethod |
| async def get_workflow_by_name(name: str) -> Optional[Dict[str, Any]]: |
| """Get a workflow by name.""" |
| return await Database.workflows.find_one({"name": name}) |
| |
| @staticmethod |
| async def update_workflow(workflow_id: str, workflow_data: WorkflowUpdate) -> Optional[Dict[str, Any]]: |
| """Update a workflow.""" |
| if not ObjectId.is_valid(workflow_id): |
| raise ValueError(f"Invalid workflow ID: {workflow_id}") |
| |
| workflow = await Database.workflows.find_one({"_id": ObjectId(workflow_id)}) |
| if not workflow: |
| return None |
| |
| update_data = workflow_data.dict(exclude_unset=True) |
| update_data["updated_at"] = datetime.utcnow() |
| |
| |
| if "definition" in update_data: |
| update_data["version"] = workflow.get("version", 1) + 1 |
| |
| |
| agent_ids = set() |
| steps = update_data["definition"].get("steps", []) |
| for step in steps: |
| if "agent_id" in step: |
| agent_id = step["agent_id"] |
| |
| agent = await AgentService.get_agent(agent_id) |
| if not agent: |
| raise ValueError(f"Agent with ID {agent_id} does not exist") |
| agent_ids.add(agent_id) |
| |
| update_data["agent_ids"] = list(agent_ids) |
| |
| |
| if "name" in update_data: |
| existing = await Database.workflows.find_one({ |
| "name": update_data["name"], |
| "_id": {"$ne": ObjectId(workflow_id)} |
| }) |
| if existing: |
| raise ValueError(f"Workflow with name '{update_data['name']}' already exists") |
| |
| await Database.workflows.update_one( |
| {"_id": ObjectId(workflow_id)}, |
| {"$set": update_data} |
| ) |
| |
| updated_workflow = await Database.workflows.find_one({"_id": ObjectId(workflow_id)}) |
| logger.info(f"Updated workflow {workflow_id}") |
| |
| return updated_workflow |
| |
| @staticmethod |
| async def delete_workflow(workflow_id: str) -> bool: |
| """Delete a workflow.""" |
| if not ObjectId.is_valid(workflow_id): |
| raise ValueError(f"Invalid workflow ID: {workflow_id}") |
| |
| |
| recent_executions = await Database.executions.count_documents({ |
| "workflow_id": workflow_id, |
| "status": {"$in": [ |
| ExecutionStatus.PENDING, |
| ExecutionStatus.RUNNING |
| ]} |
| }) |
| |
| if recent_executions > 0: |
| raise ValueError(f"Cannot delete workflow {workflow_id} with {recent_executions} active executions") |
|
|
| result = await Database.workflows.delete_one({"_id": ObjectId(workflow_id)}) |
| if result.deleted_count: |
| |
| await Database.logs.delete_many({"workflow_id": workflow_id}) |
| await Database.executions.delete_many({"workflow_id": workflow_id}) |
| |
| logger.info(f"Deleted workflow {workflow_id}") |
| return True |
| return False |
| |
| @staticmethod |
| async def list_workflows( |
| params: PaginationParams, |
| search: Optional[SearchParams] = None |
| ) -> Tuple[List[Dict[str, Any]], int]: |
| """List workflows with pagination and search.""" |
| query = {} |
| |
| if search: |
| if search.query: |
| query["$text"] = {"$search": search.query} |
| |
| if search.tags: |
| query["tags"] = {"$all": search.tags} |
| |
| if search.status: |
| query["status"] = search.status |
| |
| if search.start_date and search.end_date: |
| query["created_at"] = { |
| "$gte": search.start_date, |
| "$lte": search.end_date |
| } |
| elif search.start_date: |
| query["created_at"] = {"$gte": search.start_date} |
| elif search.end_date: |
| query["created_at"] = {"$lte": search.end_date} |
| |
| total = await Database.workflows.count_documents(query) |
| |
| cursor = Database.workflows.find(query)\ |
| .sort("created_at", -1)\ |
| .skip(params.skip)\ |
| .limit(params.limit) |
| |
| workflows = await cursor.to_list(length=params.limit) |
| return workflows, total |
|
|
| |
| class WorkflowExecutionService: |
| @staticmethod |
| async def create_execution(execution_data: ExecutionCreate, user_id: Optional[str] = None) -> Dict[str, Any]: |
| """Create a new workflow execution.""" |
| |
| workflow = await WorkflowService.get_workflow(execution_data.workflow_id) |
| if not workflow: |
| raise ValueError(f"Workflow {execution_data.workflow_id} not found") |
| |
| |
| execution_dict = { |
| "workflow_id": execution_data.workflow_id, |
| "status": ExecutionStatus.PENDING, |
| "start_time": datetime.utcnow(), |
| "input_params": execution_data.input_params, |
| "created_by": user_id, |
| "created_at": datetime.utcnow(), |
| "step_results": {}, |
| "current_step": None, |
| "results": {}, |
| "error_message": None |
| } |
| |
| |
| result = await Database.executions.insert_one(execution_dict) |
| execution_dict["_id"] = result.inserted_id |
| |
| logger.info(f"Created workflow execution {result.inserted_id}") |
| |
| |
| |
| |
| |
| return execution_dict |
| |
| @staticmethod |
| async def get_execution(execution_id: str) -> Optional[Dict[str, Any]]: |
| """Get a workflow execution by ID.""" |
| if not ObjectId.is_valid(execution_id): |
| raise ValueError(f"Invalid execution ID: {execution_id}") |
| |
| execution = await Database.executions.find_one({"_id": ObjectId(execution_id)}) |
| return execution |
| |
| @staticmethod |
| async def update_execution_status(execution_id: str, status: ExecutionStatus, error_message: Optional[str] = None) -> Optional[Dict[str, Any]]: |
| """Update execution status.""" |
| if not ObjectId.is_valid(execution_id): |
| raise ValueError(f"Invalid execution ID: {execution_id}") |
| |
| update_data = { |
| "status": status, |
| "updated_at": datetime.utcnow() |
| } |
| |
| if status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED]: |
| update_data["end_time"] = datetime.utcnow() |
| |
| if error_message: |
| update_data["error_message"] = error_message |
| |
| result = await Database.executions.find_one_and_update( |
| {"_id": ObjectId(execution_id)}, |
| {"$set": update_data}, |
| return_document=True |
| ) |
| |
| return result |
| |
| @staticmethod |
| async def list_executions( |
| workflow_id: Optional[str] = None, |
| params: PaginationParams = PaginationParams(), |
| search: Optional[SearchParams] = None |
| ) -> Tuple[List[Dict[str, Any]], int]: |
| """List workflow executions with pagination and search.""" |
| query = {} |
| |
| if workflow_id: |
| query["workflow_id"] = workflow_id |
| |
| if search: |
| if search.status: |
| query["status"] = search.status |
| |
| if search.start_date and search.end_date: |
| query["created_at"] = { |
| "$gte": search.start_date, |
| "$lte": search.end_date |
| } |
| elif search.start_date: |
| query["created_at"] = {"$gte": search.start_date} |
| elif search.end_date: |
| query["created_at"] = {"$lte": search.end_date} |
| |
| total = await Database.executions.count_documents(query) |
| |
| cursor = Database.executions.find(query)\ |
| .sort("created_at", -1)\ |
| .skip(params.skip)\ |
| .limit(params.limit) |
| |
| executions = await cursor.to_list(length=params.limit) |
| return executions, total |
| |
| @staticmethod |
| async def log_execution_event( |
| workflow_id: str, |
| execution_id: str, |
| message: str, |
| step_id: Optional[str] = None, |
| agent_id: Optional[str] = None, |
| level: str = "INFO", |
| details: Optional[Dict[str, Any]] = None |
| ) -> Dict[str, Any]: |
| """Log an event in a workflow execution.""" |
| log_entry = { |
| "workflow_id": workflow_id, |
| "execution_id": execution_id, |
| "step_id": step_id, |
| "agent_id": agent_id, |
| "timestamp": datetime.utcnow(), |
| "level": level, |
| "message": message, |
| "details": details or {} |
| } |
| |
| result = await Database.logs.insert_one(log_entry) |
| log_entry["_id"] = result.inserted_id |
| |
| return log_entry |
| |
| @staticmethod |
| async def get_execution_logs( |
| execution_id: str, |
| params: PaginationParams = PaginationParams() |
| ) -> Tuple[List[Dict[str, Any]], int]: |
| """Retrieve logs for a specific execution.""" |
| query = {"execution_id": execution_id} |
| |
| total = await Database.logs.count_documents(query) |
| |
| cursor = Database.logs.find(query)\ |
| .sort("timestamp", 1)\ |
| .skip(params.skip)\ |
| .limit(params.limit) |
| |
| logs = await cursor.to_list(length=params.limit) |
| return logs, total |