# purpose-agent / purpose_agent / mas_generator.py
# Author: Rohan03
# v3.0.0 Production Release: hardened framework, strict tool validation,
# test suite robustification (commit 36d2671)
"""
mas_generator.py β€” Multi-Agent System Generator.
Takes a use-case description and outputs a complete generated system:
- Agent specs with roles
- Flow (workflow graph)
- Tools needed
- Memory budget
- Eval suite
- Routing policy
Usage:
from purpose_agent.mas_generator import generate
mas = generate("Monitor GitHub repos for CVEs and alert the team")
# β†’ GeneratedMAS with agents, flow, tools, evals, routing
# Run it
team = mas.to_team(model=backend)
result = team.run("Check for new CVEs today")
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.routing import RoutingPolicy, TaskComplexity
from purpose_agent.memory_homeostasis import MemoryBudget
logger = logging.getLogger(__name__)
@dataclass
class GeneratedAgent:
    """Spec for a generated agent.

    A lightweight description of one agent in the generated system; the
    name doubles as a node name in the generated flow.
    """
    name: str  # Unique identifier; also used as a flow-graph node name
    role: str  # One-sentence description of what the agent does
    expertise: list[str] = field(default_factory=list)  # Skill tags (e.g. "coding", "analysis")
    tools_needed: list[str] = field(default_factory=list)  # Tool names this agent requires
@dataclass
class GeneratedFlow:
    """Spec for a generated workflow.

    A simple directed graph: sequential edges plus optional conditional
    transitions keyed by outcome labels (e.g. "pass"/"fail").
    """
    nodes: list[str] = field(default_factory=list)  # Node names in order
    edges: list[tuple[str, str]] = field(default_factory=list)  # (from, to)
    conditional: dict[str, dict[str, str]] = field(default_factory=dict)  # node → {condition: target}
@dataclass
class GeneratedEval:
    """A generated evaluation case.

    Describes one check to run against the generated system; behavior is
    stated in prose, not as executable assertions.
    """
    id: str  # Stable identifier, e.g. "eval_runs" or "eval_<agent>"
    purpose: str  # What this eval verifies
    expected_behavior: str  # Prose description of the passing behavior
    category: str = "general"  # Grouping tag: "basic", "coverage", "agent_role", ...
@dataclass
class GeneratedMAS:
    """Complete generated multi-agent system.

    Bundles every artifact produced by generate(): agent specs, the
    workflow graph, tool list, memory/routing configuration, and an
    evaluation suite. Convert to a runnable team with to_team().
    """
    purpose: str  # The original use-case description
    agents: list[GeneratedAgent] = field(default_factory=list)
    flow: GeneratedFlow = field(default_factory=GeneratedFlow)
    tools: list[str] = field(default_factory=list)  # Tool names the whole system needs
    memory_budget: MemoryBudget = field(default_factory=MemoryBudget)
    eval_suite: list[GeneratedEval] = field(default_factory=list)
    routing_policy: RoutingPolicy = field(default_factory=RoutingPolicy)
    metadata: dict[str, Any] = field(default_factory=dict)  # e.g. {"template": <matched template name>}
    def to_team(self, model=None):
        """Convert to a Purpose Agent Team for execution.

        Only each agent's name and role are forwarded; expertise and
        tools_needed are not passed to Team here. Import is local to
        avoid a module-level dependency cycle with purpose_agent.easy.
        """
        from purpose_agent.easy import Team
        agent_specs = [{"name": a.name, "role": a.role} for a in self.agents]
        return Team(purpose=self.purpose, agents=agent_specs, model=model)
# ═══════════════════════════════════════════════════════════════
# Templates β€” deterministic generation (no LLM needed)
# ═══════════════════════════════════════════════════════════════
# Each template maps a domain name to:
#   keywords:   lowercase tokens matched against the use-case text by
#               _match_template (highest keyword overlap wins)
#   agents:     GeneratedAgent specs (name, role, expertise, tools_needed)
#   flow_nodes: execution order of agents in the workflow graph
#   flow_type:  shape hint consumed by _generate_flow — "sequential",
#               "sequential_with_review", "sequential_with_critic",
#               or "sequential_with_approval"
#   tools:      tool names the whole system needs
_TEMPLATES = {
    "code": {
        "keywords": ["code", "program", "develop", "build", "software", "python", "debug", "function", "api", "script"],
        "agents": [
            GeneratedAgent("architect", "Design solution architecture and break into subtasks", ["design", "planning"]),
            GeneratedAgent("coder", "Write clean, tested code following best practices", ["coding", "python"], ["python_exec"]),
            GeneratedAgent("tester", "Review code for bugs, edge cases, and improvements", ["testing", "review"]),
        ],
        "flow_nodes": ["architect", "coder", "tester"],
        "flow_type": "sequential_with_review",
        "tools": ["python_exec", "read_file", "write_file"],
    },
    "security": {
        "keywords": ["security", "cve", "cves", "vulnerability", "audit", "penetration", "threat", "monitor", "alert"],
        "agents": [
            GeneratedAgent("scanner", "Scan and identify potential security issues", ["scanning", "detection"]),
            GeneratedAgent("analyst", "Analyze severity and impact of findings", ["analysis", "risk"]),
            GeneratedAgent("reporter", "Create clear security reports with recommendations", ["reporting"]),
            GeneratedAgent("security_critic", "Verify findings and check for false positives", ["verification"]),
        ],
        "flow_nodes": ["scanner", "analyst", "security_critic", "reporter"],
        "flow_type": "sequential_with_critic",
        "tools": ["read_file", "calculator"],
    },
    "research": {
        # NOTE: "research" is also the fallback template when no keywords match.
        "keywords": ["research", "find", "search", "discover", "papers", "study", "investigate", "analyze"],
        "agents": [
            GeneratedAgent("researcher", "Find and gather relevant information", ["search", "gathering"]),
            GeneratedAgent("verifier", "Cross-check facts and verify sources", ["verification", "fact-check"]),
            GeneratedAgent("synthesizer", "Combine findings into coherent summary", ["synthesis", "writing"]),
        ],
        "flow_nodes": ["researcher", "verifier", "synthesizer"],
        "flow_type": "sequential",
        "tools": ["calculator"],
    },
    "data": {
        "keywords": ["data", "csv", "excel", "database", "analytics", "chart", "statistics", "report"],
        "agents": [
            GeneratedAgent("loader", "Load and validate data from sources", ["data_loading"], ["read_file"]),
            GeneratedAgent("analyst", "Analyze data, compute statistics, find patterns", ["analysis"], ["python_exec", "calculator"]),
            GeneratedAgent("validator", "Validate results and check for errors", ["validation"]),
            GeneratedAgent("reporter", "Present findings in clear format", ["reporting"]),
        ],
        "flow_nodes": ["loader", "analyst", "validator", "reporter"],
        "flow_type": "sequential",
        "tools": ["python_exec", "read_file", "calculator"],
    },
    "operations": {
        "keywords": ["deploy", "monitor", "operate", "maintain", "alert", "incident", "pipeline"],
        "agents": [
            GeneratedAgent("planner", "Plan operations and identify risks", ["planning"]),
            GeneratedAgent("executor", "Execute planned operations carefully", ["execution"], ["python_exec"]),
            GeneratedAgent("auditor", "Verify operations completed correctly", ["auditing"]),
        ],
        "flow_nodes": ["planner", "executor", "auditor"],
        "flow_type": "sequential_with_approval",
        "tools": ["python_exec", "read_file"],
    },
}
def _match_template(use_case: str) -> dict:
    """Match use case to best template by keyword overlap.

    Tokenizes the description on whitespace and also strips surrounding
    punctuation from each token, so e.g. "Monitor CVEs!" still matches
    the "cves" keyword. Templates are scored by how many of their
    keywords appear; ties go to the earlier template in _TEMPLATES
    order. Falls back to the generic "research" template when nothing
    matches (including empty input).

    Args:
        use_case: Plain-English description of the desired system.
    Returns:
        The winning template dict from _TEMPLATES.
    """
    import string  # local import: only needed for punctuation stripping

    raw_tokens = use_case.lower().split()
    # Keep both forms so every token that matched before still matches.
    words = set(raw_tokens) | {t.strip(string.punctuation) for t in raw_tokens}
    words.discard("")
    best_template = None
    best_score = 0
    for template in _TEMPLATES.values():
        score = len(words & set(template["keywords"]))
        if score > best_score:
            best_score = score
            best_template = template
    # Default to research if no match
    return best_template or _TEMPLATES["research"]
def _generate_flow(template: dict) -> GeneratedFlow:
    """Build the workflow graph described by *template*.

    Chains flow_nodes in order, then attaches a conditional loop when
    the template's flow_type calls for a review or critic stage.
    """
    node_names = template["flow_nodes"]
    # Consecutive node pairs form the sequential backbone of the graph.
    backbone = [(src, dst) for src, dst in zip(node_names, node_names[1:])]
    graph = GeneratedFlow(nodes=list(node_names), edges=backbone)
    flow_type = template.get("flow_type")
    if flow_type == "sequential_with_review":
        # The final node either finishes or loops work back to the
        # second node on failure.
        graph.conditional[node_names[-1]] = {"pass": "__END__", "fail": node_names[1]}
    elif flow_type == "sequential_with_critic":
        # The first node whose name contains "critic" gates the output:
        # approve advances to the last node, reject restarts the flow.
        reviewer = next((name for name in node_names if "critic" in name), None)
        if reviewer is not None:
            graph.conditional[reviewer] = {"approve": node_names[-1], "reject": node_names[0]}
    return graph
def _generate_evals(use_case: str, agents: list[GeneratedAgent]) -> list[GeneratedEval]:
    """Produce the evaluation suite for a generated system.

    Two system-level checks (end-to-end run, agent coverage) plus one
    role check for each of the first three agents.
    """
    suite: list[GeneratedEval] = [
        GeneratedEval(
            id="eval_runs",
            purpose=f"System can process: {use_case}",
            expected_behavior="Completes without error",
            category="basic",
        ),
        GeneratedEval(
            id="eval_agents_contribute",
            purpose="Each agent contributes to the output",
            expected_behavior="All agents produce non-empty output",
            category="coverage",
        ),
    ]
    # One role-coverage eval per agent, capped at three to keep suites small.
    suite.extend(
        GeneratedEval(
            id=f"eval_{spec.name}",
            purpose=f"{spec.name} performs its role: {spec.role}",
            expected_behavior=f"{spec.name} produces relevant output for its expertise",
            category="agent_role",
        )
        for spec in agents[:3]
    )
    return suite
def generate(use_case: str, constraints: dict[str, Any] | None = None) -> GeneratedMAS:
    """
    Generate a complete multi-agent system from a use-case description.
    Uses deterministic templates (no LLM required).
    Args:
        use_case: Plain English description of what the system should do
        constraints: Optional overrides. Recognized keys:
            - "prefer_local" (bool, default True): routing preference
            - "max_cost" (float, default 0.10): per-task budget in USD
    Returns:
        GeneratedMAS with agents, flow, tools, evals, and routing policy
    """
    constraints = constraints or {}
    template = _match_template(use_case)
    # Copy the template's lists: aliasing them meant a caller mutating
    # mas.agents / mas.tools would corrupt the shared module-level
    # _TEMPLATES entry for every subsequent generation.
    agents = list(template["agents"])
    tools = list(template["tools"])
    flow = _generate_flow(template)
    evals = _generate_evals(use_case, agents)
    n_agents = len(agents)
    # Routing policy: honor caller overrides, defaulting to cheap/local.
    policy = RoutingPolicy(
        prefer_local=constraints.get("prefer_local", True),
        max_cost_per_task_usd=constraints.get("max_cost", 0.10),
    )
    # Memory budget scaled to system size, capped at fixed ceilings.
    budget = MemoryBudget(
        max_active_cards=min(512, n_agents * 128),
        max_injected_tokens=min(500, n_agents * 150),
    )
    mas = GeneratedMAS(
        purpose=use_case,
        agents=agents,
        flow=flow,
        tools=tools,
        memory_budget=budget,
        eval_suite=evals,
        routing_policy=policy,
        # Identity check works because _match_template returns one of the
        # _TEMPLATES values directly; "research" covers the fallback path.
        metadata={"template": next((k for k, v in _TEMPLATES.items() if v is template), "research")},
    )
    # Lazy %-args: the message is only formatted when INFO is enabled.
    logger.info(
        "MAS generated: %d agents, %d nodes, %d evals",
        len(agents), len(flow.nodes), len(evals),
    )
    return mas