/**
 * Stateless Multi-Agent Generation
 *
 * Single-pass generation with structured JSON Array output format:
 * [{"type":"action","name":"...","params":{...}},{"type":"text","content":"natural speech"},...]
 *
 * Key design decisions:
 * - Backend is stateless (all state in request/response)
 * - Single generation pass (no generate/tool/loop)
 * - Text is natural teacher speech, NOT meta-commentary
 * - Tool calls are silent actions - students see results only
 * - Action and text objects can freely interleave in the array
 * - Uses partial-json for robust streaming of incomplete JSON
 *
 * Multi-agent orchestration:
 * - When multiple agents are configured, a director agent decides who speaks
 * - Uses LangGraph StateGraph for the orchestration loop
 * - Events are streamed via LangGraph's custom stream mode
 */

import type { LanguageModel } from 'ai';
import type { StatelessChatRequest, StatelessEvent, ParsedAction } from '@/lib/types/chat';
import type { ThinkingConfig } from '@/lib/types/provider';
import type { WhiteboardActionRecord } from './types';
import { createOrchestrationGraph, buildInitialState } from './director-graph';
import { parse as parsePartialJson, Allow } from 'partial-json';
import { jsonrepair } from 'jsonrepair';
import { createLogger } from '@/lib/logger';

const log = createLogger('StatelessGenerate');

// ==================== Structured Output Parser ====================

/**
 * Parser state for incremental JSON Array parsing.
 *
 * Accumulates raw text from the LLM stream. Once the opening `[` is found,
 * the growing buffer is re-parsed on every chunk (jsonrepair first, then
 * `partial-json`). New complete items are emitted as they appear, and partial
 * text content deltas are streamed for the last (potentially incomplete)
 * text item.
 */
interface ParserState {
  /** Accumulated raw text from the LLM */
  buffer: string;
  /** Whether we've found the opening `[` */
  jsonStarted: boolean;
  /** Number of fully processed (emitted) items */
  lastParsedItemCount: number;
  /** Length of text content already emitted for the trailing partial text item */
  lastPartialTextLength: number;
  /** Whether parsing is complete (closing `]` found) */
  isDone: boolean;
  /**
   * Whether any text or action has been emitted from this state so far.
   * Optional so that state objects built without it remain valid; it is set
   * by the parser itself the first time something is emitted.
   */
  hasEmitted?: boolean;
}

/**
 * Create initial parser state
 */
export function createParserState(): ParserState {
  return {
    buffer: '',
    jsonStarted: false,
    lastParsedItemCount: 0,
    lastPartialTextLength: 0,
    isDone: false,
    hasEmitted: false,
  };
}

/**
 * Result from parsing a chunk
 */
export interface ParseResult {
  textChunks: string[];
  actions: ParsedAction[];
  isDone: boolean;
  /** Ordered sequence recording original interleaving of text and action segments */
  ordered: Array<{ type: 'text'; index: number } | { type: 'action'; index: number }>;
}

/** Narrow an arbitrary parsed JSON value to a non-null object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/** Read the `content` of a text item, treating anything that is not a string as empty. */
function textContentOf(item: Record<string, unknown>): string {
  return typeof item.content === 'string' ? item.content : '';
}

/**
 * Report whether every `[`/`{` opened in `text` has been closed and the scan
 * did not end inside a double-quoted string.
 *
 * Brackets inside double-quoted strings are ignored and backslash escapes are
 * honoured. Surplus closers (negative depth) count as balanced so that
 * malformed-but-terminated output is still recognised as finished.
 */
function isJsonStructureBalanced(text: string): boolean {
  let depth = 0;
  let inString = false;
  let escaped = false;

  for (const ch of text) {
    if (inString) {
      if (escaped) {
        escaped = false;
      } else if (ch === '\\') {
        escaped = true;
      } else if (ch === '"') {
        inString = false;
      }
      continue;
    }
    if (ch === '"') {
      inString = true;
    } else if (ch === '[' || ch === '{') {
      depth++;
    } else if (ch === ']' || ch === '}') {
      depth--;
    }
  }

  return !inString && depth <= 0;
}

/**
 * Emit a single complete parsed item into the result.
 *
 * Text items with empty content, action items without a usable name, and
 * items of any other type are ignored.
 */
function emitItem(item: Record<string, unknown>, result: ParseResult): void {
  if (item.type === 'text') {
    const content = textContentOf(item);
    if (content) {
      result.textChunks.push(content);
      // Use the per-call array index so that consumers can read
      // result.textChunks[entry.index] directly.
      result.ordered.push({ type: 'text', index: result.textChunks.length - 1 });
    }
    return;
  }

  if (item.type === 'action') {
    // Support both new format (name/params) and legacy format (tool_name/parameters)
    const rawName = item.name || item.tool_name;
    if (typeof rawName !== 'string' || !rawName) {
      // A nameless action cannot be dispatched, and downstream code calls
      // string methods on actionName, so drop it here instead of emitting it.
      log.info('[StatelessGenerate] Skipping action item without a name');
      return;
    }
    const action: ParsedAction = {
      actionId:
        (item.action_id as string) ||
        `action-${Date.now()}-${Math.random().toString(36).slice(2)}`,
      actionName: rawName,
      params: (item.params || item.parameters || {}) as Record<string, unknown>,
    };
    result.actions.push(action);
    // Use the per-call array index so that consumers can read
    // result.actions[entry.index] directly.
    result.ordered.push({ type: 'action', index: result.actions.length - 1 });
  }
}

/**
 * Parse the current buffer and emit whatever is new since the previous call.
 *
 * @param state - Parser state (mutated in place)
 * @param streamEnded - True when no further chunks will arrive. In that case a
 *   trailing `]` alone is accepted as the end of the array, because the risk
 *   of cutting off later chunks no longer exists.
 */
function parseBuffer(state: ParserState, streamEnded: boolean): ParseResult {
  const result: ParseResult = {
    textChunks: [],
    actions: [],
    isDone: false,
    ordered: [],
  };

  // Step 1: Find the opening `[` if not yet found
  if (!state.jsonStarted) {
    const bracketIndex = state.buffer.indexOf('[');
    if (bracketIndex === -1) {
      return result;
    }
    // Trim everything before `[` (markdown fences, explanatory text, etc.)
    state.buffer = state.buffer.slice(bracketIndex);
    state.jsonStarted = true;
  }

  // Step 2: Check if the array is complete.
  // A trailing `]` is not sufficient while streaming: it may close a nested
  // array (e.g. `"points":[1,2]`) or sit inside a text string (`"see [1]`).
  // The parsers below accept incomplete input, so a successful parse says
  // nothing about completeness; treating such a buffer as closed would mark
  // the parser done and silently drop the rest of the stream.
  const trimmed = state.buffer.trimEnd();
  const endsWithBracket = trimmed.length > 1 && trimmed.endsWith(']');
  const isArrayClosed = endsWithBracket && (streamEnded || isJsonStructureBalanced(trimmed));

  // Step 3: Try incremental parse — jsonrepair first (fixes unescaped quotes),
  // fallback to partial-json
  let parsed: unknown;
  try {
    parsed = JSON.parse(jsonrepair(state.buffer));
  } catch {
    try {
      parsed = parsePartialJson(
        state.buffer,
        Allow.ARR | Allow.OBJ | Allow.STR | Allow.NUM | Allow.BOOL | Allow.NULL,
      );
    } catch {
      return result;
    }
  }

  if (!Array.isArray(parsed)) {
    return result;
  }
  const items: unknown[] = parsed;

  // Step 4: Determine how many items are fully complete.
  // When the array is closed, all items are complete.
  // When still streaming, items [0..N-2] are complete; item [N-1] may be partial.
  const completeUpTo = isArrayClosed ? items.length : Math.max(0, items.length - 1);

  // Step 5: Emit newly completed items
  const firstNew = state.lastParsedItemCount;
  for (let i = firstNew; i < completeUpTo; i++) {
    const item = items[i];
    if (!isRecord(item)) continue;

    // If this item was previously the trailing partial text item, its content
    // has already been streamed incrementally. Only emit the remaining delta.
    if (i === firstNew && state.lastPartialTextLength > 0 && item.type === 'text') {
      const remaining = textContentOf(item).slice(state.lastPartialTextLength);
      if (remaining) {
        result.textChunks.push(remaining);
        // Only push an ordered entry when there is actual content to emit
        result.ordered.push({ type: 'text', index: result.textChunks.length - 1 });
      }
      continue;
    }

    emitItem(item, result);
  }
  if (completeUpTo > firstNew) {
    // The formerly partial item is now complete; a stale offset must not be
    // applied to whichever item becomes the trailing one next.
    state.lastPartialTextLength = 0;
  }
  state.lastParsedItemCount = completeUpTo;

  // Step 6: Stream partial text delta for the trailing item.
  // NOTE(review): these deltas are appended to textChunks without an `ordered`
  // entry, exactly as before — confirm how the consumer picks them up.
  if (!isArrayClosed && items.length > completeUpTo) {
    const lastItem = items[items.length - 1];
    if (isRecord(lastItem) && lastItem.type === 'text') {
      const content = textContentOf(lastItem);
      if (content.length > state.lastPartialTextLength) {
        result.textChunks.push(content.slice(state.lastPartialTextLength));
        state.lastPartialTextLength = content.length;
      }
    }
  }

  // Step 7: Mark done if array is closed
  if (isArrayClosed) {
    state.isDone = true;
    result.isDone = true;
    state.lastParsedItemCount = items.length;
    state.lastPartialTextLength = 0;
  }

  if (result.textChunks.length > 0 || result.actions.length > 0) {
    state.hasEmitted = true;
  }

  return result;
}

/**
 * Parse streaming chunks of structured JSON Array output.
 *
 * The LLM is expected to produce a JSON array like:
 * [{"type":"action","name":"spotlight","params":{"elementId":"img_1"}},
 *  {"type":"text","content":"Hello students..."},...]
 *
 * This parser:
 * 1. Accumulates chunks into a buffer
 * 2. Skips any prefix before `[` (e.g. ```json\n, explanatory text)
 * 3. Re-parses the growing array (jsonrepair, then partial-json as fallback)
 * 4. Emits new complete items (action → actions, text → textChunks)
 * 5. For the trailing incomplete text item, emits content deltas for streaming
 * 6. Marks done when the buffer ends with the `]` that closes the top-level
 *    array (a `]` inside a string or closing a nested array does not count)
 *
 * @param chunk - New chunk of text to parse
 * @param state - Current parser state (mutated in place)
 * @returns Text chunks and actions newly produced by this chunk; empty once
 *   the state is already done
 */
export function parseStructuredChunk(chunk: string, state: ParserState): ParseResult {
  if (state.isDone) {
    return { textChunks: [], actions: [], isDone: false, ordered: [] };
  }

  state.buffer += chunk;
  return parseBuffer(state, false);
}

/**
 * Finalize parsing after the stream ends.
 *
 * Handles the case where the model never produced a valid JSON array —
 * e.g. it output plain text instead of the expected `[...]` format.
 * Emits whatever content is in the buffer as a single text item so the
 * frontend can still display something rather than showing nothing.
 *
 * The raw-text fallback is used only when this parser has never emitted
 * anything; otherwise a stream cut off mid-item would re-emit already
 * streamed content as raw JSON.
 *
 * @param state - Parser state (mutated in place)
 * @returns Any remaining text/actions, with `isDone` always true
 */
export function finalizeParser(state: ParserState): ParseResult {
  const result: ParseResult = {
    textChunks: [],
    actions: [],
    isDone: true,
    ordered: [],
  };

  if (state.isDone) {
    return result;
  }

  const content = state.buffer.trim();
  if (!content) {
    return result;
  }

  if (!state.jsonStarted) {
    // Model never output `[` — treat entire buffer as plain text
    result.textChunks.push(content);
    result.ordered.push({ type: 'text', index: 0 });
  } else {
    // JSON started but was never recognised as closed — try one final parse
    const emittedBefore = state.hasEmitted === true;
    const finalChunk = parseBuffer(state, true);
    result.textChunks.push(...finalChunk.textChunks);
    result.actions.push(...finalChunk.actions);
    result.ordered.push(...finalChunk.ordered);

    // If nothing was ever emitted, surface the raw text after `[` as a fallback
    if (!emittedBefore && result.textChunks.length === 0 && result.actions.length === 0) {
      const bracketIndex = content.indexOf('[');
      const raw = content.slice(bracketIndex + 1).trim();
      if (raw) {
        result.textChunks.push(raw);
        result.ordered.push({ type: 'text', index: 0 });
      }
    }
  }

  state.isDone = true;
  return result;
}

// ==================== Main Generation Function ====================

/**
 * Stateless generation with streaming via LangGraph orchestration
 *
 * Forwards every event from the orchestration graph unchanged, then yields a
 * final `done` event carrying totals and the updated director state. Failures
 * are reported as an `error` event rather than thrown.
 *
 * @param request - The chat request with full state
 * @param abortSignal - Signal for cancellation
 * @param languageModel - Model handed to `buildInitialState`
 * @param thinkingConfig - Optional thinking configuration handed to `buildInitialState`
 * @yields StatelessEvent objects for streaming
 */
export async function* statelessGenerate(
  request: StatelessChatRequest,
  abortSignal: AbortSignal,
  languageModel: LanguageModel,
  thinkingConfig?: ThinkingConfig,
): AsyncGenerator<StatelessEvent> {
  log.info(
    `[StatelessGenerate] Starting orchestration for agents: ${request.config.agentIds.join(', ')}`,
  );
  log.info(
    `[StatelessGenerate] Message count: ${request.messages.length}, turnCount: ${request.directorState?.turnCount ?? 0}`,
  );

  try {
    const graph = createOrchestrationGraph();
    const initialState = buildInitialState(request, languageModel, thinkingConfig);

    const stream = await graph.stream(initialState, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      streamMode: 'custom' as any,
      signal: abortSignal,
    });

    let totalActions = 0;
    let totalAgents = 0;
    // Tracks whether the agent dispatched in this turn produced any text or actions.
    // Each statelessGenerate call handles exactly one agent turn (client loops externally).
    let agentHadContent = false;

    // Track current agent turn to build updated directorState
    let currentAgentId: string | null = null;
    let currentAgentName: string | null = null;
    let contentPreview = '';
    let agentActionCount = 0;
    const agentWbActions: WhiteboardActionRecord[] = [];

    for await (const chunk of stream) {
      const event = chunk as StatelessEvent;

      if (event.type === 'agent_start') {
        totalAgents++;
        currentAgentId = event.data.agentId;
        currentAgentName = event.data.agentName;
        contentPreview = '';
        agentActionCount = 0;
        agentWbActions.length = 0;
      }

      // Only the first 100 characters of the agent's text are kept as a preview
      if (event.type === 'text_delta' && contentPreview.length < 100) {
        contentPreview = (contentPreview + event.data.content).slice(0, 100);
        agentHadContent = true;
      }

      if (event.type === 'action') {
        totalActions++;
        agentActionCount++;
        agentHadContent = true;
        // Actions whose name starts with `wb_` are also recorded for the whiteboard ledger
        if (event.data.actionName.startsWith('wb_')) {
          agentWbActions.push({
            actionName: event.data.actionName as WhiteboardActionRecord['actionName'],
            agentId: event.data.agentId,
            agentName: currentAgentName || event.data.agentId,
            params: event.data.params,
          });
        }
      }

      yield event;
    }

    // Build updated directorState from incoming state + this turn's data
    const incoming = request.directorState;
    const prevResponses = incoming?.agentResponses ?? [];
    const prevLedger = incoming?.whiteboardLedger ?? [];
    const prevTurnCount = incoming?.turnCount ?? 0;

    const directorState =
      totalAgents > 0
        ? {
            turnCount: prevTurnCount + 1,
            agentResponses: [
              ...prevResponses,
              {
                agentId: currentAgentId!,
                agentName: currentAgentName || currentAgentId!,
                contentPreview,
                actionCount: agentActionCount,
                whiteboardActions: [...agentWbActions],
              },
            ],
            whiteboardLedger: [...prevLedger, ...agentWbActions],
          }
        : {
            turnCount: prevTurnCount,
            agentResponses: prevResponses,
            whiteboardLedger: prevLedger,
          };

    yield {
      type: 'done',
      data: { totalActions, totalAgents, agentHadContent, directorState },
    };

    log.info(
      `[StatelessGenerate] Completed. Agents: ${totalAgents}, Actions: ${totalActions}, hadContent: ${agentHadContent}, turnCount: ${directorState.turnCount}`,
    );
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      yield { type: 'error', data: { message: 'Request interrupted' } };
    } else {
      log.error('[StatelessGenerate] Error:', error);
      yield {
        type: 'error',
        data: {
          message: error instanceof Error ? error.message : String(error),
        },
      };
    }
  }
}