V2 merge: purpose_agent/unified.py

purpose_agent/unified.py  (+24 -2)
@@ -73,6 +73,11 @@ END = "__END__"
 START = "__START__"
 
 
+import threading
+
+# Global lock for shared replay/optimizer in parallel execution
+_parallel_lock = threading.Lock()
+
 # ───────────────────────────────────────────────────────────────────────────
 # 1. PLUG-AND-PLAY - Agent() one-liner factory (OpenAI Agents SDK simplicity)
 # ───────────────────────────────────────────────────────────────────────────
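For context, a minimal self-contained sketch of the pattern this module-level lock enables. The replay/optimizer objects themselves do not appear in the diff, so the shared structure and helper names below are illustrative stand-ins, not purpose_agent APIs:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_parallel_lock = threading.Lock()
shared_replay: list[dict] = []  # illustrative stand-in for the shared replay buffer

def run_one(task_id: int) -> dict:
    result = {"task": task_id}  # the per-task work itself needs no lock
    with _parallel_lock:        # serialize only the writes to shared state
        shared_replay.append(result)
    return result

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(run_one, range(8)))
```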
@@ -447,7 +452,22 @@ def parallel(
     agent_list = agents
 
     states = initial_states or [None] * len(normalized)
-
+
+    # Thread safety: detect backend type for concurrency limit
+    # Local backends (Ollama, llama-cpp) share one GPU/CPU → serialize
+    # Cloud/API backends can parallelize
+    if max_workers is None:
+        sample_agent = agent_list[0] if agent_list else None
+        if sample_agent and hasattr(sample_agent, 'llm'):
+            backend_type = type(sample_agent.llm).__name__
+            if backend_type in ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend"):
+                workers = 1  # Local model → serialize to avoid contention
+            else:
+                workers = min(len(normalized), 8)
+        else:
+            workers = min(len(normalized), 8)
+    else:
+        workers = max_workers
 
     logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers")
 
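To make the decision table easier to read, here is the same heuristic pulled out as a standalone function; `pick_workers` and `LOCAL_BACKENDS` are hypothetical names for this sketch, not part of purpose_agent:

```python
LOCAL_BACKENDS = ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend")

def pick_workers(agents: list, n_tasks: int, max_workers: int | None) -> int:
    if max_workers is not None:
        return max_workers  # explicit override always wins
    sample = agents[0] if agents else None
    if sample is not None and hasattr(sample, "llm"):
        if type(sample.llm).__name__ in LOCAL_BACKENDS:
            return 1  # local model: one GPU/CPU, so serialize
    return min(n_tasks, 8)  # cloud/API backend: bounded fan-out
```

Matching on class names keeps the check import-free, but it inspects only the first agent and fails silently if a backend class is ever renamed; a capability flag on the backend would be a more robust signal.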
@@ -455,7 +475,9 @@ def parallel(
         task = normalized[idx]
         agent = agent_list[idx]
         state = states[idx]
-        return agent.run(task["purpose"], state=state)
+        # Lock around shared replay/optimizer writes
+        with _parallel_lock:
+            return agent.run(task["purpose"], state=state)
 
     results: list[TaskResult | None] = [None] * len(normalized)
 
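Two concurrency properties follow from this hunk. First, because the lock is held across the entire `agent.run(...)` call, concurrent workers still execute their run bodies one at a time, even when `workers > 1`. Second, the index-slotted `results` list needs no lock at all, since each worker writes only its own slot; a minimal sketch of that second point, with illustrative names:

```python
from concurrent.futures import ThreadPoolExecutor

tasks = ["alpha", "beta", "gamma"]
results: list[str | None] = [None] * len(tasks)

def worker(idx: int) -> None:
    # Each worker writes a distinct index, so assignments to the
    # results list never race with each other (no lock required).
    results[idx] = tasks[idx].upper()

with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(worker, range(len(tasks))))

assert results == ["ALPHA", "BETA", "GAMMA"]
```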