| package stream
|
|
|
| import (
|
| "bufio"
|
| "crypto/rand"
|
| "encoding/hex"
|
| "encoding/json"
|
| "fmt"
|
| "io"
|
| "opus-api/internal/parser"
|
| "opus-api/internal/tokenizer"
|
| "opus-api/internal/types"
|
| "strings"
|
| )
|
|
|
|
|
| func TransformMorphToClaudeStream(morphStream io.Reader, model string, inputTokens int, writer io.Writer, onChunk func(string)) error {
|
| scanner := bufio.NewScanner(morphStream)
|
| scanner.Buffer(make([]byte, 64*1024), 1024*1024)
|
|
|
| messageID := "msg_" + generateUUID()
|
| hasStarted := false
|
| contentBlockStarted := false
|
| contentBlockClosed := false
|
| messageDeltaSent := false
|
| toolCallsEmitted := false
|
| fullText := ""
|
| contentBlockIndex := 0
|
| buffer := NewTextBuffer()
|
| nativeToolCalls := []types.ParsedToolCall{}
|
|
|
| emitSSE := func(event string, data interface{}) {
|
| sseData := FormatSSE(event, data)
|
| if onChunk != nil {
|
| onChunk(sseData)
|
| }
|
| writer.Write([]byte(sseData))
|
| }
|
|
|
| emitToolCall := func(toolCall types.ParsedToolCall) {
|
|
|
| if contentBlockStarted && !contentBlockClosed {
|
| emitSSE("content_block_stop", ContentBlockStopEvent{
|
| Type: "content_block_stop",
|
| Index: contentBlockIndex,
|
| })
|
| contentBlockClosed = true
|
| }
|
| contentBlockIndex++
|
|
|
| toolUseID := "toolu_" + generateShortUUID()
|
|
|
| emitSSE("content_block_start", ContentBlockStartEvent{
|
| Type: "content_block_start",
|
| Index: contentBlockIndex,
|
| ContentBlock: ToolUseContentBlock{
|
| Type: "tool_use",
|
| ID: toolUseID,
|
| Name: toolCall.Name,
|
| Input: toolCall.Input,
|
| },
|
| })
|
|
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: InputJSONDelta{
|
| Type: "input_json_delta",
|
| PartialJSON: mustMarshalJSON(toolCall.Input),
|
| },
|
| })
|
|
|
| emitSSE("content_block_stop", ContentBlockStopEvent{
|
| Type: "content_block_stop",
|
| Index: contentBlockIndex,
|
| })
|
|
|
| contentBlockIndex++
|
| toolCallsEmitted = true
|
| }
|
|
|
| for scanner.Scan() {
|
| line := scanner.Text()
|
|
|
| if !strings.HasPrefix(line, "data: ") {
|
| continue
|
| }
|
|
|
| dataStr := strings.TrimPrefix(line, "data: ")
|
| dataStr = strings.TrimSpace(dataStr)
|
|
|
| if dataStr == "[DONE]" {
|
|
|
| outputTokens := tokenizer.CountTokens(fullText)
|
|
|
|
|
| if toolCallsEmitted {
|
| if !messageDeltaSent {
|
| emitSSE("message_delta", MessageDeltaEvent{
|
| Type: "message_delta",
|
| Delta: map[string]interface{}{
|
| "stop_reason": "tool_use",
|
| "stop_sequence": nil,
|
| },
|
| Usage: map[string]int{"output_tokens": outputTokens},
|
| })
|
| messageDeltaSent = true
|
| }
|
| emitSSE("message_stop", MessageStopEvent{Type: "message_stop"})
|
| continue
|
| }
|
|
|
|
|
| if len(nativeToolCalls) > 0 {
|
| for _, toolCall := range nativeToolCalls {
|
| emitToolCall(toolCall)
|
| }
|
| if !messageDeltaSent {
|
| emitSSE("message_delta", MessageDeltaEvent{
|
| Type: "message_delta",
|
| Delta: map[string]interface{}{
|
| "stop_reason": "tool_use",
|
| "stop_sequence": nil,
|
| },
|
| Usage: map[string]int{"output_tokens": outputTokens},
|
| })
|
| messageDeltaSent = true
|
| }
|
| emitSSE("message_stop", MessageStopEvent{Type: "message_stop"})
|
| continue
|
| }
|
|
|
|
|
| if !buffer.IsEmpty() {
|
| buffer.FlushAll(func(text string) {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: text},
|
| })
|
| })
|
| }
|
|
|
|
|
| if contentBlockStarted && !contentBlockClosed {
|
| emitSSE("content_block_stop", ContentBlockStopEvent{
|
| Type: "content_block_stop",
|
| Index: contentBlockIndex,
|
| })
|
| contentBlockClosed = true
|
| }
|
|
|
|
|
| if !messageDeltaSent {
|
| emitSSE("message_delta", MessageDeltaEvent{
|
| Type: "message_delta",
|
| Delta: map[string]interface{}{
|
| "stop_reason": "end_turn",
|
| "stop_sequence": nil,
|
| },
|
| Usage: map[string]int{"output_tokens": outputTokens},
|
| })
|
| messageDeltaSent = true
|
| }
|
|
|
| emitSSE("message_stop", MessageStopEvent{Type: "message_stop"})
|
| continue
|
| }
|
|
|
| var data map[string]interface{}
|
| if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
|
| continue
|
| }
|
|
|
| dataType, _ := data["type"].(string)
|
|
|
| switch dataType {
|
| case "start":
|
| if !hasStarted {
|
| hasStarted = true
|
| emitSSE("message_start", MessageStartEvent{
|
| Type: "message_start",
|
| Message: MessageStart{
|
| ID: messageID,
|
| Type: "message",
|
| Role: "assistant",
|
| Content: []interface{}{},
|
| Model: model,
|
| StopReason: nil,
|
| StopSequence: nil,
|
| Usage: map[string]int{"input_tokens": inputTokens, "output_tokens": 0},
|
| },
|
| })
|
| }
|
|
|
| case "text-start":
|
| if !contentBlockStarted {
|
| contentBlockStarted = true
|
| contentBlockClosed = false
|
| emitSSE("content_block_start", ContentBlockStartEvent{
|
| Type: "content_block_start",
|
| Index: contentBlockIndex,
|
| ContentBlock: TextContentBlock{Type: "text", Text: ""},
|
| })
|
| } else if contentBlockClosed {
|
| contentBlockIndex++
|
| contentBlockClosed = false
|
| emitSSE("content_block_start", ContentBlockStartEvent{
|
| Type: "content_block_start",
|
| Index: contentBlockIndex,
|
| ContentBlock: TextContentBlock{Type: "text", Text: ""},
|
| })
|
| }
|
|
|
| case "text-delta":
|
| delta, _ := data["delta"].(string)
|
| fullText += delta
|
|
|
|
|
| if toolCallsEmitted {
|
| continue
|
| }
|
|
|
|
|
| if contentBlockClosed {
|
| contentBlockIndex++
|
| contentBlockClosed = false
|
| emitSSE("content_block_start", ContentBlockStartEvent{
|
| Type: "content_block_start",
|
| Index: contentBlockIndex,
|
| ContentBlock: TextContentBlock{Type: "text", Text: ""},
|
| })
|
| }
|
|
|
| buffer.Add(delta)
|
|
|
|
|
| for {
|
| result := parser.ParseNextToolCall(fullText)
|
| if !result.Found {
|
| break
|
| }
|
|
|
|
|
| textBefore := fullText[:strings.Index(fullText, "<invoke")]
|
| if textBefore != "" && !buffer.ToolCallDetected {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: textBefore},
|
| })
|
| }
|
| buffer.Clear()
|
| buffer.ToolCallDetected = true
|
|
|
|
|
| emitToolCall(*result.ToolCall)
|
|
|
|
|
| fullText = fullText[result.EndPosition:]
|
| }
|
|
|
|
|
| if parser.HasIncompleteToolCall(fullText) {
|
| buffer.ToolCallDetected = true
|
| buffer.Clear()
|
| } else if !buffer.ToolCallDetected {
|
|
|
| buffer.FlushSafeText(func(text string) {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: text},
|
| })
|
| })
|
| }
|
|
|
| case "text-end":
|
|
|
| result := parser.ParseToolCalls(fullText)
|
|
|
| if len(result.ToolCalls) == 0 {
|
|
|
| if !buffer.IsEmpty() {
|
| buffer.FlushAll(func(text string) {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: text},
|
| })
|
| })
|
| }
|
| }
|
|
|
| case "finish-step":
|
|
|
|
|
| result := parser.ParseToolCalls(fullText)
|
| if len(result.ToolCalls) > 0 && !toolCallsEmitted {
|
|
|
| if result.RemainingText != "" && !buffer.ToolCallDetected {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: result.RemainingText},
|
| })
|
| }
|
| buffer.Clear()
|
| buffer.ToolCallDetected = true
|
|
|
|
|
| for _, toolCall := range result.ToolCalls {
|
| emitToolCall(toolCall)
|
| }
|
| } else if !buffer.IsEmpty() && !buffer.ToolCallDetected {
|
|
|
| buffer.FlushAll(func(text string) {
|
| emitSSE("content_block_delta", ContentBlockDeltaEvent{
|
| Type: "content_block_delta",
|
| Index: contentBlockIndex,
|
| Delta: TextDelta{Type: "text_delta", Text: text},
|
| })
|
| })
|
| }
|
|
|
| case "start-step":
|
|
|
|
|
|
|
| case "finish":
|
| result := parser.ParseToolCalls(fullText)
|
|
|
| finishReason, _ := data["finishReason"].(string)
|
| if len(result.ToolCalls) == 0 && finishReason != "tool-calls" && !messageDeltaSent {
|
| stopReason := "end_turn"
|
| if finishReason != "" && finishReason != "stop" {
|
| stopReason = finishReason
|
| }
|
| outputTokens := tokenizer.CountTokens(fullText)
|
| emitSSE("message_delta", MessageDeltaEvent{
|
| Type: "message_delta",
|
| Delta: map[string]interface{}{
|
| "stop_reason": stopReason,
|
| "stop_sequence": nil,
|
| },
|
| Usage: map[string]int{"output_tokens": outputTokens},
|
| })
|
| messageDeltaSent = true
|
| }
|
|
|
| case "tool-input-error":
|
|
|
| toolName, _ := data["toolName"].(string)
|
| input, _ := data["input"].(map[string]interface{})
|
| if toolName != "" && input != nil {
|
| nativeToolCalls = append(nativeToolCalls, types.ParsedToolCall{
|
| Name: toolName,
|
| Input: input,
|
| })
|
| buffer.ToolCallDetected = true
|
| }
|
| }
|
| }
|
|
|
| if err := scanner.Err(); err != nil {
|
| return err
|
| }
|
|
|
| return nil
|
| }
|
|
|
| func generateUUID() string {
|
| bytes := make([]byte, 16)
|
| rand.Read(bytes)
|
| return fmt.Sprintf("%x-%x-%x-%x-%x", bytes[0:4], bytes[4:6], bytes[6:8], bytes[8:10], bytes[10:])
|
| }
|
|
|
| func generateShortUUID() string {
|
| bytes := make([]byte, 10)
|
| rand.Read(bytes)
|
| return hex.EncodeToString(bytes)
|
| }
|
|
|
| func mustMarshalJSON(v interface{}) string {
|
| bytes, _ := json.Marshal(v)
|
| return string(bytes)
|
| } |