diff --git a/packages/ai/README.md b/packages/ai/README.md index 121493d0..42809c02 100644 --- a/packages/ai/README.md +++ b/packages/ai/README.md @@ -267,8 +267,8 @@ All streaming events emitted during assistant message generation: | `toolcall_start` | Tool call begins | `contentIndex`: Position in content array | | `toolcall_delta` | Tool arguments streaming | `delta`: JSON chunk, `partial.content[contentIndex].arguments`: Partial parsed args | | `toolcall_end` | Tool call complete | `toolCall`: Complete validated tool call with `id`, `name`, `arguments` | -| `done` | Stream complete | `reason`: Stop reason, `message`: Final assistant message | -| `error` | Error occurred | `error`: Error message, `partial`: Partial message before error | +| `done` | Stream complete | `reason`: Stop reason ("stop", "length", "toolUse"), `message`: Final assistant message | +| `error` | Error occurred | `reason`: Error type ("error" or "aborted"), `error`: AssistantMessage with partial content | ## Image Input @@ -399,16 +399,43 @@ for await (const event of s) { } ``` -## Errors & Abort Signal +## Stop Reasons -When a request ends with an error (including aborts and tool call validation errors), the API returns an `AssistantMessage` with: -- `stopReason: 'error'` - Indicates the request ended with an error -- `error: string` - Error message describing what happened -- `content: array` - **Partial content** accumulated before the error -- `usage: Usage` - **Token counts and costs** (may be incomplete depending on when error occurred) +Every `AssistantMessage` includes a `stopReason` field that indicates how the generation ended: -### Aborting -The abort signal allows you to cancel in-progress requests. Aborted requests return an `AssistantMessage` with `stopReason === 'error'`. 
+- `"stop"` - Normal completion, the model finished its response +- `"length"` - Output hit the maximum token limit +- `"toolUse"` - Model is calling tools and expects tool results +- `"error"` - An error occurred during generation +- `"aborted"` - Request was cancelled via abort signal + +## Error Handling + +When a request ends with an error (including aborts and tool call validation errors), the streaming API emits an error event: + +```typescript +// In streaming +for await (const event of stream) { + if (event.type === 'error') { + // event.reason is either "error" or "aborted" + // event.error is the AssistantMessage with partial content + console.error(`Error (${event.reason}):`, event.error.errorMessage); + console.log('Partial content:', event.error.content); + } +} + +// The final message will have the error details +const message = await stream.result(); +if (message.stopReason === 'error' || message.stopReason === 'aborted') { + console.error('Request failed:', message.errorMessage); + // message.content contains any partial content received before the error + // message.usage contains partial token counts and costs +} +``` + +### Aborting Requests + +The abort signal allows you to cancel in-progress requests. Aborted requests have `stopReason === 'aborted'`: ```typescript import { getModel, stream } from '@mariozechner/pi-ai'; @@ -429,14 +456,15 @@ for await (const event of s) { if (event.type === 'text_delta') { process.stdout.write(event.delta); } else if (event.type === 'error') { - console.log('Error:', event.error); + // event.reason tells you if it was "error" or "aborted" + console.log(`${event.reason === 'aborted' ? 
'Aborted' : 'Error'}:`, event.error.errorMessage); } } // Get results (may be partial if aborted) const response = await s.result(); -if (response.stopReason === 'error') { - console.log('Error:', response.error); +if (response.stopReason === 'aborted') { + console.log('Request was aborted:', response.errorMessage); console.log('Partial content received:', response.content); console.log('Tokens used:', response.usage); } diff --git a/packages/ai/src/agent/agent.ts b/packages/ai/src/agent/agent-loop.ts similarity index 88% rename from packages/ai/src/agent/agent.ts rename to packages/ai/src/agent/agent-loop.ts index 75d5ec12..a929c0fe 100644 --- a/packages/ai/src/agent/agent.ts +++ b/packages/ai/src/agent/agent-loop.ts @@ -1,11 +1,11 @@ -import { EventStream } from "../event-stream.js"; import { streamSimple } from "../stream.js"; import type { AssistantMessage, Context, Message, ToolResultMessage, UserMessage } from "../types.js"; -import { validateToolArguments } from "../validation.js"; +import { EventStream } from "../utils/event-stream.js"; +import { validateToolArguments } from "../utils/validation.js"; import type { AgentContext, AgentEvent, AgentTool, AgentToolResult, PromptConfig } from "./types.js"; // Main prompt function - returns a stream of events -export function prompt( +export function agentLoop( prompt: UserMessage, context: AgentContext, config: PromptConfig, @@ -46,21 +46,29 @@ export function prompt( firstTurn = false; } // Stream assistant response - const assistantMessage = await streamAssistantResponse(currentContext, config, signal, stream, streamFn); - newMessages.push(assistantMessage); + const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn); + newMessages.push(message); + + if (message.stopReason === "error" || message.stopReason === "aborted") { + // Stop the loop on error or abort + stream.push({ type: "turn_end", message, toolResults: [] }); + stream.push({ type: "agent_end", messages: 
newMessages }); + stream.end(newMessages); + return; + } // Check for tool calls - const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall"); + const toolCalls = message.content.filter((c) => c.type === "toolCall"); hasMoreToolCalls = toolCalls.length > 0; const toolResults: ToolResultMessage[] = []; if (hasMoreToolCalls) { // Execute tool calls - toolResults.push(...(await executeToolCalls(currentContext.tools, assistantMessage, signal, stream))); + toolResults.push(...(await executeToolCalls(currentContext.tools, message, signal, stream))); currentContext.messages.push(...toolResults); newMessages.push(...toolResults); } - stream.push({ type: "turn_end", assistantMessage, toolResults: toolResults }); + stream.push({ type: "turn_end", message, toolResults: toolResults }); } stream.push({ type: "agent_end", messages: newMessages }); stream.end(newMessages); diff --git a/packages/ai/src/agent/index.ts b/packages/ai/src/agent/index.ts index 196d66f7..22cb56da 100644 --- a/packages/ai/src/agent/index.ts +++ b/packages/ai/src/agent/index.ts @@ -1,3 +1,3 @@ -export { prompt } from "./agent.js"; +export { agentLoop } from "./agent-loop.js"; export * from "./tools/index.js"; export type { AgentContext, AgentEvent, AgentTool, PromptConfig } from "./types.js"; diff --git a/packages/ai/src/agent/types.ts b/packages/ai/src/agent/types.ts index 68e6b9d9..da8b7c5c 100644 --- a/packages/ai/src/agent/types.ts +++ b/packages/ai/src/agent/types.ts @@ -57,7 +57,7 @@ export type AgentEvent = isError: boolean; } // Emitted when a full turn completes - | { type: "turn_end"; assistantMessage: AssistantMessage; toolResults: ToolResultMessage[] } + | { type: "turn_end"; message: AssistantMessage; toolResults: ToolResultMessage[] } // Emitted when the agent has completed all its turns. 
All messages from every turn are // contained in messages, which can be appended to the context | { type: "agent_end"; messages: AgentContext["messages"] }; diff --git a/packages/ai/src/index.ts b/packages/ai/src/index.ts index 212ec1e4..1689d0ef 100644 --- a/packages/ai/src/index.ts +++ b/packages/ai/src/index.ts @@ -5,5 +5,5 @@ export * from "./providers/google.js"; export * from "./providers/openai-completions.js"; export * from "./providers/openai-responses.js"; export * from "./stream.js"; -export * from "./typebox-helpers.js"; export * from "./types.js"; +export * from "./utils/typebox-helpers.js"; diff --git a/packages/ai/src/models.generated.ts b/packages/ai/src/models.generated.ts index 407aa07a..48f60989 100644 --- a/packages/ai/src/models.generated.ts +++ b/packages/ai/src/models.generated.ts @@ -2994,23 +2994,6 @@ export const MODELS = { contextWindow: 32768, maxTokens: 4096, } satisfies Model<"openai-completions">, - "cohere/command-r-plus-08-2024": { - id: "cohere/command-r-plus-08-2024", - name: "Cohere: Command R+ (08-2024)", - api: "openai-completions", - provider: "openrouter", - baseUrl: "https://openrouter.ai/api/v1", - reasoning: false, - input: ["text"], - cost: { - input: 2.5, - output: 10, - cacheRead: 0, - cacheWrite: 0, - }, - contextWindow: 128000, - maxTokens: 4000, - } satisfies Model<"openai-completions">, "cohere/command-r-08-2024": { id: "cohere/command-r-08-2024", name: "Cohere: Command R (08-2024)", @@ -3028,6 +3011,23 @@ export const MODELS = { contextWindow: 128000, maxTokens: 4000, } satisfies Model<"openai-completions">, + "cohere/command-r-plus-08-2024": { + id: "cohere/command-r-plus-08-2024", + name: "Cohere: Command R+ (08-2024)", + api: "openai-completions", + provider: "openrouter", + baseUrl: "https://openrouter.ai/api/v1", + reasoning: false, + input: ["text"], + cost: { + input: 2.5, + output: 10, + cacheRead: 0, + cacheWrite: 0, + }, + contextWindow: 128000, + maxTokens: 4000, + } satisfies 
Model<"openai-completions">, "microsoft/phi-3.5-mini-128k-instruct": { id: "microsoft/phi-3.5-mini-128k-instruct", name: "Microsoft: Phi-3.5 Mini 128K Instruct", @@ -3130,6 +3130,23 @@ export const MODELS = { contextWindow: 131072, maxTokens: 16384, } satisfies Model<"openai-completions">, + "mistralai/mistral-7b-instruct-v0.3": { + id: "mistralai/mistral-7b-instruct-v0.3", + name: "Mistral: Mistral 7B Instruct v0.3", + api: "openai-completions", + provider: "openrouter", + baseUrl: "https://openrouter.ai/api/v1", + reasoning: false, + input: ["text"], + cost: { + input: 0.028, + output: 0.054, + cacheRead: 0, + cacheWrite: 0, + }, + contextWindow: 32768, + maxTokens: 16384, + } satisfies Model<"openai-completions">, "mistralai/mistral-7b-instruct:free": { id: "mistralai/mistral-7b-instruct:free", name: "Mistral: Mistral 7B Instruct (free)", @@ -3164,23 +3181,6 @@ export const MODELS = { contextWindow: 32768, maxTokens: 16384, } satisfies Model<"openai-completions">, - "mistralai/mistral-7b-instruct-v0.3": { - id: "mistralai/mistral-7b-instruct-v0.3", - name: "Mistral: Mistral 7B Instruct v0.3", - api: "openai-completions", - provider: "openrouter", - baseUrl: "https://openrouter.ai/api/v1", - reasoning: false, - input: ["text"], - cost: { - input: 0.028, - output: 0.054, - cacheRead: 0, - cacheWrite: 0, - }, - contextWindow: 32768, - maxTokens: 16384, - } satisfies Model<"openai-completions">, "microsoft/phi-3-mini-128k-instruct": { id: "microsoft/phi-3-mini-128k-instruct", name: "Microsoft: Phi-3 Mini 128K Instruct", @@ -3351,23 +3351,6 @@ export const MODELS = { contextWindow: 128000, maxTokens: 4096, } satisfies Model<"openai-completions">, - "mistralai/mistral-small": { - id: "mistralai/mistral-small", - name: "Mistral Small", - api: "openai-completions", - provider: "openrouter", - baseUrl: "https://openrouter.ai/api/v1", - reasoning: false, - input: ["text"], - cost: { - input: 0.19999999999999998, - output: 0.6, - cacheRead: 0, - cacheWrite: 0, - }, - 
contextWindow: 32768, - maxTokens: 4096, - } satisfies Model<"openai-completions">, "mistralai/mistral-tiny": { id: "mistralai/mistral-tiny", name: "Mistral Tiny", @@ -3385,6 +3368,23 @@ export const MODELS = { contextWindow: 32768, maxTokens: 4096, } satisfies Model<"openai-completions">, + "mistralai/mistral-small": { + id: "mistralai/mistral-small", + name: "Mistral Small", + api: "openai-completions", + provider: "openrouter", + baseUrl: "https://openrouter.ai/api/v1", + reasoning: false, + input: ["text"], + cost: { + input: 0.19999999999999998, + output: 0.6, + cacheRead: 0, + cacheWrite: 0, + }, + contextWindow: 32768, + maxTokens: 4096, + } satisfies Model<"openai-completions">, "mistralai/mixtral-8x7b-instruct": { id: "mistralai/mixtral-8x7b-instruct", name: "Mistral: Mixtral 8x7B Instruct", diff --git a/packages/ai/src/providers/anthropic.ts b/packages/ai/src/providers/anthropic.ts index 57cea2ac..ba808876 100644 --- a/packages/ai/src/providers/anthropic.ts +++ b/packages/ai/src/providers/anthropic.ts @@ -4,8 +4,6 @@ import type { MessageCreateParamsStreaming, MessageParam, } from "@anthropic-ai/sdk/resources/messages.js"; -import { AssistantMessageEventStream } from "../event-stream.js"; -import { parseStreamingJson } from "../json-parse.js"; import { calculateCost } from "../models.js"; import type { Api, @@ -22,7 +20,9 @@ import type { ToolCall, ToolResultMessage, } from "../types.js"; -import { validateToolArguments } from "../validation.js"; +import { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { parseStreamingJson } from "../utils/json-parse.js"; +import { validateToolArguments } from "../utils/validation.js"; import { transformMessages } from "./transorm-messages.js"; export interface AnthropicOptions extends StreamOptions { @@ -196,13 +196,17 @@ export const streamAnthropic: StreamFunction<"anthropic-messages"> = ( throw new Error("Request was aborted"); } + if (output.stopReason === "aborted" || output.stopReason === 
"error") { + throw new Error("An unkown error ocurred"); + } + stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); } catch (error) { for (const block of output.content) delete (block as any).index; - output.stopReason = "error"; - output.error = error instanceof Error ? error.message : JSON.stringify(error); - stream.push({ type: "error", error: output.error, partial: output }); + output.stopReason = options?.signal?.aborted ? "aborted" : "error"; + output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); + stream.push({ type: "error", reason: output.stopReason, error: output }); stream.end(); } })(); @@ -466,7 +470,7 @@ function mapStopReason(reason: Anthropic.Messages.StopReason): StopReason { case "tool_use": return "toolUse"; case "refusal": - return "safety"; + return "error"; case "pause_turn": // Stop is good enough -> resubmit return "stop"; case "stop_sequence": diff --git a/packages/ai/src/providers/google.ts b/packages/ai/src/providers/google.ts index d8a7c60a..bd231ee1 100644 --- a/packages/ai/src/providers/google.ts +++ b/packages/ai/src/providers/google.ts @@ -7,7 +7,6 @@ import { GoogleGenAI, type Part, } from "@google/genai"; -import { AssistantMessageEventStream } from "../event-stream.js"; import { calculateCost } from "../models.js"; import type { Api, @@ -22,7 +21,8 @@ import type { Tool, ToolCall, } from "../types.js"; -import { validateToolArguments } from "../validation.js"; +import { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { validateToolArguments } from "../utils/validation.js"; import { transformMessages } from "./transorm-messages.js"; export interface GoogleOptions extends StreamOptions { @@ -226,12 +226,21 @@ export const streamGoogle: StreamFunction<"google-generative-ai"> = ( } } + if (options?.signal?.aborted) { + throw new Error("Request was aborted"); + } + + if (output.stopReason === "aborted" || output.stopReason === "error") { + throw 
new Error("An unkown error ocurred"); + } + stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); } catch (error) { - output.stopReason = "error"; - output.error = error instanceof Error ? error.message : JSON.stringify(error); - stream.push({ type: "error", error: output.error, partial: output }); + for (const block of output.content) delete (block as any).index; + output.stopReason = options?.signal?.aborted ? "aborted" : "error"; + output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); + stream.push({ type: "error", reason: output.stopReason, error: output }); stream.end(); } })(); @@ -424,7 +433,7 @@ function mapStopReason(reason: FinishReason): StopReason { case FinishReason.SAFETY: case FinishReason.IMAGE_SAFETY: case FinishReason.RECITATION: - return "safety"; + return "error"; case FinishReason.FINISH_REASON_UNSPECIFIED: case FinishReason.OTHER: case FinishReason.LANGUAGE: diff --git a/packages/ai/src/providers/openai-completions.ts b/packages/ai/src/providers/openai-completions.ts index 11398b5a..22a2c74d 100644 --- a/packages/ai/src/providers/openai-completions.ts +++ b/packages/ai/src/providers/openai-completions.ts @@ -7,8 +7,6 @@ import type { ChatCompletionContentPartText, ChatCompletionMessageParam, } from "openai/resources/chat/completions.js"; -import { AssistantMessageEventStream } from "../event-stream.js"; -import { parseStreamingJson } from "../json-parse.js"; import { calculateCost } from "../models.js"; import type { AssistantMessage, @@ -22,7 +20,9 @@ import type { Tool, ToolCall, } from "../types.js"; -import { validateToolArguments } from "../validation.js"; +import { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { parseStreamingJson } from "../utils/json-parse.js"; +import { validateToolArguments } from "../utils/validation.js"; import { transformMessages } from "./transorm-messages.js"; export interface OpenAICompletionsOptions extends 
StreamOptions { @@ -231,13 +231,17 @@ export const streamOpenAICompletions: StreamFunction<"openai-completions"> = ( throw new Error("Request was aborted"); } + if (output.stopReason === "aborted" || output.stopReason === "error") { + throw new Error("An unknown error occurred"); + } + stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); - return output; } catch (error) { - output.stopReason = "error"; - output.error = error instanceof Error ? error.message : String(error); - stream.push({ type: "error", error: output.error, partial: output }); + for (const block of output.content) delete (block as any).index; + output.stopReason = options?.signal?.aborted ? "aborted" : "error"; + output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); + stream.push({ type: "error", reason: output.stopReason, error: output }); stream.end(); } })(); @@ -413,7 +417,7 @@ function mapStopReason(reason: ChatCompletionChunk.Choice["finish_reason"]): Sto case "tool_calls": return "toolUse"; case "content_filter": - return "safety"; + return "error"; default: { const _exhaustive: never = reason; throw new Error(`Unhandled stop reason: ${_exhaustive}`); diff --git a/packages/ai/src/providers/openai-responses.ts b/packages/ai/src/providers/openai-responses.ts index 8cf05228..9e673cf8 100644 --- a/packages/ai/src/providers/openai-responses.ts +++ b/packages/ai/src/providers/openai-responses.ts @@ -10,8 +10,6 @@ import type { ResponseOutputMessage, ResponseReasoningItem, } from "openai/resources/responses/responses.js"; -import { AssistantMessageEventStream } from "../event-stream.js"; -import { parseStreamingJson } from "../json-parse.js"; import { calculateCost } from "../models.js"; import type { Api, @@ -26,7 +24,9 @@ import type { Tool, ToolCall, } from "../types.js"; -import { validateToolArguments } from "../validation.js"; +import { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { parseStreamingJson 
} from "../utils/json-parse.js"; +import { validateToolArguments } from "../utils/validation.js"; import { transformMessages } from "./transorm-messages.js"; // OpenAI Responses-specific options @@ -268,17 +268,9 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = ( } // Handle errors else if (event.type === "error") { - output.stopReason = "error"; - output.error = `Code ${event.code}: ${event.message}` || "Unknown error"; - stream.push({ type: "error", error: output.error, partial: output }); - stream.end(); - return output; + throw new Error(`Error Code ${event.code}: ${event.message}` || "Unknown error"); } else if (event.type === "response.failed") { - output.stopReason = "error"; - output.error = "Unknown error"; - stream.push({ type: "error", error: output.error, partial: output }); - stream.end(); - return output; + throw new Error("Unknown error"); } } @@ -286,12 +278,17 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = ( throw new Error("Request was aborted"); } + if (output.stopReason === "aborted" || output.stopReason === "error") { + throw new Error("An unkown error ocurred"); + } + stream.push({ type: "done", reason: output.stopReason, message: output }); stream.end(); } catch (error) { - output.stopReason = "error"; - output.error = error instanceof Error ? error.message : JSON.stringify(error); - stream.push({ type: "error", error: output.error, partial: output }); + for (const block of output.content) delete (block as any).index; + output.stopReason = options?.signal?.aborted ? "aborted" : "error"; + output.errorMessage = error instanceof Error ? 
error.message : JSON.stringify(error); + stream.push({ type: "error", reason: output.stopReason, error: output }); stream.end(); } })(); diff --git a/packages/ai/src/types.ts b/packages/ai/src/types.ts index 4bd7cbf6..24f55aa8 100644 --- a/packages/ai/src/types.ts +++ b/packages/ai/src/types.ts @@ -1,10 +1,10 @@ -import type { AssistantMessageEventStream } from "./event-stream.js"; import type { AnthropicOptions } from "./providers/anthropic.js"; import type { GoogleOptions } from "./providers/google.js"; import type { OpenAICompletionsOptions } from "./providers/openai-completions.js"; import type { OpenAIResponsesOptions } from "./providers/openai-responses.js"; +import type { AssistantMessageEventStream } from "./utils/event-stream.js"; -export type { AssistantMessageEventStream } from "./event-stream.js"; +export type { AssistantMessageEventStream } from "./utils/event-stream.js"; export type Api = "openai-completions" | "openai-responses" | "anthropic-messages" | "google-generative-ai"; @@ -90,7 +90,7 @@ export interface Usage { }; } -export type StopReason = "stop" | "length" | "toolUse" | "safety" | "error"; +export type StopReason = "stop" | "length" | "toolUse" | "error" | "aborted"; export interface UserMessage { role: "user"; @@ -105,7 +105,7 @@ export interface AssistantMessage { model: string; usage: Usage; stopReason: StopReason; - error?: string; + errorMessage?: string; } export interface ToolResultMessage { @@ -144,8 +144,8 @@ export type AssistantMessageEvent = | { type: "toolcall_start"; contentIndex: number; partial: AssistantMessage } | { type: "toolcall_delta"; contentIndex: number; delta: string; partial: AssistantMessage } | { type: "toolcall_end"; contentIndex: number; toolCall: ToolCall; partial: AssistantMessage } - | { type: "done"; reason: StopReason; message: AssistantMessage } - | { type: "error"; error: string; partial: AssistantMessage }; + | { type: "done"; reason: Extract<StopReason, "stop" | "length" | "toolUse">; message: AssistantMessage } + | { type: "error"; reason: 
Extract<StopReason, "error" | "aborted">; error: AssistantMessage }; // Model interface for the unified model system export interface Model { diff --git a/packages/ai/src/event-stream.ts b/packages/ai/src/utils/event-stream.ts similarity index 95% rename from packages/ai/src/event-stream.ts rename to packages/ai/src/utils/event-stream.ts index deead693..74947477 100644 --- a/packages/ai/src/event-stream.ts +++ b/packages/ai/src/utils/event-stream.ts @@ -1,4 +1,4 @@ -import type { AssistantMessage, AssistantMessageEvent } from "./types.js"; +import type { AssistantMessage, AssistantMessageEvent } from "../types.js"; // Generic event stream class for async iteration export class EventStream implements AsyncIterable { @@ -73,7 +73,7 @@ export class AssistantMessageEventStream extends EventStream(llm: Model, options: Opti const msg = await response.result(); // If we get here without throwing, the abort didn't work - expect(msg.stopReason).toBe("error"); + expect(msg.stopReason).toBe("aborted"); expect(msg.content.length).toBeGreaterThan(0); context.messages.push(msg); @@ -46,7 +46,7 @@ async function testImmediateAbort(llm: Model, options: O }; const response = await complete(llm, context, { ...options, signal: controller.signal }); - expect(response.stopReason).toBe("error"); + expect(response.stopReason).toBe("aborted"); } describe("AI Providers Abort Tests", () => { diff --git a/packages/ai/test/agent.test.ts b/packages/ai/test/agent.test.ts index fcaa3096..452b00cf 100644 --- a/packages/ai/test/agent.test.ts +++ b/packages/ai/test/agent.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { prompt } from "../src/agent/agent.js"; +import { agentLoop } from "../src/agent/agent-loop.js"; import { calculateTool } from "../src/agent/tools/calculate.js"; import type { AgentContext, AgentEvent, PromptConfig } from "../src/agent/types.js"; import { getModel } from "../src/models.js"; @@ -42,7 +42,7 @@ async function calculateTest(model: Model, options: Opti let finalAnswer: 
number | undefined; // Execute the prompt - const stream = prompt(userPrompt, context, config); + const stream = agentLoop(userPrompt, context, config); for await (const event of stream) { events.push(event); @@ -55,7 +55,7 @@ async function calculateTest(model: Model, options: Opti case "turn_end": console.log(`=== Turn ${turns} ended with ${event.toolResults.length} tool results ===`); - console.log(event.assistantMessage); + console.log(event.message); break; case "tool_execution_end": @@ -188,7 +188,7 @@ async function abortTest(model: Model, options: OptionsF let finalMessages: Message[] | undefined; // Execute the prompt - const stream = prompt(userPrompt, context, config, abortController.signal); + const stream = agentLoop(userPrompt, context, config, abortController.signal); // Abort after first tool execution const abortPromise = (async () => { @@ -222,7 +222,7 @@ async function abortTest(model: Model, options: OptionsF // Should have executed 1 tool call before abort expect(toolCallCount).toBeGreaterThanOrEqual(1); - expect(assistantMessage.stopReason).toBe("error"); + expect(assistantMessage.stopReason).toBe("aborted"); return { toolCallCount, diff --git a/packages/ai/test/empty.test.ts b/packages/ai/test/empty.test.ts index 30568cde..722191da 100644 --- a/packages/ai/test/empty.test.ts +++ b/packages/ai/test/empty.test.ts @@ -21,7 +21,7 @@ async function testEmptyMessage(llm: Model, options: Opt expect(response.role).toBe("assistant"); // Should handle empty string gracefully if (response.stopReason === "error") { - expect(response.error).toBeDefined(); + expect(response.errorMessage).toBeDefined(); } else { expect(response.content).toBeDefined(); } @@ -45,7 +45,7 @@ async function testEmptyStringMessage(llm: Model, option // Should handle empty string gracefully if (response.stopReason === "error") { - expect(response.error).toBeDefined(); + expect(response.errorMessage).toBeDefined(); } else { expect(response.content).toBeDefined(); } @@ -69,7 +69,7 
@@ async function testWhitespaceOnlyMessage(llm: Model, opt // Should handle whitespace-only gracefully if (response.stopReason === "error") { - expect(response.error).toBeDefined(); + expect(response.errorMessage).toBeDefined(); } else { expect(response.content).toBeDefined(); } @@ -115,7 +115,7 @@ async function testEmptyAssistantMessage(llm: Model, opt // Should handle empty assistant message in context gracefully if (response.stopReason === "error") { - expect(response.error).toBeDefined(); + expect(response.errorMessage).toBeDefined(); } else { expect(response.content).toBeDefined(); expect(response.content.length).toBeGreaterThan(0); diff --git a/packages/ai/test/enum-test.ts b/packages/ai/test/enum-test.ts index 42918b2c..b4c4ed81 100644 --- a/packages/ai/test/enum-test.ts +++ b/packages/ai/test/enum-test.ts @@ -1,7 +1,7 @@ import { Type } from "@sinclair/typebox"; import { z } from "zod"; import { zodToJsonSchema } from "zod-to-json-schema"; -import { StringEnum } from "../src/typebox-helpers.js"; +import { StringEnum } from "../src/utils/typebox-helpers.js"; // Zod version const zodSchema = z.object({ diff --git a/packages/ai/test/handoff.test.ts b/packages/ai/test/handoff.test.ts index 3ad16256..0690bf36 100644 --- a/packages/ai/test/handoff.test.ts +++ b/packages/ai/test/handoff.test.ts @@ -238,7 +238,7 @@ const providerContexts = { cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }, stopReason: "error", - error: "Request was aborted", + errorMessage: "Request was aborted", } satisfies AssistantMessage, toolResult: null, facts: { @@ -293,7 +293,7 @@ async function testProviderHandoff( // Check for error if (response.stopReason === "error") { - console.log(`[${sourceLabel} → ${targetModel.provider}] Failed with error: ${response.error}`); + console.log(`[${sourceLabel} → ${targetModel.provider}] Failed with error: ${response.errorMessage}`); return false; } diff --git a/packages/ai/test/generate.test.ts 
b/packages/ai/test/stream.test.ts similarity index 98% rename from packages/ai/test/generate.test.ts rename to packages/ai/test/stream.test.ts index 90c16b99..af5a1ef2 100644 --- a/packages/ai/test/generate.test.ts +++ b/packages/ai/test/stream.test.ts @@ -6,8 +6,8 @@ import { fileURLToPath } from "url"; import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { getModel } from "../src/models.js"; import { complete, stream } from "../src/stream.js"; -import { StringEnum } from "../src/typebox-helpers.js"; import type { Api, Context, ImageContent, Model, OptionsForApi, Tool, ToolResultMessage } from "../src/types.js"; +import { StringEnum } from "../src/utils/typebox-helpers.js"; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); @@ -40,7 +40,7 @@ async function basicTextGeneration(model: Model, options expect(response.content).toBeTruthy(); expect(response.usage.input + response.usage.cacheRead).toBeGreaterThan(0); expect(response.usage.output).toBeGreaterThan(0); - expect(response.error).toBeFalsy(); + expect(response.errorMessage).toBeFalsy(); expect(response.content.map((b) => (b.type === "text" ? b.text : "")).join("")).toContain("Hello test successful"); context.messages.push(response); @@ -52,7 +52,7 @@ async function basicTextGeneration(model: Model, options expect(secondResponse.content).toBeTruthy(); expect(secondResponse.usage.input + secondResponse.usage.cacheRead).toBeGreaterThan(0); expect(secondResponse.usage.output).toBeGreaterThan(0); - expect(secondResponse.error).toBeFalsy(); + expect(secondResponse.errorMessage).toBeFalsy(); expect(secondResponse.content.map((b) => (b.type === "text" ? 
b.text : "")).join("")).toContain( "Goodbye test successful", ); @@ -192,7 +192,7 @@ async function handleThinking(model: Model, options?: Op const response = await s.result(); - expect(response.stopReason, `Error: ${response.error}`).toBe("stop"); + expect(response.stopReason, `Error: ${response.errorMessage}`).toBe("stop"); expect(thinkingStarted).toBe(true); expect(thinkingChunks.length).toBeGreaterThan(0); expect(thinkingCompleted).toBe(true); diff --git a/plan.md b/plan.md new file mode 100644 index 00000000..6c37bbce --- /dev/null +++ b/plan.md @@ -0,0 +1,514 @@ +# Agent API Rework Plan + +## Executive Summary + +Complete rewrite of the agent API with no backward compatibility constraints to support: + +1. **Granular streaming events** - Fine-grained deltas (`text_delta`, `thinking_delta`, `toolcall_delta`) with clear message boundaries +2. **Streaming tool execution** - Tools can progressively stream output via `AsyncIterable` +3. **Remote tool execution** - Automatically detect remote tools (missing `execute` function) and suspend/resume execution +4. **HTTP-friendly transport** - Minimize data transfer with server-managed sessions and SSE streaming + +Key design: +- `AgentEvent` is completely redesigned for granular streaming +- `AgentTool.execute` can return `Promise` or `AsyncIterable` (detected at runtime) +- Clean session-based API: `createSession()` and `execute()` +- `ExecuteStream` with `result()` method for getting pending tool calls +- Server manages all state, client is stateless +- No legacy code or compatibility layers + +## Problem Statement + +The current agent API has several limitations that prevent it from being used effectively in distributed environments: + +1. **Coarse Event Granularity**: `message_update` events send the full message instead of deltas, making HTTP streaming inefficient +2. **Blocking Tool Execution**: Tools must complete execution before any results can be streamed +3. 
**Local-Only Tools**: Tools must run in the same environment as the `prompt()` call, preventing remote tool execution +4. **No Suspension/Resume**: The API cannot pause execution for external tool handling and resume later + +## Requirements + +### R1: Fine-Grained Streaming Events +- Stream text deltas, not full messages +- Stream thinking/reasoning deltas +- Stream tool call argument deltas +- Stream tool execution output progressively +- Clear message boundaries with start/end events + +### R2: Remote Tool Execution +- Detect remote tools by absence of `execute` function (JSON.stringify naturally omits functions) +- Pause execution when remote tool is called +- Resume execution when remote tool completes +- Support both synchronous (local) and asynchronous (remote) tools + +### R3: HTTP-Friendly +- Events should be efficiently serializable for Server-Sent Events (SSE) +- Minimize data transfer - no large context objects in events +- Server manages session state, client sends only deltas + +### R4: Minimal API Surface +- Keep the API simple and intuitive +- No backward compatibility constraints - clean slate design +- Avoid complex state management on the client side + +## Proposed Solution + +### Core Concepts + +#### 1. Agent Sessions +Sessions that can be suspended and resumed for remote tool execution: + +```typescript +interface AgentSession { + id: string; + status: 'running' | 'awaiting_tool_execution' | 'completed' | 'error'; + model: Model; // Last used model + messages: Message[]; // Full conversation context + totalCost: number; // Accumulated cost across all turns + pendingToolCalls?: ToolCall[]; // When status is 'awaiting_tool_execution' +} +``` + +#### 2. 
Tool Execution +Tools always return an async iterable stream, making the API consistent: + +```typescript +type ToolExecutionEventStream = AsyncIterable<ToolExecutionEvent>; + +interface AgentTool<TParams = any> extends Tool<TParams> { + label: string; // Already exists + // If execute is defined -> local tool + // If execute is undefined (omitted during JSON serialization) -> remote tool + execute?: (id: string, params: TParams) => ToolExecutionEventStream; +} + +// Tool execution event for streaming +type ToolExecutionEvent = + | { type: 'delta'; delta: string } + | { type: 'complete'; output: string; details?: any }; + +// Detection logic remains simple +function isLocalTool(tool: AgentTool): boolean { + return tool.execute !== undefined; +} +``` + +#### 3. Message Types +Extend the Message union to handle errors and aborts cleanly: + +```typescript +// Extended Message type for cleaner error handling +type Message = + | UserMessage + | AssistantMessage // stopReason: 'stop' | 'length' | 'toolUse' + | ToolResultMessage + | ErrorMessage // New: explicit error messages + | AbortedMessage; // New: explicit abort messages + +interface ErrorMessage { + role: 'error'; + error: string; + partial?: AssistantMessage; // Partial message before error +} + +interface AbortedMessage { + role: 'aborted'; + partial: AssistantMessage; // Partial message before abort +} +```
Granular Event System +Fine-grained events with message boundaries: + +```typescript +type AgentEvent = + // Session lifecycle + | { type: 'session_start'; sessionId: string } + | { type: 'session_end'; sessionId: string; messages: Message[] } // Only NEW messages + | { type: 'awaiting_tool_execution'; sessionId: string; toolCalls: ToolCall[] } + + // Message boundaries (role determined by message.role) + | { type: 'message_start'; role: Message['role'] } + | { type: 'message_end'; message: Message } // Includes usage/cost for assistant messages + + // Assistant streaming (only during assistant messages) + | { type: 'text_start' } + | { type: 'text_delta'; delta: string } + | { type: 'text_end'; text: string } + + | { type: 'thinking_start' } + | { type: 'thinking_delta'; delta: string } + | { type: 'thinking_end'; thinking: string } + + | { type: 'toolcall_start'; index: number } + | { type: 'toolcall_delta'; index: number; delta: string } + | { type: 'toolcall_end'; index: number; toolCall: ToolCall } + + // Tool execution (local tools only) + | { type: 'tool_execution_start'; toolCallId: string; toolName: string; args: any } + | { type: 'tool_execution_delta'; toolCallId: string; delta: string } + | { type: 'tool_execution_end'; toolCallId: string; output: string; details?: any } + + // Errors + | { type: 'error'; error: string; message?: Message } +``` + +### New API Design + +```typescript +// Create or restore a session +function createSession( + sessionId: string, + context: AgentContext, + options?: PromptOptions // Model, temperature, etc. 
+): AgentSession; + +// Execute with event streaming +interface ExecuteResult { + status: 'completed' | 'awaiting_tool_execution'; + messages: Message[]; // New messages added during execution + pendingToolCalls?: ToolCall[]; // Present when status is 'awaiting_tool_execution' + cost: number; // Cost for this execution +} + +interface ExecuteStream extends AsyncIterable<AgentEvent> { + result(): Promise<ExecuteResult>; +} + +function execute( + session: AgentSession, + input: UserMessage | ToolResultMessage[] +): ExecuteStream; + +// Example usage - clean flow without nested loops +const session = createSession('session-123', { + messages: [], + tools: [localTool, remoteTool] +}); + +// Execute and handle events +const stream = execute(session, { role: 'user', content: 'Hello' }); +for await (const event of stream) { + // Handle events for UI updates + console.log(event); +} + +// Check result after stream completes +const result = await stream.result(); +if (result.status === 'awaiting_tool_execution') { + // Execute remote tools externally + const toolResults = await executeRemoteTools(result.pendingToolCalls); + + // Resume execution with tool results + const resumeStream = execute(session, toolResults); + for await (const event of resumeStream) { + console.log(event); + } + + const finalResult = await resumeStream.result(); + // Continue until no more pending tools...
+} +``` + +### Tool Execution Streaming + +All tools return a `ToolExecutionEventStream` for consistency: + +```typescript +// Example streaming tool with progressive output +const streamingSearchTool: AgentTool = { + name: 'search', + label: 'Search', + parameters: Type.Object({ query: Type.String() }), + async *execute(id, { query }) { + // Stream deltas as they become available + yield { type: 'delta', delta: 'Searching for: ' }; + yield { type: 'delta', delta: query }; + yield { type: 'delta', delta: '\nResults:\n' }; + + for (const result of await search(query)) { + yield { type: 'delta', delta: `- ${result.title}\n` }; + } + + // Final complete event with full output and optional details + yield { + type: 'complete', + output: fullOutput, + details: { resultCount: 10 } + }; + } +}; + +// Example simple tool (still returns AsyncIterable for consistency) +const simpleTool: AgentTool = { + name: 'calculate', + label: 'Calculate', + parameters: Type.Object({ expression: Type.String() }), + async *execute(id, { expression }) { + const result = eval(expression); // Don't do this in production! 
+ + // Even simple tools emit complete event + yield { + type: 'complete', + output: `${expression} = ${result}`, + details: { result } + }; + } +}; + +// Helper for simple tools that don't need streaming +function createSimpleTool<TParams>( + config: Omit<AgentTool<TParams>, 'execute'>, + handler: (id: string, params: TParams) => Promise<{ output: string; details?: any }> +): AgentTool<TParams> { + return { + ...config, + async *execute(id, params) { + const result = await handler(id, params); + yield { type: 'complete', ...result }; + } + }; +} + +// Usage with helper +const calculateTool = createSimpleTool( + { + name: 'calculate', + label: 'Calculate', + parameters: Type.Object({ expression: Type.String() }) + }, + async (id, { expression }) => { + const result = eval(expression); + return { + output: `${expression} = ${result}`, + details: { result } + }; + } +); +``` + +### HTTP Transport Layer + +Server manages session state, client only sends inputs: + +```typescript +// Server-side session store +class SessionStore { + private sessions = new Map<string, AgentSession>(); + + create(sessionId: string, context: AgentContext, options?: PromptOptions): AgentSession { + const session = createSession(sessionId, context, options); + this.sessions.set(sessionId, session); + return session; + } + + get(sessionId: string): AgentSession | undefined { + return this.sessions.get(sessionId); + } + + // Optional: Get context for UI sync + getContext(sessionId: string): Message[] | undefined { + return this.sessions.get(sessionId)?.messages; + } +} + +// API Endpoints +interface ExecuteRequest { + sessionId?: string; // Optional, server generates if not provided + input: UserMessage | ToolResultMessage[]; + context?: AgentContext; // Only for new sessions + options?: PromptOptions; // Model, temperature, etc.
+} + +interface ExecuteResponse { + sessionId: string; + // SSE stream of AgentEvents +} + +// Server implementation +app.post('/api/agent/execute', async (req, res) => { + const { sessionId = generateId(), input, context, options } = req.body; + + // Get or create session + let session = sessionStore.get(sessionId); + if (!session) { + if (!context) { + return res.status(400).json({ error: 'Context required for new session' }); + } + session = sessionStore.create(sessionId, context, options); + } + + // Set up SSE + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Session-Id': sessionId, // Return session ID in header + }); + + // Stream events + const stream = execute(session, input); + for await (const event of stream) { + res.write(`data: ${JSON.stringify(event)}\n\n`); + } + + // Get final result + const result = await stream.result(); + + // Send final event with status + res.write(`data: ${JSON.stringify({ + type: 'execute_complete', + status: result.status, + pendingToolCalls: result.pendingToolCalls + })}\n\n`); + + res.end(); +}); + +// Optional: Get session context for UI sync +app.get('/api/agent/session/:sessionId', (req, res) => { + const context = sessionStore.getContext(req.params.sessionId); + if (!context) { + return res.status(404).json({ error: 'Session not found' }); + } + res.json({ messages: context }); +}); + +// Client implementation +async function executeOnServer(input: UserMessage, tools: AgentTool[]) { + // Note: tools with execute functions will have them stripped during JSON.stringify + const response = await fetch('/api/agent/execute', { + method: 'POST', + body: JSON.stringify({ + input, + context: { messages: [], tools }, // Tools without execute = remote + options: { model: 'gpt-4o-mini' } + }) + }); + + const sessionId = response.headers.get('X-Session-Id'); + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + let 
pendingToolCalls = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const lines = decoder.decode(value).split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + const event = JSON.parse(line.slice(6)); + + // Update UI with event + handleEvent(event); + + if (event.type === 'execute_complete' && event.status === 'awaiting_tool_execution') { + pendingToolCalls = event.pendingToolCalls; + } + } + } + } + + // Execute remote tools if needed + if (pendingToolCalls.length > 0) { + const toolResults = await executeLocalTools(pendingToolCalls); + + // Resume on server + await fetch('/api/agent/execute', { + method: 'POST', + body: JSON.stringify({ + sessionId, + input: toolResults + }) + }); + } +} +``` + +## Implementation Plan + +### Phase 1: Core Event System Refactor +1. Define new granular `AgentEvent` types with message boundaries +2. Update agent loop to detect local vs remote tools (via `execute` presence) +3. Implement `ExecuteStream` with `result()` method +4. Map underlying streaming events to granular agent events + +### Phase 2: Session Management +1. Implement `AgentSession` type with status tracking +2. Create `createSession()` and `execute()` functions +3. Handle suspension when remote tools detected +4. Track cost and model per session + +### Phase 3: Streaming Tool Execution +1. Define `ToolExecutionEventStream` type alias +2. Update all tools to return `ToolExecutionEventStream` +3. Emit `tool_execution_delta` events during streaming +4. Provide `createSimpleTool` helper for non-streaming tools + +### Phase 4: HTTP Transport +1. Implement server-side `SessionStore` +2. Create `/api/agent/execute` endpoint with SSE +3. Add `/api/agent/session/:id` for context sync +4. Build client SDK for SSE consumption + +### Phase 5: Documentation & Examples +1. Create comprehensive documentation for new API +2. Build example applications demonstrating all features +3. 
Add TypeScript types and JSDoc comments +4. Create cookbook for common patterns + +## Key Design Decisions + +### Why AgentSession Instead of ExecutionSession? +- Clearer naming that matches the agent concept +- Consistent with `AgentContext`, `AgentTool`, `AgentEvent` + +### Why Detect Remote Tools via Missing execute? +- JSON.stringify naturally omits functions +- No need for explicit `mode` field +- Zero configuration for remote tools + +### Why ExecuteStream with result()? +- Follows the existing EventStream pattern +- Separates streaming (UI updates) from final state +- Clean way to get pending tool calls after streaming + +### Why awaiting_tool_execution Instead of suspended? +- More descriptive of the actual state +- Clear indication that client action is required +- Better for debugging and logging + +## Clean Break Strategy + +Since we're not maintaining backward compatibility, we can: + +1. **Remove all legacy code** - No need for wrapper functions or event mappers +2. **Simplify type definitions** - No union types for old/new formats +3. **Optimize for the new use cases** - Design purely for distributed execution +4. **Clear naming** - No need to avoid conflicts with existing names +5. **Breaking changes allowed** - Can restructure packages and exports as needed + +This is a complete rewrite of the agent module with a new, cleaner API that fully supports: +- Granular streaming events +- Progressive tool execution +- Remote tool suspension/resumption +- Efficient HTTP transport + +## Open Questions + +1. **Tool Timeout**: How long should we wait for remote tool execution before timing out? +2. **Session Persistence**: Should sessions be stored in memory or persisted (Redis/DB)? +3. **Tool Chaining**: Can a remote tool trigger another LLM call that uses more tools? +4. **Error Recovery**: How to handle partial tool execution failures? +5. **Rate Limiting**: How to prevent abuse of the HTTP API? +6. 
**Error/Abort Messages**: Should we use explicit `ErrorMessage`/`AbortedMessage` types or keep errors in `AssistantMessage` with `stopReason: 'error'`? Explicit types are cleaner but add complexity. + +## Next Steps + +1. Implement Phase 1 - Core event system with granular events +2. Test with existing tools to ensure compatibility +3. Add streaming support to one built-in tool as proof of concept +4. Build minimal HTTP server to validate the design +5. Gather feedback and iterate before full implementation \ No newline at end of file