"""
MiroFish Point 4: Continuous Multi-Agent Sandbox (Parallel Execution)
MiroFish Point 1: Dynamic Graph Evolution (Living Knowledge Graph)

Uses Celery for parallel execution to simulate agents interacting.
Every action taken by the LLM is pushed immediately as a temporal edge back into Neo4j.

NOTE(review): within this module the per-agent turns are fanned out with
``asyncio.gather``; no Celery usage is visible here -- confirm where Celery
enters the picture (presumably the caller that schedules ticks).
"""
import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import List, Dict, Any

from pydantic import BaseModel, Field

from ..core.neo4j_store import Neo4jStore
from ..core.llm_factory import UnifiedLLMProvider

logger = logging.getLogger(__name__)

# Characters that are not allowed to survive in a generated relationship type.
_NON_IDENTIFIER_CHARS = re.compile(r'[^A-Z0-9_]')

# Relationship type used when the LLM's action_type sanitizes to nothing.
_DEFAULT_RELATIONSHIP_TYPE = "INTERACTED_WITH"


class AgentAction(BaseModel):
    """Structured decision returned by the LLM for a single agent turn."""

    # Free-form verb; sanitized before it is used as a Neo4j relationship type.
    action_type: str = Field(description="The generic action type (e.g., 'DEBATED_WITH', 'AGREED_WITH', 'MENTIONED')")
    # Must name one of the targets offered in the prompt; validated by the caller.
    target_id: str = Field(description="The ID of the target agent to interact with.")
    # Stored verbatim on the created edge.
    content: str = Field(description="The textual interaction content (e.g., tweet or dialogue).")
    # Stored verbatim on the created edge; the range is only described, not enforced.
    confidence: float = Field(description="0.0 to 1.0 confidence in taking this action.")


class SimulationManager:
    """Manages parallel multi-agent loops and pushes real-time actions to Neo4j."""

    def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider):
        """Keep references to the graph store and the LLM provider."""
        self.store = store
        self.llm = llm

    async def get_active_agents(self, limit: int = 20) -> List[Dict]:
        """Fetch agents with generated personas (from Point 2 PersonaGenerator).

        Args:
            limit: Maximum number of agent rows to request from the store.

        Returns:
            Whatever ``store.execute_query`` yields for the query; each row
            exposes ``id``, ``name`` and ``persona``.
        """
        query = """
        MATCH (a:Entity {is_agent: true})
        WHERE a.persona IS NOT NULL
        RETURN a.id as id, a.name as name, a.persona as persona
        LIMIT $limit
        """
        return await self.store.execute_query(query, {"limit": limit})

    async def run_simulation_tick(self) -> int:
        """
        Runs one tick of the sandbox simulation:
        1. Selects active agents
        2. Retrieves their current graph environment ('memory')
        3. Uses the LLM to decide on an action
        4. Writes the action and content as a Temporal Edge into Neo4j

        Returns:
            The number of agent turns that reported a successfully written
            action. ``0`` when fewer than two agents are available.
        """
        agents = await self.get_active_agents()
        if len(agents) < 2:
            return 0  # Not enough agents

        results = await asyncio.gather(
            *(self._process_agent_turn(agent, agents) for agent in agents),
            return_exceptions=True,
        )

        actions_taken = 0
        for index, outcome in enumerate(results):
            # gather(return_exceptions=True) hands back raised exceptions --
            # including CancelledError, which is a BaseException -- as values.
            # They must never be counted as a successful action.
            if isinstance(outcome, BaseException):
                logger.error(
                    "Simulation turn %d raised: %r", index, outcome, exc_info=outcome
                )
                continue
            if outcome:
                actions_taken += 1
        return actions_taken

    @staticmethod
    def _sanitize_relationship_type(action_type: str) -> str:
        """Turn a free-form action label into a safe relationship type.

        Spaces become underscores, the text is upper-cased and every character
        outside ``A-Z0-9_`` is dropped. An empty result falls back to
        ``INTERACTED_WITH``.
        """
        candidate = action_type.strip().replace(" ", "_").upper()
        candidate = _NON_IDENTIFIER_CHARS.sub('', candidate)
        return candidate or _DEFAULT_RELATIONSHIP_TYPE

    async def _process_agent_turn(self, agent: Dict, all_agents: List[Dict]) -> bool:
        """Process a single agent's turn asynchronously.

        Args:
            agent: Row for the acting agent (reads ``id``, ``name``, ``persona``).
            all_agents: Every agent taking part in this tick, including ``agent``.

        Returns:
            ``True`` when an edge-creating query was issued for a valid target,
            ``False`` when there is no possible target, the LLM picked an
            unknown target, or the LLM call / write failed.

        Raises:
            Whatever ``store.execute_query`` raises for the memory lookup; only
            the LLM call and the write are guarded by the ``try`` block.
        """
        # Exclude self from targets
        potential_targets = [a for a in all_agents if a['id'] != agent['id']]
        if not potential_targets:
            return False

        target_summaries = "\n".join(
            f"- ID: {t['id']} | Name: {t['name']} | Persona: {t['persona']}"
            for t in potential_targets
        )
        # The prompt shows IDs as text, so the LLM answers with text. Index the
        # targets by that textual form so the answer can be validated and mapped
        # back to the ID value exactly as the store returned it.
        targets_by_id = {str(t['id']): t for t in potential_targets}

        # Get recent memories/interactions of this agent
        memory_query = """
        MATCH (a:Entity {id: $agent_id})-[r]->(t:Entity)
        WHERE r.valid_from IS NOT NULL
        RETURN type(r) as action, t.name as target, r.content as content
        ORDER BY r.valid_from DESC LIMIT 3
        """
        recent_memories = await self.store.execute_query(memory_query, {"agent_id": agent['id']})
        if recent_memories:
            memory_string = "\n".join(
                f"I {m['action']} {m['target']}: '{m['content']}'" for m in recent_memories
            )
        else:
            memory_string = 'No recent memories.'

        prompt = (
            f"You are the simulation engine roleplaying as: {agent['name']}\n"
            f"Your Persona Profile: {agent['persona']}\n\n"
            f"Your Recent Memory/Actions:\n{memory_string}\n\n"
            f"Available Targets to interact with:\n{target_summaries}\n\n"
            f"Decide on ONE action to take towards ONE target. Return valid JSON."
        )

        try:
            # LLM Engine decides the action (Ollama by default)
            action: AgentAction = await self.llm.complete_structured(
                prompt=prompt,
                response_model=AgentAction,
                system_prompt="You are a strict simulator. Simulate the given character and dictate their next textual interaction."
            )

            # The LLM may invent an ID or pick the acting agent itself. In that
            # case the MATCH below would find nothing and silently create no
            # edge, so reject the action instead of reporting success.
            target = targets_by_id.get(action.target_id.strip())
            if target is None:
                logger.warning(
                    "Simulation skipped for %s: LLM chose unknown target %r",
                    agent['name'], action.target_id,
                )
                return False

            # --- POINT 1: Dynamic Graph Evolution ---
            # Save the action back into our Neo4j Knowledge Graph as a temporal event edge
            # Naive UTC ISO string, kept in the exact format previously written
            # so string ordering on valid_from stays consistent.
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

            # Relationship types cannot be passed as Cypher parameters, so the
            # type is interpolated -- only after being reduced to A-Z0-9_.
            # Backticks keep the query valid when the name starts with a digit.
            relationship_type = self._sanitize_relationship_type(action.action_type)

            update_query = f"""
            MATCH (source:Entity {{id: $source_id}})
            MATCH (target:Entity {{id: $target_id}})
            CREATE (source)-[r:`{relationship_type}` {{
                content: $content,
                confidence: $confidence,
                valid_from: $timestamp,
                is_simulation_event: true,
                ingested_at: datetime()
            }}]->(target)
            RETURN r
            """

            await self.store.execute_query(
                update_query,
                params={
                    "source_id": agent['id'],
                    # Use the ID as stored, not the LLM's textual echo of it.
                    "target_id": target['id'],
                    "content": action.content,
                    "confidence": action.confidence,
                    "timestamp": timestamp
                }
            )
            return True
        except Exception:
            # Best-effort boundary: one failing agent must not abort the tick,
            # but the failure is recorded with its traceback.
            logger.exception("Simulation failed for %s", agent['name'])
            return False