File size: 6,999 Bytes
f56a29b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/**
 * Agent Loop — Shared core logic for the frontend-driven multi-agent loop.
 *
 * Extracted from use-chat-sessions.ts so both the frontend hook and the
 * eval harness share the same loop logic. No React dependency — pure
 * async function with callback injection for environment-specific behavior.
 *
 * The loop runs per-user-message: the director dispatches agents one at a
 * time, each agent generates a response, and the loop continues until the
 * director says END, cues the user, or maxTurns is reached.
 */

import type { StatelessEvent, DirectorState } from '@/lib/types/chat';
import type { ThinkingConfig } from '@/lib/types/provider';
import { createLogger } from '@/lib/logger';

const log = createLogger('AgentLoop');

// ==================== Types ====================

/**
 * Snapshot of the client store that accompanies every /api/chat request.
 *
 * The loop re-reads this through `getStoreState()` at the start of each
 * iteration and forwards it as the `storeState` field of the request body.
 * The loop itself never inspects the individual fields.
 */
export interface AgentLoopStoreState {
  /** Interaction mode identifier (opaque to the loop). */
  mode: string;
  /** Id of the currently selected scene, or `null` when there is none. */
  currentSceneId: string | null;
  /** Whiteboard visibility flag (semantics owned by the store; confirm there). */
  whiteboardOpen: boolean;
  /** Stage payload; its shape is not constrained at this layer. */
  stage: unknown;
  /** Scene payloads; their shape is not constrained at this layer. */
  scenes: Array<unknown>;
}

/**
 * Request template — the fields that stay constant across loop iterations.
 *
 * Every property here is copied unchanged into the body of each
 * /api/chat request issued by the loop.
 */
export interface AgentLoopRequest {
  /** Credential forwarded in the request body. */
  apiKey: string;
  /** Optional provider endpoint override, forwarded as-is. */
  baseUrl?: string;
  /** Optional model identifier, forwarded as-is. */
  model?: string;
  /** Optional provider type identifier, forwarded as-is. */
  providerType?: string;
  /** Optional thinking configuration, forwarded as-is. */
  thinkingConfig?: ThinkingConfig;
  /** Optional description of the user, forwarded as-is. */
  userProfile?: {
    nickname?: string;
    bio?: string;
  };
  /** Session configuration; extra keys are allowed and passed through untouched. */
  config: {
    agentIds: string[];
    sessionType?: string;
    agentConfigs?: Array<Record<string, unknown>>;
    [key: string]: unknown;
  };
}

/**
 * Outcome of a single loop iteration, extracted from the stream's done event
 * and handed back to the loop by `onIterationEnd`.
 */
export interface AgentLoopIterationResult {
  /** `true` stops the loop with reason `'cue_user'`. */
  cueUserReceived: boolean;
  /** A value of `0` stops the loop with reason `'end'`. */
  totalAgents: number;
  /** `false` counts towards the consecutive-empty-turn limit. */
  agentHadContent: boolean;
  /** Director state that replaces the loop's current one and is sent with the next request. */
  directorState?: DirectorState;
}

/**
 * Environment-specific behavior supplied by the caller (frontend hook or
 * eval harness). The loop stays environment-agnostic by delegating all
 * state access, networking and event handling to these callbacks.
 */
export interface AgentLoopCallbacks {
  /**
   * Returns the current store snapshot. Invoked at the start of every
   * iteration, because the whiteboard may have changed in between.
   */
  getStoreState: () => AgentLoopStoreState;

  /** Returns the messages to send with the next request. */
  getMessages: () => unknown[];

  /**
   * Performs the HTTP request to /api/chat with the given body and abort
   * signal. Resolves to a Response (or an equivalent object exposing a
   * ReadableStream as `.body`).
   */
  fetchChat: (body: Record<string, unknown>, signal: AbortSignal) => Promise<Response>;

  /**
   * Receives every SSE event parsed from the stream, one call per event.
   * Action execution, text accumulation, message construction and UI
   * updates are the responsibility of this handler.
   */
  onEvent: (event: StatelessEvent) => void;

  /**
   * Invoked once per iteration, after the stream has closed and every
   * event has been passed to `onEvent`.
   *
   * Must resolve to the iteration result (extracted from the 'done'
   * event). The frontend waits for its buffer to drain here before reading
   * the result from loopDoneDataRef; the eval harness returns the result
   * it accumulated during the `onEvent` calls.
   */
  onIterationEnd: () => Promise<AgentLoopIterationResult | null>;
}

/** What `runAgentLoop` reports once it stops iterating. */
export interface AgentLoopOutcome {
  /**
   * Why the loop stopped:
   * - `'end'`: an iteration reported zero agents
   * - `'cue_user'`: an iteration reported a user cue
   * - `'max_turns'`: the turn counter reached the `maxTurns` limit
   * - `'aborted'`: the abort signal was set
   * - `'empty_turns'`: two consecutive iterations produced no agent content
   * - `'no_done'`: `onIterationEnd` resolved to `null`
   */
  reason:
    | 'end'
    | 'cue_user'
    | 'max_turns'
    | 'aborted'
    | 'empty_turns'
    | 'no_done';
  /**
   * Turn counter at exit: taken from `directorState.turnCount` when the
   * director reports one, otherwise the number of completed iterations.
   */
  turnCount: number;
  /** Director state from the most recently completed iteration, if any. */
  directorState?: DirectorState;
}

// ==================== Core Loop ====================

/** Blank line that separates SSE events; tolerates both LF and CRLF line endings. */
const SSE_EVENT_SEPARATOR = /\r?\n\r?\n/;

/**
 * Parse one raw SSE event block and hand the decoded event to the handler.
 *
 * Blocks that do not start with a `data:` field, or whose payload is not
 * valid JSON (heartbeats, truncated tails, etc.), are skipped. An exception
 * thrown by the handler is logged and does not interrupt the stream.
 */
function dispatchSseBlock(rawBlock: string, onEvent: (event: StatelessEvent) => void): void {
  const line = rawBlock.trim();
  if (!line.startsWith('data:')) return;

  let event: StatelessEvent;
  try {
    // JSON.parse ignores the optional space that follows the colon
    event = JSON.parse(line.slice('data:'.length));
  } catch {
    // Skip malformed events (heartbeats, etc.)
    return;
  }

  try {
    onEvent(event);
  } catch (error: unknown) {
    // Keep processing the stream, but make handler failures visible instead
    // of silently treating them like malformed payloads
    const detail = error instanceof Error ? error.message : String(error);
    log.warn(`[AgentLoop] onEvent handler threw, continuing with next event: ${detail}`);
  }
}

/**
 * Read an SSE byte stream to completion, dispatching every event in order.
 *
 * The reader lock is always released, including when a read rejects.
 */
async function pumpSseStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  onEvent: (event: StatelessEvent) => void,
): Promise<void> {
  const decoder = new TextDecoder();
  let sseBuffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      sseBuffer += decoder.decode(value, { stream: true });
      const parts = sseBuffer.split(SSE_EVENT_SEPARATOR);
      // The last element is an incomplete event (or '') — keep it for the next chunk
      sseBuffer = parts.pop() ?? '';

      for (const part of parts) {
        dispatchSseBlock(part, onEvent);
      }
    }

    // Flush bytes still held by the decoder, then deliver a final event that
    // the server did not terminate with a blank line
    sseBuffer += decoder.decode();
    if (sseBuffer.trim() !== '') {
      dispatchSseBlock(sseBuffer, onEvent);
    }
  } finally {
    reader.releaseLock();
  }
}

/**
 * Run the agent loop — shared between frontend and eval.
 *
 * Each iteration: refresh state → POST /api/chat → process SSE events
 * → check exit conditions → repeat.
 *
 * @param request - Fields copied unchanged into every request body.
 * @param callbacks - Environment-specific hooks for state, networking and events.
 * @param signal - Abort signal; checked before each request and after each stream.
 * @param maxTurns - The loop stops once the turn counter reaches this value.
 * @returns The reason the loop stopped, the latest director state and the turn counter.
 * @throws Error when the response status is not OK or the response has no body.
 *   Rejections from `fetchChat`, from reading the stream, or from
 *   `onIterationEnd` propagate to the caller unchanged.
 */
export async function runAgentLoop(
  request: AgentLoopRequest,
  callbacks: AgentLoopCallbacks,
  signal: AbortSignal,
  maxTurns: number,
): Promise<AgentLoopOutcome> {
  let directorState: DirectorState | undefined = undefined;
  let turnCount = 0;
  let consecutiveEmptyTurns = 0;

  while (turnCount < maxTurns) {
    if (signal.aborted) {
      return { reason: 'aborted', directorState, turnCount };
    }

    // Refresh store state each iteration — agent actions may have changed
    // whiteboard, scene, or mode between turns
    const freshStoreState = callbacks.getStoreState();
    const currentMessages = callbacks.getMessages();

    // Build request body
    const body: Record<string, unknown> = {
      messages: currentMessages,
      storeState: freshStoreState,
      config: request.config,
      directorState,
      userProfile: request.userProfile,
      apiKey: request.apiKey,
      baseUrl: request.baseUrl,
      model: request.model,
      providerType: request.providerType,
      thinkingConfig: request.thinkingConfig,
    };

    // Fetch
    const response = await callbacks.fetchChat(body, signal);

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`API error: ${response.status} - ${errorText}`);
    }

    // Parse SSE stream and process events
    const reader = response.body?.getReader();
    if (!reader) throw new Error('No response body');

    await pumpSseStream(reader, callbacks.onEvent);

    if (signal.aborted) {
      return { reason: 'aborted', directorState, turnCount };
    }

    // Post-iteration: wait for buffer drain (frontend) or collect results (eval)
    const iterationResult = await callbacks.onIterationEnd();

    // Check exit conditions
    if (!iterationResult) {
      return { reason: 'no_done', directorState, turnCount };
    }

    // Update accumulated director state
    directorState = iterationResult.directorState;
    // NOTE(review): when the director reports its own turnCount it is used
    // verbatim; if that value never advances, the maxTurns guard will not
    // trigger — confirm the server always increments it.
    turnCount = directorState?.turnCount ?? turnCount + 1;

    // Director said USER — stop loop
    if (iterationResult.cueUserReceived) {
      return { reason: 'cue_user', directorState, turnCount };
    }

    // Director said END — no agent spoke
    if (iterationResult.totalAgents === 0) {
      return { reason: 'end', directorState, turnCount };
    }

    // Track consecutive empty responses
    if (!iterationResult.agentHadContent) {
      consecutiveEmptyTurns++;
      if (consecutiveEmptyTurns >= 2) {
        log.warn(
          `[AgentLoop] ${consecutiveEmptyTurns} consecutive empty agent responses, stopping loop`,
        );
        return { reason: 'empty_turns', directorState, turnCount };
      }
    } else {
      consecutiveEmptyTurns = 0;
    }
  }

  // maxTurns reached
  if (turnCount >= maxTurns) {
    log.info(`[AgentLoop] Max turns (${maxTurns}) reached`);
  }
  return { reason: 'max_turns', directorState, turnCount };
}