File size: 5,736 Bytes
31034f0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | import logging
logger = logging.getLogger(__name__)
"""
MiroFish Point 4: Continuous Multi-Agent Sandbox (Parallel Execution)
MiroFish Point 1: Dynamic Graph Evolution (Living Knowledge Graph)
Uses Celery for parallel execution to simulate agents interacting.
Every action taken by the LLM is pushed immediately as a temporal edge back into Neo4j.
"""
from typing import List, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime, timezone
import asyncio
from ..core.neo4j_store import Neo4jStore
from ..core.llm_factory import UnifiedLLMProvider
class AgentAction(BaseModel):
action_type: str = Field(description="The generic action type (e.g., 'DEBATED_WITH', 'AGREED_WITH', 'MENTIONED')")
target_id: str = Field(description="The ID of the target agent to interact with.")
content: str = Field(description="The textual interaction content (e.g., tweet or dialogue).")
confidence: float = Field(description="0.0 to 1.0 confidence in taking this action.")
class SimulationManager:
"""Manages parallel multi-agent loops and pushes real-time actions to Neo4j."""
def __init__(self, store: Neo4jStore, llm: UnifiedLLMProvider):
self.store = store
self.llm = llm
async def get_active_agents(self, limit: int = 20) -> List[Dict]:
"""Fetch agents with generated personas (from Point 2 PersonaGenerator)"""
query = """
MATCH (a:Entity {is_agent: true})
WHERE a.persona IS NOT NULL
RETURN a.id as id, a.name as name, a.persona as persona
LIMIT $limit
"""
return await self.store.execute_query(query, {"limit": limit})
async def run_simulation_tick(self) -> int:
"""
Runs one tick of the sandbox simulation:
1. Selects active agents
2. Retrieves their current graph environment ('memory')
3. Uses the LLM to decide on an action
4. Writes the action and content as a Temporal Edge into Neo4j
"""
agents = await self.get_active_agents()
if len(agents) < 2:
return 0 # Not enough agents
actions_taken = 0
tasks = [self._process_agent_turn(agent, agents) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
for res in results:
if res and not isinstance(res, Exception):
actions_taken += 1
return actions_taken
async def _process_agent_turn(self, agent: Dict, all_agents: List[Dict]) -> bool:
"""Process a single agent's turn asynchronously."""
# Exclude self from targets
potential_targets = [a for a in all_agents if a['id'] != agent['id']]
if not potential_targets:
return False
target_summaries = "\n".join([f"- ID: {t['id']} | Name: {t['name']} | Persona: {t['persona']}" for t in potential_targets])
# Get recent memories/interactions of this agent
memory_query = """
MATCH (a:Entity {id: $agent_id})-[r]->(t:Entity)
WHERE r.valid_from IS NOT NULL
RETURN type(r) as action, t.name as target, r.content as content
ORDER BY r.valid_from DESC LIMIT 3
"""
recent_memories = await self.store.execute_query(memory_query, {"agent_id": agent['id']})
memory_string = "\n".join([f"I {m['action']} {m['target']}: '{m['content']}'" for m in recent_memories])
prompt = (
f"You are the simulation engine roleplaying as: {agent['name']}\n"
f"Your Persona Profile: {agent['persona']}\n\n"
f"Your Recent Memory/Actions:\n{memory_string if recent_memories else 'No recent memories.'}\n\n"
f"Available Targets to interact with:\n{target_summaries}\n\n"
f"Decide on ONE action to take towards ONE target. Return valid JSON."
)
try:
# LLM Engine decides the action (Ollama by default)
action: AgentAction = await self.llm.complete_structured(
prompt=prompt,
response_model=AgentAction,
system_prompt="You are a strict simulator. Simulate the given character and dictate their next textual interaction."
)
# --- POINT 1: Dynamic Graph Evolution ---
# Save the action back into our Neo4j Knowledge Graph as a temporal event edge
timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
import re
cypher_action = action.action_type.replace(" ", "_").upper()
cypher_action = re.sub(r'[^A-Z0-9_]', '', cypher_action)
if not cypher_action:
cypher_action = "INTERACTED_WITH"
update_query = f"""
MATCH (source:Entity {{id: $source_id}})
MATCH (target:Entity {{id: $target_id}})
CREATE (source)-[r:{cypher_action} {{
content: $content,
confidence: $confidence,
valid_from: $timestamp,
is_simulation_event: true,
ingested_at: datetime()
}}]->(target)
RETURN r
"""
await self.store.execute_query(
update_query,
params={
"source_id": agent['id'],
"target_id": action.target_id,
"content": action.content,
"confidence": action.confidence,
"timestamp": timestamp
}
)
return True
except Exception as e:
logger.info(f"Simulation failed for {agent['name']}: {e}")
return False
|