diff --git a/packages/ai/src/providers/azure-openai-responses.ts b/packages/ai/src/providers/azure-openai-responses.ts index 185e19e8..7b503079 100644 --- a/packages/ai/src/providers/azure-openai-responses.ts +++ b/packages/ai/src/providers/azure-openai-responses.ts @@ -1,51 +1,12 @@ -import type OpenAI from "openai"; import { AzureOpenAI } from "openai"; -import type { - Tool as OpenAITool, - ResponseCreateParamsStreaming, - ResponseFunctionToolCall, - ResponseInput, - ResponseInputContent, - ResponseInputImage, - ResponseInputText, - ResponseOutputMessage, - ResponseReasoningItem, -} from "openai/resources/responses/responses.js"; -import { calculateCost } from "../models.js"; +import type { ResponseCreateParamsStreaming } from "openai/resources/responses/responses.js"; import { getEnvApiKey } from "../stream.js"; -import type { - Api, - AssistantMessage, - Context, - Model, - StopReason, - StreamFunction, - StreamOptions, - TextContent, - ThinkingContent, - Tool, - ToolCall, -} from "../types.js"; +import type { Api, AssistantMessage, Context, Model, StreamFunction, StreamOptions } from "../types.js"; import { AssistantMessageEventStream } from "../utils/event-stream.js"; -import { parseStreamingJson } from "../utils/json-parse.js"; -import { sanitizeSurrogates } from "../utils/sanitize-unicode.js"; -import { transformMessages } from "./transform-messages.js"; - -/** Fast deterministic hash to shorten long strings */ -function shortHash(str: string): string { - let h1 = 0xdeadbeef; - let h2 = 0x41c6ce57; - for (let i = 0; i < str.length; i++) { - const ch = str.charCodeAt(i); - h1 = Math.imul(h1 ^ ch, 2654435761); - h2 = Math.imul(h2 ^ ch, 1597334677); - } - h1 = Math.imul(h1 ^ (h1 >>> 16), 2246822507) ^ Math.imul(h2 ^ (h2 >>> 13), 3266489909); - h2 = Math.imul(h2 ^ (h2 >>> 16), 2246822507) ^ Math.imul(h1 ^ (h1 >>> 13), 3266489909); - return (h2 >>> 0).toString(36) + (h1 >>> 0).toString(36); -} +import { convertResponsesMessages, convertResponsesTools, processResponsesStream } from "./openai-responses-shared.js"; const DEFAULT_AZURE_API_VERSION = "v1"; +const AZURE_TOOL_CALL_PROVIDERS = new Set(["openai", "openai-codex", "opencode", "azure-openai-responses"]); function parseDeploymentNameMap(value: string | undefined): Map { const map = new Map(); @@ -122,213 +83,7 @@ export const streamAzureOpenAIResponses: StreamFunction<"azure-openai-responses" ); stream.push({ type: "start", partial: output }); - let currentItem: ResponseReasoningItem | ResponseOutputMessage | ResponseFunctionToolCall | null = null; - let currentBlock: ThinkingContent | TextContent | (ToolCall & { partialJson: string }) | null = null; - const blocks = output.content; - const blockIndex = () => blocks.length - 1; - - for await (const event of openaiStream) { - // Handle output item start - if (event.type === "response.output_item.added") { - const item = event.item; - if (item.type === "reasoning") { - currentItem = item; - currentBlock = { type: "thinking", thinking: "" }; - output.content.push(currentBlock); - stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output }); - } else if (item.type === "message") { - currentItem = item; - currentBlock = { type: "text", text: "" }; - output.content.push(currentBlock); - stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output }); - } else if (item.type === "function_call") { - currentItem = item; - currentBlock = { - type: "toolCall", - id: `${item.call_id}|${item.id}`, - name: item.name, - arguments: {}, - partialJson: item.arguments || "", - }; - output.content.push(currentBlock); - stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output }); - } - } - // Handle reasoning summary deltas - else if (event.type === "response.reasoning_summary_part.added") { - if (currentItem && currentItem.type === "reasoning") { - currentItem.summary = currentItem.summary || []; - currentItem.summary.push(event.part); - } - } else if (event.type === "response.reasoning_summary_text.delta") { - if ( - currentItem && - currentItem.type === "reasoning" && - currentBlock && - currentBlock.type === "thinking" - ) { - currentItem.summary = currentItem.summary || []; - const lastPart = currentItem.summary[currentItem.summary.length - 1]; - if (lastPart) { - currentBlock.thinking += event.delta; - lastPart.text += event.delta; - stream.push({ - type: "thinking_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } - // Add a new line between summary parts (hack...) - else if (event.type === "response.reasoning_summary_part.done") { - if ( - currentItem && - currentItem.type === "reasoning" && - currentBlock && - currentBlock.type === "thinking" - ) { - currentItem.summary = currentItem.summary || []; - const lastPart = currentItem.summary[currentItem.summary.length - 1]; - if (lastPart) { - currentBlock.thinking += "\n\n"; - lastPart.text += "\n\n"; - stream.push({ - type: "thinking_delta", - contentIndex: blockIndex(), - delta: "\n\n", - partial: output, - }); - } - } - } - // Handle text output deltas - else if (event.type === "response.content_part.added") { - if (currentItem && currentItem.type === "message") { - currentItem.content = currentItem.content || []; - // Filter out ReasoningText, only accept output_text and refusal - if (event.part.type === "output_text" || event.part.type === "refusal") { - currentItem.content.push(event.part); - } - } - } else if (event.type === "response.output_text.delta") { - if (currentItem && currentItem.type === "message" && currentBlock && currentBlock.type === "text") { - if (!currentItem.content || currentItem.content.length === 0) { - continue; - } - const lastPart = currentItem.content[currentItem.content.length - 1]; - if (lastPart && lastPart.type === "output_text") { - currentBlock.text += event.delta; - lastPart.text += event.delta; - stream.push({ - type: "text_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } else if (event.type === "response.refusal.delta") { - if (currentItem && currentItem.type === "message" && currentBlock && currentBlock.type === "text") { - if (!currentItem.content || currentItem.content.length === 0) { - continue; - } - const lastPart = currentItem.content[currentItem.content.length - 1]; - if (lastPart && lastPart.type === "refusal") { - currentBlock.text += event.delta; - lastPart.refusal += event.delta; - stream.push({ - type: "text_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } - // Handle function call argument deltas - else if (event.type === "response.function_call_arguments.delta") { - if ( - currentItem && - currentItem.type === "function_call" && - currentBlock && - currentBlock.type === "toolCall" - ) { - currentBlock.partialJson += event.delta; - currentBlock.arguments = parseStreamingJson(currentBlock.partialJson); - stream.push({ - type: "toolcall_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - // Handle output item completion - else if (event.type === "response.output_item.done") { - const item = event.item; - - if (item.type === "reasoning" && currentBlock && currentBlock.type === "thinking") { - currentBlock.thinking = item.summary?.map((s) => s.text).join("\n\n") || ""; - currentBlock.thinkingSignature = JSON.stringify(item); - stream.push({ - type: "thinking_end", - contentIndex: blockIndex(), - content: currentBlock.thinking, - partial: output, - }); - currentBlock = null; - } else if (item.type === "message" && currentBlock && currentBlock.type === "text") { - currentBlock.text = item.content.map((c) => (c.type === "output_text" ? c.text : c.refusal)).join(""); - currentBlock.textSignature = item.id; - stream.push({ - type: "text_end", - contentIndex: blockIndex(), - content: currentBlock.text, - partial: output, - }); - currentBlock = null; - } else if (item.type === "function_call") { - const toolCall: ToolCall = { - type: "toolCall", - id: `${item.call_id}|${item.id}`, - name: item.name, - arguments: JSON.parse(item.arguments), - }; - - stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output }); - } - } - // Handle completion - else if (event.type === "response.completed") { - const response = event.response; - if (response?.usage) { - const cachedTokens = response.usage.input_tokens_details?.cached_tokens || 0; - output.usage = { - // OpenAI includes cached tokens in input_tokens, so subtract to get non-cached input - input: (response.usage.input_tokens || 0) - cachedTokens, - output: response.usage.output_tokens || 0, - cacheRead: cachedTokens, - cacheWrite: 0, - totalTokens: response.usage.total_tokens || 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - } - calculateCost(model, output.usage); - // Map status to stop reason - output.stopReason = mapStopReason(response?.status); - if (output.content.some((b) => b.type === "toolCall") && output.stopReason === "stop") { - output.stopReason = "toolUse"; - } - } - // Handle errors - else if (event.type === "error") { - throw new Error(`Error Code ${event.code}: ${event.message}` || "Unknown error"); - } else if (event.type === "response.failed") { - throw new Error("Unknown error"); - } - } + await processResponsesStream(openaiStream, output, stream, model); if (options?.signal?.aborted) { throw new Error("Request was aborted"); @@ -341,7 +96,7 @@ export const streamAzureOpenAIResponses: StreamFunction<"azure-openai-responses" stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); } catch (error) { - for (const block of output.content) delete (block as any).index; + for (const block of output.content) delete (block as { index?: number }).index; output.stopReason = options?.signal?.aborted ? "aborted" : "error"; output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); stream.push({ type: "error", reason: output.stopReason, error: output }); @@ -424,7 +179,7 @@ function buildParams( options: AzureOpenAIResponsesOptions | undefined, deploymentName: string, ) { - const messages = convertMessages(model, context); + const messages = convertResponsesMessages(model, context, AZURE_TOOL_CALL_PROVIDERS); const params: ResponseCreateParamsStreaming = { model: deploymentName, @@ -442,7 +197,7 @@ function buildParams( } if (context.tools) { - params.tools = convertTools(context.tools); + params.tools = convertResponsesTools(context.tools); } if (model.reasoning) { @@ -470,182 +225,3 @@ function buildParams( return params; } - -function convertMessages(model: Model<"azure-openai-responses">, context: Context): ResponseInput { - const messages: ResponseInput = []; - - const normalizeToolCallId = (id: string): string => { - const allowedProviders = new Set(["openai", "openai-codex", "opencode", "azure-openai-responses"]); - if (!allowedProviders.has(model.provider)) return id; - if (!id.includes("|")) return id; - const [callId, itemId] = id.split("|"); - const sanitizedCallId = callId.replace(/[^a-zA-Z0-9_-]/g, "_"); - let sanitizedItemId = itemId.replace(/[^a-zA-Z0-9_-]/g, "_"); - // OpenAI Responses API requires item id to start with "fc" - if (!sanitizedItemId.startsWith("fc")) { - sanitizedItemId = `fc_${sanitizedItemId}`; - } - const normalizedCallId = sanitizedCallId.length > 64 ? sanitizedCallId.slice(0, 64) : sanitizedCallId; - const normalizedItemId = sanitizedItemId.length > 64 ? sanitizedItemId.slice(0, 64) : sanitizedItemId; - return `${normalizedCallId}|${normalizedItemId}`; - }; - - const transformedMessages = transformMessages(context.messages, model, normalizeToolCallId); - - if (context.systemPrompt) { - const role = model.reasoning ? "developer" : "system"; - messages.push({ - role, - content: sanitizeSurrogates(context.systemPrompt), - }); - } - - let msgIndex = 0; - for (const msg of transformedMessages) { - if (msg.role === "user") { - if (typeof msg.content === "string") { - messages.push({ - role: "user", - content: [{ type: "input_text", text: sanitizeSurrogates(msg.content) }], - }); - } else { - const content: ResponseInputContent[] = msg.content.map((item): ResponseInputContent => { - if (item.type === "text") { - return { - type: "input_text", - text: sanitizeSurrogates(item.text), - } satisfies ResponseInputText; - } else { - return { - type: "input_image", - detail: "auto", - image_url: `data:${item.mimeType};base64,${item.data}`, - } satisfies ResponseInputImage; - } - }); - const filteredContent = !model.input.includes("image") - ? content.filter((c) => c.type !== "input_image") - : content; - if (filteredContent.length === 0) continue; - messages.push({ - role: "user", - content: filteredContent, - }); - } - } else if (msg.role === "assistant") { - const output: ResponseInput = []; - - for (const block of msg.content) { - if (block.type === "thinking") { - if (block.thinkingSignature) { - const reasoningItem = JSON.parse(block.thinkingSignature); - output.push(reasoningItem); - } - } else if (block.type === "text") { - const textBlock = block as TextContent; - // OpenAI requires id to be max 64 characters - let msgId = textBlock.textSignature; - if (!msgId) { - msgId = `msg_${msgIndex}`; - } else if (msgId.length > 64) { - msgId = `msg_${shortHash(msgId)}`; - } - output.push({ - type: "message", - role: "assistant", - content: [{ type: "output_text", text: sanitizeSurrogates(textBlock.text), annotations: [] }], - status: "completed", - id: msgId, - } satisfies ResponseOutputMessage); - } else if (block.type === "toolCall") { - const toolCall = block as ToolCall; - output.push({ - type: "function_call", - id: toolCall.id.split("|")[1], - call_id: toolCall.id.split("|")[0], - name: toolCall.name, - arguments: JSON.stringify(toolCall.arguments), - }); - } - } - if (output.length === 0) continue; - messages.push(...output); - } else if (msg.role === "toolResult") { - // Extract text and image content - const textResult = msg.content - .filter((c) => c.type === "text") - .map((c) => (c as any).text) - .join("\n"); - const hasImages = msg.content.some((c) => c.type === "image"); - - // Always send function_call_output with text (or placeholder if only images) - const hasText = textResult.length > 0; - messages.push({ - type: "function_call_output", - call_id: msg.toolCallId.split("|")[0], - output: sanitizeSurrogates(hasText ? textResult : "(see attached image)"), - }); - - // If there are images and model supports them, send a follow-up user message with images - if (hasImages && model.input.includes("image")) { - const contentParts: ResponseInputContent[] = []; - - // Add text prefix - contentParts.push({ - type: "input_text", - text: "Attached image(s) from tool result:", - } satisfies ResponseInputText); - - // Add images - for (const block of msg.content) { - if (block.type === "image") { - contentParts.push({ - type: "input_image", - detail: "auto", - image_url: `data:${(block as any).mimeType};base64,${(block as any).data}`, - } satisfies ResponseInputImage); - } - } - - messages.push({ - role: "user", - content: contentParts, - }); - } - } - msgIndex++; - } - - return messages; -} - -function convertTools(tools: Tool[]): OpenAITool[] { - return tools.map((tool) => ({ - type: "function", - name: tool.name, - description: tool.description, - parameters: tool.parameters as any, // TypeBox already generates JSON Schema - strict: false, - })); -} - -function mapStopReason(status: OpenAI.Responses.ResponseStatus | undefined): StopReason { - if (!status) return "stop"; - switch (status) { - case "completed": - return "stop"; - case "incomplete": - return "length"; - case "failed": - case "cancelled": - return "error"; - // These two are wonky ... - case "in_progress": - case "queued": - return "stop"; - default: { - const _exhaustive: never = status; - throw new Error(`Unhandled stop reason: ${_exhaustive}`); - } - } -} diff --git a/packages/ai/src/providers/openai-responses-shared.ts b/packages/ai/src/providers/openai-responses-shared.ts new file mode 100644 index 00000000..52d8c745 --- /dev/null +++ b/packages/ai/src/providers/openai-responses-shared.ts @@ -0,0 +1,426 @@ +import type OpenAI from "openai"; +import type { + Tool as OpenAITool, + ResponseCreateParamsStreaming, + ResponseFunctionToolCall, + ResponseInput, + ResponseInputContent, + ResponseInputImage, + ResponseInputText, + ResponseOutputMessage, + ResponseReasoningItem, + ResponseStreamEvent, +} from "openai/resources/responses/responses.js"; +import { calculateCost } from "../models.js"; +import type { + Api, + AssistantMessage, + Context, + ImageContent, + Model, + StopReason, + TextContent, + ThinkingContent, + Tool, + ToolCall, + Usage, +} from "../types.js"; +import type { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { parseStreamingJson } from "../utils/json-parse.js"; +import { sanitizeSurrogates } from "../utils/sanitize-unicode.js"; +import { transformMessages } from "./transform-messages.js"; + +/** Fast deterministic hash to shorten long strings */ +function shortHash(str: string): string { + let h1 = 0xdeadbeef; + let h2 = 0x41c6ce57; + for (let i = 0; i < str.length; i++) { + const ch = str.charCodeAt(i); + h1 = Math.imul(h1 ^ ch, 2654435761); + h2 = Math.imul(h2 ^ ch, 1597334677); + } + h1 = Math.imul(h1 ^ (h1 >>> 16), 2246822507) ^ Math.imul(h2 ^ (h2 >>> 13), 3266489909); + h2 = Math.imul(h2 ^ (h2 >>> 16), 2246822507) ^ Math.imul(h1 ^ (h1 >>> 13), 3266489909); + return (h2 >>> 0).toString(36) + (h1 >>> 0).toString(36); +} + +export interface OpenAIResponsesStreamOptions { + serviceTier?: ResponseCreateParamsStreaming["service_tier"]; + applyServiceTierPricing?: ( + usage: Usage, + serviceTier: ResponseCreateParamsStreaming["service_tier"] | undefined, + ) => void; +} + +export function convertResponsesMessages( + model: Model, + context: Context, + allowedToolCallProviders: ReadonlySet, +): ResponseInput { + const messages: ResponseInput = []; + + const normalizeToolCallId = (id: string): string => { + if (!allowedToolCallProviders.has(model.provider)) return id; + if (!id.includes("|")) return id; + const [callId, itemId] = id.split("|"); + const sanitizedCallId = callId.replace(/[^a-zA-Z0-9_-]/g, "_"); + let sanitizedItemId = itemId.replace(/[^a-zA-Z0-9_-]/g, "_"); + // OpenAI Responses API requires item id to start with "fc" + if (!sanitizedItemId.startsWith("fc")) { + sanitizedItemId = `fc_${sanitizedItemId}`; + } + const normalizedCallId = sanitizedCallId.length > 64 ? sanitizedCallId.slice(0, 64) : sanitizedCallId; + const normalizedItemId = sanitizedItemId.length > 64 ? sanitizedItemId.slice(0, 64) : sanitizedItemId; + return `${normalizedCallId}|${normalizedItemId}`; + }; + + const transformedMessages = transformMessages(context.messages, model, normalizeToolCallId); + + if (context.systemPrompt) { + const role = model.reasoning ? "developer" : "system"; + messages.push({ + role, + content: sanitizeSurrogates(context.systemPrompt), + }); + } + + let msgIndex = 0; + for (const msg of transformedMessages) { + if (msg.role === "user") { + if (typeof msg.content === "string") { + messages.push({ + role: "user", + content: [{ type: "input_text", text: sanitizeSurrogates(msg.content) }], + }); + } else { + const content: ResponseInputContent[] = msg.content.map((item): ResponseInputContent => { + if (item.type === "text") { + return { + type: "input_text", + text: sanitizeSurrogates(item.text), + } satisfies ResponseInputText; + } + return { + type: "input_image", + detail: "auto", + image_url: `data:${item.mimeType};base64,${item.data}`, + } satisfies ResponseInputImage; + }); + const filteredContent = !model.input.includes("image") + ? content.filter((c) => c.type !== "input_image") + : content; + if (filteredContent.length === 0) continue; + messages.push({ + role: "user", + content: filteredContent, + }); + } + } else if (msg.role === "assistant") { + const output: ResponseInput = []; + + for (const block of msg.content) { + if (block.type === "thinking") { + if (block.thinkingSignature) { + const reasoningItem = JSON.parse(block.thinkingSignature) as ResponseReasoningItem; + output.push(reasoningItem); + } + } else if (block.type === "text") { + const textBlock = block as TextContent; + // OpenAI requires id to be max 64 characters + let msgId = textBlock.textSignature; + if (!msgId) { + msgId = `msg_${msgIndex}`; + } else if (msgId.length > 64) { + msgId = `msg_${shortHash(msgId)}`; + } + output.push({ + type: "message", + role: "assistant", + content: [{ type: "output_text", text: sanitizeSurrogates(textBlock.text), annotations: [] }], + status: "completed", + id: msgId, + } satisfies ResponseOutputMessage); + } else if (block.type === "toolCall") { + const toolCall = block as ToolCall; + const [callId, itemId] = toolCall.id.split("|"); + output.push({ + type: "function_call", + id: itemId, + call_id: callId, + name: toolCall.name, + arguments: JSON.stringify(toolCall.arguments), + }); + } + } + if (output.length === 0) continue; + messages.push(...output); + } else if (msg.role === "toolResult") { + // Extract text and image content + const textResult = msg.content + .filter((c): c is TextContent => c.type === "text") + .map((c) => c.text) + .join("\n"); + const hasImages = msg.content.some((c): c is ImageContent => c.type === "image"); + + // Always send function_call_output with text (or placeholder if only images) + const hasText = textResult.length > 0; + const [callId] = msg.toolCallId.split("|"); + messages.push({ + type: "function_call_output", + call_id: callId, + output: sanitizeSurrogates(hasText ? textResult : "(see attached image)"), + }); + + // If there are images and model supports them, send a follow-up user message with images + if (hasImages && model.input.includes("image")) { + const contentParts: ResponseInputContent[] = []; + + // Add text prefix + contentParts.push({ + type: "input_text", + text: "Attached image(s) from tool result:", + } satisfies ResponseInputText); + + // Add images + for (const block of msg.content) { + if (block.type === "image") { + contentParts.push({ + type: "input_image", + detail: "auto", + image_url: `data:${block.mimeType};base64,${block.data}`, + } satisfies ResponseInputImage); + } + } + + messages.push({ + role: "user", + content: contentParts, + }); + } + } + msgIndex++; + } + + return messages; +} + +export function convertResponsesTools(tools: Tool[]): OpenAITool[] { + return tools.map((tool) => ({ + type: "function", + name: tool.name, + description: tool.description, + parameters: tool.parameters as any, // TypeBox already generates JSON Schema + strict: false, + })); +} + +export async function processResponsesStream( + openaiStream: AsyncIterable, + output: AssistantMessage, + stream: AssistantMessageEventStream, + model: Model, + options?: OpenAIResponsesStreamOptions, +): Promise { + let currentItem: ResponseReasoningItem | ResponseOutputMessage | ResponseFunctionToolCall | null = null; + let currentBlock: ThinkingContent | TextContent | (ToolCall & { partialJson: string }) | null = null; + const blocks = output.content; + const blockIndex = () => blocks.length - 1; + + for await (const event of openaiStream) { + if (event.type === "response.output_item.added") { + const item = event.item; + if (item.type === "reasoning") { + currentItem = item; + currentBlock = { type: "thinking", thinking: "" }; + output.content.push(currentBlock); + stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output }); + } else if (item.type === "message") { + currentItem = item; + currentBlock = { type: "text", text: "" }; + output.content.push(currentBlock); + stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output }); + } else if (item.type === "function_call") { + currentItem = item; + currentBlock = { + type: "toolCall", + id: `${item.call_id}|${item.id}`, + name: item.name, + arguments: {}, + partialJson: item.arguments || "", + }; + output.content.push(currentBlock); + stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output }); + } + } else if (event.type === "response.reasoning_summary_part.added") { + if (currentItem && currentItem.type === "reasoning") { + currentItem.summary = currentItem.summary || []; + currentItem.summary.push(event.part); + } + } else if (event.type === "response.reasoning_summary_text.delta") { + if (currentItem?.type === "reasoning" && currentBlock?.type === "thinking") { + currentItem.summary = currentItem.summary || []; + const lastPart = currentItem.summary[currentItem.summary.length - 1]; + if (lastPart) { + currentBlock.thinking += event.delta; + lastPart.text += event.delta; + stream.push({ + type: "thinking_delta", + contentIndex: blockIndex(), + delta: event.delta, + partial: output, + }); + } + } + } else if (event.type === "response.reasoning_summary_part.done") { + if (currentItem?.type === "reasoning" && currentBlock?.type === "thinking") { + currentItem.summary = currentItem.summary || []; + const lastPart = currentItem.summary[currentItem.summary.length - 1]; + if (lastPart) { + currentBlock.thinking += "\n\n"; + lastPart.text += "\n\n"; + stream.push({ + type: "thinking_delta", + contentIndex: blockIndex(), + delta: "\n\n", + partial: output, + }); + } + } + } else if (event.type === "response.content_part.added") { + if (currentItem?.type === "message") { + currentItem.content = currentItem.content || []; + // Filter out ReasoningText, only accept output_text and refusal + if (event.part.type === "output_text" || event.part.type === "refusal") { + currentItem.content.push(event.part); + } + } + } else if (event.type === "response.output_text.delta") { + if (currentItem?.type === "message" && currentBlock?.type === "text") { + if (!currentItem.content || currentItem.content.length === 0) { + continue; + } + const lastPart = currentItem.content[currentItem.content.length - 1]; + if (lastPart?.type === "output_text") { + currentBlock.text += event.delta; + lastPart.text += event.delta; + stream.push({ + type: "text_delta", + contentIndex: blockIndex(), + delta: event.delta, + partial: output, + }); + } + } + } else if (event.type === "response.refusal.delta") { + if (currentItem?.type === "message" && currentBlock?.type === "text") { + if (!currentItem.content || currentItem.content.length === 0) { + continue; + } + const lastPart = currentItem.content[currentItem.content.length - 1]; + if (lastPart?.type === "refusal") { + currentBlock.text += event.delta; + lastPart.refusal += event.delta; + stream.push({ + type: "text_delta", + contentIndex: blockIndex(), + delta: event.delta, + partial: output, + }); + } + } + } else if (event.type === "response.function_call_arguments.delta") { + if (currentItem?.type === "function_call" && currentBlock?.type === "toolCall") { + currentBlock.partialJson += event.delta; + currentBlock.arguments = parseStreamingJson(currentBlock.partialJson); + stream.push({ + type: "toolcall_delta", + contentIndex: blockIndex(), + delta: event.delta, + partial: output, + }); + } + } else if (event.type === "response.output_item.done") { + const item = event.item; + + if (item.type === "reasoning" && currentBlock?.type === "thinking") { + currentBlock.thinking = item.summary?.map((s) => s.text).join("\n\n") || ""; + currentBlock.thinkingSignature = JSON.stringify(item); + stream.push({ + type: "thinking_end", + contentIndex: blockIndex(), + content: currentBlock.thinking, + partial: output, + }); + currentBlock = null; + } else if (item.type === "message" && currentBlock?.type === "text") { + currentBlock.text = item.content.map((c) => (c.type === "output_text" ? c.text : c.refusal)).join(""); + currentBlock.textSignature = item.id; + stream.push({ + type: "text_end", + contentIndex: blockIndex(), + content: currentBlock.text, + partial: output, + }); + currentBlock = null; + } else if (item.type === "function_call") { + const toolCall: ToolCall = { + type: "toolCall", + id: `${item.call_id}|${item.id}`, + name: item.name, + arguments: JSON.parse(item.arguments), + }; + + stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output }); + } + } else if (event.type === "response.completed") { + const response = event.response; + if (response?.usage) { + const cachedTokens = response.usage.input_tokens_details?.cached_tokens || 0; + output.usage = { + // OpenAI includes cached tokens in input_tokens, so subtract to get non-cached input + input: (response.usage.input_tokens || 0) - cachedTokens, + output: response.usage.output_tokens || 0, + cacheRead: cachedTokens, + cacheWrite: 0, + totalTokens: response.usage.total_tokens || 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }; + } + calculateCost(model, output.usage); + if (options?.applyServiceTierPricing) { + const serviceTier = response?.service_tier ?? options.serviceTier; + options.applyServiceTierPricing(output.usage, serviceTier); + } + // Map status to stop reason + output.stopReason = mapStopReason(response?.status); + if (output.content.some((b) => b.type === "toolCall") && output.stopReason === "stop") { + output.stopReason = "toolUse"; + } + } else if (event.type === "error") { + throw new Error(`Error Code ${event.code}: ${event.message}` || "Unknown error"); + } else if (event.type === "response.failed") { + throw new Error("Unknown error"); + } + } +} + +function mapStopReason(status: OpenAI.Responses.ResponseStatus | undefined): StopReason { + if (!status) return "stop"; + switch (status) { + case "completed": + return "stop"; + case "incomplete": + return "length"; + case "failed": + case "cancelled": + return "error"; + // These two are wonky ... + case "in_progress": + case "queued": + return "stop"; + default: { + const _exhaustive: never = status; + throw new Error(`Unhandled stop reason: ${_exhaustive}`); + } + } +} diff --git a/packages/ai/src/providers/openai-responses.ts b/packages/ai/src/providers/openai-responses.ts index 7deeecbe..eb3819e5 100644 --- a/packages/ai/src/providers/openai-responses.ts +++ b/packages/ai/src/providers/openai-responses.ts @@ -1,49 +1,11 @@ import OpenAI from "openai"; -import type { - Tool as OpenAITool, - ResponseCreateParamsStreaming, - ResponseFunctionToolCall, - ResponseInput, - ResponseInputContent, - ResponseInputImage, - ResponseInputText, - ResponseOutputMessage, - ResponseReasoningItem, -} from "openai/resources/responses/responses.js"; -import { calculateCost } from "../models.js"; +import type { ResponseCreateParamsStreaming } from "openai/resources/responses/responses.js"; import { getEnvApiKey } from "../stream.js"; -import type { - Api, - AssistantMessage, - Context, - Model, - StopReason, - StreamFunction, - StreamOptions, - TextContent, - ThinkingContent, - Tool, - ToolCall, - Usage, -} from "../types.js"; +import type { Api, AssistantMessage, Context, Model, StreamFunction, StreamOptions, Usage } from "../types.js"; import { AssistantMessageEventStream } from "../utils/event-stream.js"; -import { parseStreamingJson } from "../utils/json-parse.js"; -import { sanitizeSurrogates } from "../utils/sanitize-unicode.js"; -import { transformMessages } from "./transform-messages.js"; +import { convertResponsesMessages, convertResponsesTools, processResponsesStream } from "./openai-responses-shared.js"; -/** Fast deterministic hash to shorten long strings */ -function shortHash(str: string): string { - let h1 = 0xdeadbeef; - let h2 = 0x41c6ce57; - for (let i = 0; i < str.length; i++) { - const ch = str.charCodeAt(i); - h1 = Math.imul(h1 ^ ch, 2654435761); - h2 = Math.imul(h2 ^ ch, 1597334677); - } - h1 = Math.imul(h1 ^ (h1 >>> 16), 2246822507) ^ Math.imul(h2 ^ (h2 >>> 13), 3266489909); - h2 = Math.imul(h2 ^ (h2 >>> 16), 2246822507) ^ Math.imul(h1 ^ (h1 >>> 13), 3266489909); - return (h2 >>> 0).toString(36) + (h1 >>> 0).toString(36); -} +const OPENAI_TOOL_CALL_PROVIDERS = new Set(["openai", "openai-codex", "opencode"]); // OpenAI Responses-specific options export interface OpenAIResponsesOptions extends StreamOptions { @@ -94,219 +56,10 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = ( ); stream.push({ type: "start", partial: output }); - let currentItem: ResponseReasoningItem | ResponseOutputMessage | ResponseFunctionToolCall | null = null; - let currentBlock: ThinkingContent | TextContent | (ToolCall & { partialJson: string }) | null = null; - const blocks = output.content; - const blockIndex = () => blocks.length - 1; - - for await (const event of openaiStream) { - // Handle output item start - if (event.type === "response.output_item.added") { - const item = event.item; - if (item.type === "reasoning") { - currentItem = item; - currentBlock = { type: "thinking", thinking: "" }; - output.content.push(currentBlock); - stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output }); - } else if (item.type === "message") { - currentItem = item; - currentBlock = { type: "text", text: "" }; - output.content.push(currentBlock); - stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output }); - } else if (item.type === "function_call") { - currentItem = item; - currentBlock = { - type: "toolCall", - id: `${item.call_id}|${item.id}`, - name: item.name, - arguments: {}, - partialJson: item.arguments || "", - }; - output.content.push(currentBlock); - stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output }); - } - } - // Handle reasoning summary deltas - else if (event.type === "response.reasoning_summary_part.added") { - if (currentItem && currentItem.type === "reasoning") { - currentItem.summary = currentItem.summary || []; - currentItem.summary.push(event.part); - } - } else if (event.type === "response.reasoning_summary_text.delta") { - if ( - currentItem && - currentItem.type === "reasoning" && - currentBlock && - currentBlock.type === "thinking" - ) { - currentItem.summary = currentItem.summary || []; - const lastPart = currentItem.summary[currentItem.summary.length - 1]; - if (lastPart) { - currentBlock.thinking += event.delta; - lastPart.text += event.delta; - stream.push({ - type: "thinking_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } - // Add a new line between summary parts (hack...) - else if (event.type === "response.reasoning_summary_part.done") { - if ( - currentItem && - currentItem.type === "reasoning" && - currentBlock && - currentBlock.type === "thinking" - ) { - currentItem.summary = currentItem.summary || []; - const lastPart = currentItem.summary[currentItem.summary.length - 1]; - if (lastPart) { - currentBlock.thinking += "\n\n"; - lastPart.text += "\n\n"; - stream.push({ - type: "thinking_delta", - contentIndex: blockIndex(), - delta: "\n\n", - partial: output, - }); - } - } - } - // Handle text output deltas - else if (event.type === "response.content_part.added") { - if (currentItem && currentItem.type === "message") { - currentItem.content = currentItem.content || []; - // Filter out ReasoningText, only accept output_text and refusal - if (event.part.type === "output_text" || event.part.type === "refusal") { - currentItem.content.push(event.part); - } - } - } else if (event.type === "response.output_text.delta") { - if (currentItem && currentItem.type === "message" && currentBlock && currentBlock.type === "text") { - const lastPart = currentItem.content[currentItem.content.length - 1]; - if (lastPart && lastPart.type === "output_text") { - currentBlock.text += event.delta; - lastPart.text += event.delta; - stream.push({ - type: "text_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } else if (event.type === "response.refusal.delta") { - if (currentItem && currentItem.type === "message" && currentBlock && currentBlock.type === "text") { - const lastPart = currentItem.content[currentItem.content.length - 1]; - if (lastPart && lastPart.type === "refusal") { - currentBlock.text += event.delta; - lastPart.refusal += event.delta; - stream.push({ - type: "text_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - } - // Handle function call argument deltas - else if (event.type === "response.function_call_arguments.delta") { - if ( - currentItem && - currentItem.type === "function_call" && - currentBlock && - currentBlock.type === "toolCall" - ) { - currentBlock.partialJson += event.delta; - currentBlock.arguments = parseStreamingJson(currentBlock.partialJson); - stream.push({ - type: "toolcall_delta", - contentIndex: blockIndex(), - delta: event.delta, - partial: output, - }); - } - } - // Handle function call arguments done (some providers send this instead of deltas) - else if (event.type === "response.function_call_arguments.done") { - if (currentItem?.type === "function_call" && currentBlock?.type === "toolCall") { - currentBlock.partialJson = event.arguments; - currentBlock.arguments = parseStreamingJson(currentBlock.partialJson); - } - } - // Handle output item completion - else if (event.type === "response.output_item.done") { - const item = event.item; - - if (item.type === "reasoning" && currentBlock && currentBlock.type === "thinking") { - currentBlock.thinking = item.summary?.map((s) => s.text).join("\n\n") || ""; - currentBlock.thinkingSignature = JSON.stringify(item); - stream.push({ - type: "thinking_end", - contentIndex: blockIndex(), - content: currentBlock.thinking, - partial: output, - }); - currentBlock = null; - } else if (item.type === "message" && currentBlock && currentBlock.type === "text") { - currentBlock.text = item.content.map((c) => (c.type === "output_text" ? c.text : c.refusal)).join(""); - currentBlock.textSignature = item.id; - stream.push({ - type: "text_end", - contentIndex: blockIndex(), - content: currentBlock.text, - partial: output, - }); - currentBlock = null; - } else if (item.type === "function_call") { - const args = - currentBlock?.type === "toolCall" && currentBlock.partialJson - ? JSON.parse(currentBlock.partialJson) - : JSON.parse(item.arguments); - const toolCall: ToolCall = { - type: "toolCall", - id: `${item.call_id}|${item.id}`, - name: item.name, - arguments: args, - }; - currentBlock = null; - stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output }); - } - } - // Handle completion - else if (event.type === "response.completed") { - const response = event.response; - if (response?.usage) { - const cachedTokens = response.usage.input_tokens_details?.cached_tokens || 0; - output.usage = { - // OpenAI includes cached tokens in input_tokens, so subtract to get non-cached input - input: (response.usage.input_tokens || 0) - cachedTokens, - output: response.usage.output_tokens || 0, - cacheRead: cachedTokens, - cacheWrite: 0, - totalTokens: response.usage.total_tokens || 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - } - calculateCost(model, output.usage); - applyServiceTierPricing(output.usage, response?.service_tier ?? options?.serviceTier); - // Map status to stop reason - output.stopReason = mapStopReason(response?.status); - if (output.content.some((b) => b.type === "toolCall") && output.stopReason === "stop") { - output.stopReason = "toolUse"; - } - } - // Handle errors - else if (event.type === "error") { - throw new Error(`Error Code ${event.code}: ${event.message}` || "Unknown error"); - } else if (event.type === "response.failed") { - throw new Error("Unknown error"); - } - } + await processResponsesStream(openaiStream, output, stream, model, { + serviceTier: options?.serviceTier, + applyServiceTierPricing, + }); if (options?.signal?.aborted) { throw new Error("Request was aborted"); @@ -319,7 +72,7 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = ( stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); } catch (error) { - for (const block of output.content) delete (block as any).index; + for (const block of output.content) delete (block as { index?: number }).index; output.stopReason = options?.signal?.aborted ? "aborted" : "error"; output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); stream.push({ type: "error", reason: output.stopReason, error: output }); @@ -385,7 +138,7 @@ function createClient( } function buildParams(model: Model<"openai-responses">, context: Context, options?: OpenAIResponsesOptions) { - const messages = convertMessages(model, context); + const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS); const params: ResponseCreateParamsStreaming = { model: model.id, @@ -407,7 +160,7 @@ function buildParams(model: Model<"openai-responses">, context: Context, options } if (context.tools) { - params.tools = convertTools(context.tools); + params.tools = convertResponsesTools(context.tools); } if (model.reasoning) { @@ -436,183 +189,6 @@ function buildParams(model: Model<"openai-responses">, context: Context, options return params; } -function convertMessages(model: Model<"openai-responses">, context: Context): ResponseInput { - const messages: ResponseInput = []; - - const normalizeToolCallId = (id: string): string => { - const allowedProviders = new Set(["openai", "openai-codex", "opencode"]); - if (!allowedProviders.has(model.provider)) return id; - if (!id.includes("|")) return id; - const [callId, itemId] = id.split("|"); - const sanitizedCallId = callId.replace(/[^a-zA-Z0-9_-]/g, "_"); - let sanitizedItemId = itemId.replace(/[^a-zA-Z0-9_-]/g, "_"); - // OpenAI Responses API requires item id to start with "fc" - if (!sanitizedItemId.startsWith("fc")) { - sanitizedItemId = `fc_${sanitizedItemId}`; - } - const normalizedCallId = sanitizedCallId.length > 64 ? sanitizedCallId.slice(0, 64) : sanitizedCallId; - const normalizedItemId = sanitizedItemId.length > 64 ? sanitizedItemId.slice(0, 64) : sanitizedItemId; - return `${normalizedCallId}|${normalizedItemId}`; - }; - - const transformedMessages = transformMessages(context.messages, model, normalizeToolCallId); - - if (context.systemPrompt) { - const role = model.reasoning ? "developer" : "system"; - messages.push({ - role, - content: sanitizeSurrogates(context.systemPrompt), - }); - } - - let msgIndex = 0; - for (const msg of transformedMessages) { - if (msg.role === "user") { - if (typeof msg.content === "string") { - messages.push({ - role: "user", - content: [{ type: "input_text", text: sanitizeSurrogates(msg.content) }], - }); - } else { - const content: ResponseInputContent[] = msg.content.map((item): ResponseInputContent => { - if (item.type === "text") { - return { - type: "input_text", - text: sanitizeSurrogates(item.text), - } satisfies ResponseInputText; - } else { - return { - type: "input_image", - detail: "auto", - image_url: `data:${item.mimeType};base64,${item.data}`, - } satisfies ResponseInputImage; - } - }); - const filteredContent = !model.input.includes("image") - ? content.filter((c) => c.type !== "input_image") - : content; - if (filteredContent.length === 0) continue; - messages.push({ - role: "user", - content: filteredContent, - }); - } - } else if (msg.role === "assistant") { - const output: ResponseInput = []; - const assistantMsg = msg as AssistantMessage; - - // Check if this message is from a different model (same provider, different model ID). - // For such messages, tool call IDs with fc_ prefix need to be stripped to avoid - // OpenAI's reasoning/function_call pairing validation errors. - const isDifferentModel = - assistantMsg.model !== model.id && - assistantMsg.provider === model.provider && - assistantMsg.api === model.api; - - for (const block of msg.content) { - if (block.type === "thinking") { - if (block.thinkingSignature) { - const reasoningItem = JSON.parse(block.thinkingSignature); - output.push(reasoningItem); - } - } else if (block.type === "text") { - const textBlock = block as TextContent; - // OpenAI requires id to be max 64 characters - let msgId = textBlock.textSignature; - if (!msgId) { - msgId = `msg_${msgIndex}`; - } else if (msgId.length > 64) { - msgId = `msg_${shortHash(msgId)}`; - } - output.push({ - type: "message", - role: "assistant", - content: [{ type: "output_text", text: sanitizeSurrogates(textBlock.text), annotations: [] }], - status: "completed", - id: msgId, - } satisfies ResponseOutputMessage); - } else if (block.type === "toolCall") { - const toolCall = block as ToolCall; - const callId = toolCall.id.split("|")[0]; - let itemId: string | undefined = toolCall.id.split("|")[1]; - - // For different-model messages, set id to undefined to avoid pairing validation. - // OpenAI tracks which fc_xxx IDs were paired with rs_xxx reasoning items. - // By omitting the id, we avoid triggering that validation (like cross-provider does). - if (isDifferentModel && itemId?.startsWith("fc_")) { - itemId = undefined; - } - - output.push({ - type: "function_call", - id: itemId, - call_id: callId, - name: toolCall.name, - arguments: JSON.stringify(toolCall.arguments), - }); - } - } - if (output.length === 0) continue; - messages.push(...output); - } else if (msg.role === "toolResult") { - // Extract text and image content - const textResult = msg.content - .filter((c) => c.type === "text") - .map((c) => (c as any).text) - .join("\n"); - const hasImages = msg.content.some((c) => c.type === "image"); - - // Always send function_call_output with text (or placeholder if only images) - const hasText = textResult.length > 0; - messages.push({ - type: "function_call_output", - call_id: msg.toolCallId.split("|")[0], - output: sanitizeSurrogates(hasText ? textResult : "(see attached image)"), - }); - - // If there are images and model supports them, send a follow-up user message with images - if (hasImages && model.input.includes("image")) { - const contentParts: ResponseInputContent[] = []; - - // Add text prefix - contentParts.push({ - type: "input_text", - text: "Attached image(s) from tool result:", - } satisfies ResponseInputText); - - // Add images - for (const block of msg.content) { - if (block.type === "image") { - contentParts.push({ - type: "input_image", - detail: "auto", - image_url: `data:${(block as any).mimeType};base64,${(block as any).data}`, - } satisfies ResponseInputImage); - } - } - - messages.push({ - role: "user", - content: contentParts, - }); - } - } - msgIndex++; - } - - return messages; -} - -function convertTools(tools: Tool[]): OpenAITool[] { - return tools.map((tool) => ({ - type: "function", - name: tool.name, - description: tool.description, - parameters: tool.parameters as any, // TypeBox already generates JSON Schema - strict: false, - })); -} - function getServiceTierCostMultiplier(serviceTier: ResponseCreateParamsStreaming["service_tier"] | undefined): number { switch (serviceTier) { case "flex": @@ -634,24 +210,3 @@ function applyServiceTierPricing(usage: Usage, serviceTier: ResponseCreateParams usage.cost.cacheWrite *= multiplier; usage.cost.total = usage.cost.input + usage.cost.output + usage.cost.cacheRead + usage.cost.cacheWrite; } - -function mapStopReason(status: OpenAI.Responses.ResponseStatus | undefined): StopReason { - if (!status) return "stop"; - switch (status) { - case "completed": - return "stop"; - case "incomplete": - return "length"; - case "failed": - case "cancelled": - return "error"; - // These two are wonky ... - case "in_progress": - case "queued": - return "stop"; - default: { - const _exhaustive: never = status; - throw new Error(`Unhandled stop reason: ${_exhaustive}`); - } - } -}