File size: 6,999 Bytes
f56a29b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | /**
* Agent Loop β Shared core logic for the frontend-driven multi-agent loop.
*
* Extracted from use-chat-sessions.ts so both the frontend hook and the
* eval harness share the same loop logic. No React dependency β pure
* async function with callback injection for environment-specific behavior.
*
* The loop runs per-user-message: the director dispatches agents one at a
* time, each agent generates a response, and the loop continues until the
* director says END, cues the user, or maxTurns is reached.
*/
import type { StatelessEvent, DirectorState } from '@/lib/types/chat';
import type { ThinkingConfig } from '@/lib/types/provider';
import { createLogger } from '@/lib/logger';
const log = createLogger('AgentLoop');
// ==================== Types ====================
/** Store state snapshot sent with each /api/chat request */
export interface AgentLoopStoreState {
stage: unknown;
scenes: unknown[];
currentSceneId: string | null;
mode: string;
whiteboardOpen: boolean;
}
/** Request template β fields that stay constant across loop iterations */
export interface AgentLoopRequest {
config: {
agentIds: string[];
sessionType?: string;
agentConfigs?: Record<string, unknown>[];
[key: string]: unknown;
};
userProfile?: { nickname?: string; bio?: string };
apiKey: string;
baseUrl?: string;
model?: string;
providerType?: string;
thinkingConfig?: ThinkingConfig;
}
/** Per-iteration outcome extracted from the done event */
export interface AgentLoopIterationResult {
directorState?: DirectorState;
totalAgents: number;
agentHadContent: boolean;
cueUserReceived: boolean;
}
/** Callbacks injected by the caller (frontend or eval) */
export interface AgentLoopCallbacks {
/** Get fresh store state for each iteration (whiteboard may have changed) */
getStoreState: () => AgentLoopStoreState;
/** Get current messages for the request */
getMessages: () => unknown[];
/**
* Make the HTTP request to /api/chat.
* Returns a Response object (or equivalent with .body ReadableStream).
*/
fetchChat: (body: Record<string, unknown>, signal: AbortSignal) => Promise<Response>;
/**
* Process a single SSE event. Called for every event in the stream.
* The callback should handle action execution, text accumulation,
* message construction, and UI updates.
*/
onEvent: (event: StatelessEvent) => void;
/**
* Called after all SSE events for one iteration have been processed
* and the stream is closed.
*
* Must return the iteration result (extracted from the 'done' event).
* The frontend waits for buffer drain here before reading the result
* from loopDoneDataRef. The eval harness returns a result it
* accumulated during onEvent calls.
*/
onIterationEnd: () => Promise<AgentLoopIterationResult | null>;
}
/** Final outcome of the agent loop */
export interface AgentLoopOutcome {
/** Why the loop stopped */
reason: 'end' | 'cue_user' | 'max_turns' | 'aborted' | 'empty_turns' | 'no_done';
/** Accumulated director state */
directorState?: DirectorState;
/** Number of iterations completed */
turnCount: number;
}
// ==================== Core Loop ====================
/**
* Run the agent loop β shared between frontend and eval.
*
* Each iteration: refresh state β POST /api/chat β process SSE events
* β check exit conditions β repeat.
*/
export async function runAgentLoop(
request: AgentLoopRequest,
callbacks: AgentLoopCallbacks,
signal: AbortSignal,
maxTurns: number,
): Promise<AgentLoopOutcome> {
let directorState: DirectorState | undefined = undefined;
let turnCount = 0;
let consecutiveEmptyTurns = 0;
while (turnCount < maxTurns) {
if (signal.aborted) {
return { reason: 'aborted', directorState, turnCount };
}
// Refresh store state each iteration β agent actions may have changed
// whiteboard, scene, or mode between turns
const freshStoreState = callbacks.getStoreState();
const currentMessages = callbacks.getMessages();
// Build request body
const body: Record<string, unknown> = {
messages: currentMessages,
storeState: freshStoreState,
config: request.config,
directorState,
userProfile: request.userProfile,
apiKey: request.apiKey,
baseUrl: request.baseUrl,
model: request.model,
providerType: request.providerType,
thinkingConfig: request.thinkingConfig,
};
// Fetch
const response = await callbacks.fetchChat(body, signal);
if (!response.ok) {
const errorText = await response.text();
throw new Error(`API error: ${response.status} - ${errorText}`);
}
// Parse SSE stream and process events
const reader = response.body?.getReader();
if (!reader) throw new Error('No response body');
const decoder = new TextDecoder();
let sseBuffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
sseBuffer += decoder.decode(value, { stream: true });
const parts = sseBuffer.split('\n\n');
sseBuffer = parts.pop() || '';
for (const part of parts) {
const line = part.trim();
if (!line.startsWith('data: ')) continue;
try {
const event: StatelessEvent = JSON.parse(line.slice(6));
callbacks.onEvent(event);
} catch {
// Skip malformed events (heartbeats, etc.)
}
}
}
} finally {
reader.releaseLock();
}
if (signal.aborted) {
return { reason: 'aborted', directorState, turnCount };
}
// Post-iteration: wait for buffer drain (frontend) or collect results (eval)
const iterationResult = await callbacks.onIterationEnd();
// Check exit conditions
if (!iterationResult) {
return { reason: 'no_done', directorState, turnCount };
}
// Update accumulated director state
directorState = iterationResult.directorState;
turnCount = directorState?.turnCount ?? turnCount + 1;
// Director said USER β stop loop
if (iterationResult.cueUserReceived) {
return { reason: 'cue_user', directorState, turnCount };
}
// Director said END β no agent spoke
if (iterationResult.totalAgents === 0) {
return { reason: 'end', directorState, turnCount };
}
// Track consecutive empty responses
if (!iterationResult.agentHadContent) {
consecutiveEmptyTurns++;
if (consecutiveEmptyTurns >= 2) {
log.warn(
`[AgentLoop] ${consecutiveEmptyTurns} consecutive empty agent responses, stopping loop`,
);
return { reason: 'empty_turns', directorState, turnCount };
}
} else {
consecutiveEmptyTurns = 0;
}
}
// maxTurns reached
if (turnCount >= maxTurns) {
log.info(`[AgentLoop] Max turns (${maxTurns}) reached`);
}
return { reason: 'max_turns', directorState, turnCount };
}
|