Spaces:
Paused
Paused
| /** | |
| * Translate Codex Responses API SSE stream β Anthropic Messages API format. | |
| * | |
| * Codex SSE events: | |
| * response.created β extract response ID | |
| * response.reasoning_summary_text.delta β thinking block (if wantThinking) | |
| * response.output_text.delta β content_block_delta (text_delta) | |
| * response.completed β content_block_stop + message_delta + message_stop | |
| * | |
| * Non-streaming: collect all text, return Anthropic message response. | |
| */ | |
| import { randomUUID } from "crypto"; | |
| import type { CodexApi } from "../proxy/codex-api.js"; | |
| import type { | |
| AnthropicContentBlock, | |
| AnthropicMessagesResponse, | |
| AnthropicUsage, | |
| } from "../types/anthropic.js"; | |
| import { iterateCodexEvents, EmptyResponseError } from "./codex-event-extractor.js"; | |
| export interface AnthropicUsageInfo { | |
| input_tokens: number; | |
| output_tokens: number; | |
| cached_tokens?: number; | |
| reasoning_tokens?: number; | |
| } | |
| /** Format an Anthropic SSE event with named event type */ | |
| function formatSSE(eventType: string, data: unknown): string { | |
| return `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; | |
| } | |
| /** | |
| * Stream Codex Responses API events as Anthropic Messages SSE. | |
| * Yields string chunks ready to write to the HTTP response. | |
| * | |
| * When wantThinking is true, reasoning summary deltas are emitted as | |
| * thinking content blocks before the text block. | |
| */ | |
| export async function* streamCodexToAnthropic( | |
| codexApi: CodexApi, | |
| rawResponse: Response, | |
| model: string, | |
| onUsage?: (usage: AnthropicUsageInfo) => void, | |
| onResponseId?: (id: string) => void, | |
| wantThinking?: boolean, | |
| ): AsyncGenerator<string> { | |
| const msgId = `msg_${randomUUID().replace(/-/g, "").slice(0, 24)}`; | |
| let outputTokens = 0; | |
| let inputTokens = 0; | |
| let cachedTokens: number | undefined; | |
| let hasToolCalls = false; | |
| let hasContent = false; | |
| let contentIndex = 0; | |
| let textBlockStarted = false; | |
| let thinkingBlockStarted = false; | |
| const callIdsWithDeltas = new Set<string>(); | |
| // Helper: close an open block and advance the index | |
| function* closeBlock(blockType: "thinking" | "text"): Generator<string> { | |
| yield formatSSE("content_block_stop", { | |
| type: "content_block_stop", | |
| index: contentIndex, | |
| }); | |
| contentIndex++; | |
| if (blockType === "thinking") thinkingBlockStarted = false; | |
| else textBlockStarted = false; | |
| } | |
| // Helper: ensure thinking block is closed before a non-thinking block | |
| function* closeThinkingIfOpen(): Generator<string> { | |
| if (thinkingBlockStarted) yield* closeBlock("thinking"); | |
| } | |
| // Helper: ensure text block is closed | |
| function* closeTextIfOpen(): Generator<string> { | |
| if (textBlockStarted) yield* closeBlock("text"); | |
| } | |
| // Helper: ensure a text block is open | |
| function* ensureTextBlock(): Generator<string> { | |
| if (!textBlockStarted) { | |
| yield formatSSE("content_block_start", { | |
| type: "content_block_start", | |
| index: contentIndex, | |
| content_block: { type: "text", text: "" }, | |
| }); | |
| textBlockStarted = true; | |
| } | |
| } | |
| // 1. message_start | |
| yield formatSSE("message_start", { | |
| type: "message_start", | |
| message: { | |
| id: msgId, | |
| type: "message", | |
| role: "assistant", | |
| content: [], | |
| model, | |
| stop_reason: null, | |
| stop_sequence: null, | |
| usage: { input_tokens: 0, output_tokens: 0 }, | |
| }, | |
| }); | |
| // Don't eagerly open a text block β wait for actual content so thinking can come first | |
| // 2. Process Codex stream events | |
| for await (const evt of iterateCodexEvents(codexApi, rawResponse)) { | |
| if (evt.responseId) onResponseId?.(evt.responseId); | |
| // Handle upstream error events | |
| if (evt.error) { | |
| yield* closeThinkingIfOpen(); | |
| yield* ensureTextBlock(); | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "text_delta", text: `[Error] ${evt.error.code}: ${evt.error.message}` }, | |
| }); | |
| yield* closeBlock("text"); | |
| yield formatSSE("error", { | |
| type: "error", | |
| error: { type: "api_error", message: `${evt.error.code}: ${evt.error.message}` }, | |
| }); | |
| yield formatSSE("message_stop", { type: "message_stop" }); | |
| return; | |
| } | |
| // Handle reasoning delta β thinking block (only if client wants thinking) | |
| if (evt.reasoningDelta && wantThinking) { | |
| hasContent = true; | |
| yield* closeTextIfOpen(); | |
| // Open thinking block if not already open | |
| if (!thinkingBlockStarted) { | |
| yield formatSSE("content_block_start", { | |
| type: "content_block_start", | |
| index: contentIndex, | |
| content_block: { type: "thinking", thinking: "" }, | |
| }); | |
| thinkingBlockStarted = true; | |
| } | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "thinking_delta", thinking: evt.reasoningDelta }, | |
| }); | |
| continue; | |
| } | |
| // Handle function call start β close open blocks, open tool_use block | |
| if (evt.functionCallStart) { | |
| hasToolCalls = true; | |
| hasContent = true; | |
| yield* closeThinkingIfOpen(); | |
| yield* closeTextIfOpen(); | |
| // Start tool_use block | |
| yield formatSSE("content_block_start", { | |
| type: "content_block_start", | |
| index: contentIndex, | |
| content_block: { | |
| type: "tool_use", | |
| id: evt.functionCallStart.callId, | |
| name: evt.functionCallStart.name, | |
| input: {}, | |
| }, | |
| }); | |
| continue; | |
| } | |
| if (evt.functionCallDelta) { | |
| callIdsWithDeltas.add(evt.functionCallDelta.callId); | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "input_json_delta", partial_json: evt.functionCallDelta.delta }, | |
| }); | |
| continue; | |
| } | |
| if (evt.functionCallDone) { | |
| // Emit full arguments if no deltas were streamed | |
| if (!callIdsWithDeltas.has(evt.functionCallDone.callId)) { | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "input_json_delta", partial_json: evt.functionCallDone.arguments }, | |
| }); | |
| } | |
| // Close this tool_use block | |
| yield formatSSE("content_block_stop", { | |
| type: "content_block_stop", | |
| index: contentIndex, | |
| }); | |
| contentIndex++; | |
| continue; | |
| } | |
| switch (evt.typed.type) { | |
| case "response.output_text.delta": { | |
| if (evt.textDelta) { | |
| hasContent = true; | |
| // Close thinking block if open (transition from thinking β text) | |
| yield* closeThinkingIfOpen(); | |
| // Open a text block if not already open | |
| yield* ensureTextBlock(); | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "text_delta", text: evt.textDelta }, | |
| }); | |
| } | |
| break; | |
| } | |
| case "response.completed": { | |
| if (evt.usage) { | |
| inputTokens = evt.usage.input_tokens; | |
| outputTokens = evt.usage.output_tokens; | |
| cachedTokens = evt.usage.cached_tokens; | |
| onUsage?.({ input_tokens: inputTokens, output_tokens: outputTokens, cached_tokens: cachedTokens, reasoning_tokens: evt.usage.reasoning_tokens }); | |
| } | |
| // Inject error text if stream completed with no content | |
| if (!hasContent) { | |
| yield* ensureTextBlock(); | |
| yield formatSSE("content_block_delta", { | |
| type: "content_block_delta", | |
| index: contentIndex, | |
| delta: { type: "text_delta", text: "[Error] Codex returned an empty response. Please retry." }, | |
| }); | |
| } | |
| break; | |
| } | |
| } | |
| } | |
| // 3. Close any open blocks | |
| yield* closeThinkingIfOpen(); | |
| yield* closeTextIfOpen(); | |
| // 4. message_delta with stop_reason and usage | |
| yield formatSSE("message_delta", { | |
| type: "message_delta", | |
| delta: { stop_reason: hasToolCalls ? "tool_use" : "end_turn" }, | |
| usage: { | |
| input_tokens: inputTokens, | |
| output_tokens: outputTokens, | |
| ...(cachedTokens != null ? { cache_read_input_tokens: cachedTokens } : {}), | |
| }, | |
| }); | |
| // 5. message_stop | |
| yield formatSSE("message_stop", { | |
| type: "message_stop", | |
| }); | |
| } | |
| /** | |
| * Consume a Codex Responses SSE stream and build a non-streaming | |
| * Anthropic Messages response. | |
| */ | |
| export async function collectCodexToAnthropicResponse( | |
| codexApi: CodexApi, | |
| rawResponse: Response, | |
| model: string, | |
| wantThinking?: boolean, | |
| ): Promise<{ | |
| response: AnthropicMessagesResponse; | |
| usage: AnthropicUsageInfo; | |
| responseId: string | null; | |
| }> { | |
| const id = `msg_${randomUUID().replace(/-/g, "").slice(0, 24)}`; | |
| let fullText = ""; | |
| let fullReasoning = ""; | |
| let inputTokens = 0; | |
| let outputTokens = 0; | |
| let cachedTokens: number | undefined; | |
| let responseId: string | null = null; | |
| // Collect tool calls | |
| const toolUseBlocks: AnthropicContentBlock[] = []; | |
| for await (const evt of iterateCodexEvents(codexApi, rawResponse)) { | |
| if (evt.responseId) responseId = evt.responseId; | |
| if (evt.error) { | |
| throw new Error(`Codex API error: ${evt.error.code}: ${evt.error.message}`); | |
| } | |
| if (evt.textDelta) fullText += evt.textDelta; | |
| if (evt.reasoningDelta) fullReasoning += evt.reasoningDelta; | |
| if (evt.usage) { | |
| inputTokens = evt.usage.input_tokens; | |
| outputTokens = evt.usage.output_tokens; | |
| cachedTokens = evt.usage.cached_tokens; | |
| } | |
| if (evt.functionCallDone) { | |
| let parsedInput: Record<string, unknown> = {}; | |
| try { | |
| parsedInput = JSON.parse(evt.functionCallDone.arguments) as Record<string, unknown>; | |
| } catch { /* use empty object */ } | |
| toolUseBlocks.push({ | |
| type: "tool_use", | |
| id: evt.functionCallDone.callId, | |
| name: evt.functionCallDone.name, | |
| input: parsedInput, | |
| }); | |
| } | |
| } | |
| // Detect empty response (HTTP 200 but no content) | |
| if (!fullText && toolUseBlocks.length === 0 && outputTokens === 0) { | |
| throw new EmptyResponseError(responseId, { input_tokens: inputTokens, output_tokens: outputTokens }); | |
| } | |
| const hasToolCalls = toolUseBlocks.length > 0; | |
| const content: AnthropicContentBlock[] = []; | |
| // Thinking block comes first if requested and available | |
| if (wantThinking && fullReasoning) { | |
| content.push({ type: "thinking", thinking: fullReasoning }); | |
| } | |
| if (fullText) { | |
| content.push({ type: "text", text: fullText }); | |
| } | |
| content.push(...toolUseBlocks); | |
| // Ensure at least one content block | |
| if (content.length === 0) { | |
| content.push({ type: "text", text: "" }); | |
| } | |
| const usage: AnthropicUsage = { | |
| input_tokens: inputTokens, | |
| output_tokens: outputTokens, | |
| ...(cachedTokens != null ? { cache_read_input_tokens: cachedTokens } : {}), | |
| }; | |
| return { | |
| response: { | |
| id, | |
| type: "message", | |
| role: "assistant", | |
| content, | |
| model, | |
| stop_reason: hasToolCalls ? "tool_use" : "end_turn", | |
| stop_sequence: null, | |
| usage, | |
| }, | |
| usage, | |
| responseId, | |
| }; | |
| } | |