Mirror of https://github.com/getcompanion-ai/co-mono.git (synced 2026-04-15 08:03:39 +00:00)
refactor(ai): improve error handling and stop reason types
- Add 'aborted' as a distinct stop reason separate from 'error'
- Change AssistantMessage.error to errorMessage for clarity
- Update error event to include reason field ('error' | 'aborted')
- Map provider-specific safety/refusal reasons to 'error' stop reason
- Reorganize utility functions into utils/ directory
- Rename agent.ts to agent-loop.ts for better clarity
- Fix error handling in all providers to properly distinguish abort from error
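The type changes described above can be sketched in isolation. The following is an illustrative model of the new `StopReason` union, the `errorMessage` rename, and the abort-vs-error classification the providers now perform — it mirrors the shapes in the diff below but is not the library's actual source:

```typescript
// Sketch of the refactored stop-reason handling (illustrative, not library source).
type StopReason = "stop" | "length" | "toolUse" | "error" | "aborted";

interface AssistantMessage {
	stopReason: StopReason;
	errorMessage?: string; // renamed from `error` for clarity
	content: unknown[]; // partial content accumulated before a failure survives
}

// Map a caught exception to the new, more precise stop reason:
// a failure triggered by an AbortSignal becomes "aborted", anything else "error".
function classifyFailure(signal: AbortSignal | undefined, err: unknown): AssistantMessage {
	return {
		stopReason: signal?.aborted ? "aborted" : "error",
		errorMessage: err instanceof Error ? err.message : JSON.stringify(err),
		content: [],
	};
}

const controller = new AbortController();
controller.abort();
const msg = classifyFailure(controller.signal, new Error("Request was aborted"));
console.log(msg.stopReason); // "aborted"
console.log(msg.errorMessage); // "Request was aborted"
```

With `"aborted"` as its own member of the union, callers can branch on cancellation without string-matching error messages.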
This commit is contained in:
parent 293a6e878d
commit 2296dc4052
22 changed files with 703 additions and 139 deletions
@@ -267,8 +267,8 @@ All streaming events emitted during assistant message generation:
 | `toolcall_start` | Tool call begins | `contentIndex`: Position in content array |
 | `toolcall_delta` | Tool arguments streaming | `delta`: JSON chunk, `partial.content[contentIndex].arguments`: Partial parsed args |
 | `toolcall_end` | Tool call complete | `toolCall`: Complete validated tool call with `id`, `name`, `arguments` |
-| `done` | Stream complete | `reason`: Stop reason, `message`: Final assistant message |
-| `error` | Error occurred | `error`: Error message, `partial`: Partial message before error |
+| `done` | Stream complete | `reason`: Stop reason ("stop", "length", "toolUse"), `message`: Final assistant message |
+| `error` | Error occurred | `reason`: Error type ("error" or "aborted"), `error`: AssistantMessage with partial content |
 
 ## Image Input
 
@@ -399,16 +399,43 @@ for await (const event of s) {
 }
 ```
 
-## Errors & Abort Signal
+## Stop Reasons
 
-When a request ends with an error (including aborts and tool call validation errors), the API returns an `AssistantMessage` with:
-- `stopReason: 'error'` - Indicates the request ended with an error
-- `error: string` - Error message describing what happened
-- `content: array` - **Partial content** accumulated before the error
-- `usage: Usage` - **Token counts and costs** (may be incomplete depending on when error occurred)
+Every `AssistantMessage` includes a `stopReason` field that indicates how the generation ended:
 
-### Aborting
-The abort signal allows you to cancel in-progress requests. Aborted requests return an `AssistantMessage` with `stopReason === 'error'`.
+- `"stop"` - Normal completion, the model finished its response
+- `"length"` - Output hit the maximum token limit
+- `"toolUse"` - Model is calling tools and expects tool results
+- `"error"` - An error occurred during generation
+- `"aborted"` - Request was cancelled via abort signal
+
+## Error Handling
+
+When a request ends with an error (including aborts and tool call validation errors), the streaming API emits an error event:
+
+```typescript
+// In streaming
+for await (const event of stream) {
+  if (event.type === 'error') {
+    // event.reason is either "error" or "aborted"
+    // event.error is the AssistantMessage with partial content
+    console.error(`Error (${event.reason}):`, event.error.errorMessage);
+    console.log('Partial content:', event.error.content);
+  }
+}
+
+// The final message will have the error details
+const message = await stream.result();
+if (message.stopReason === 'error' || message.stopReason === 'aborted') {
+  console.error('Request failed:', message.errorMessage);
+  // message.content contains any partial content received before the error
+  // message.usage contains partial token counts and costs
+}
+```
+
+### Aborting Requests
+
+The abort signal allows you to cancel in-progress requests. Aborted requests have `stopReason === 'aborted'`:
 
 ```typescript
 import { getModel, stream } from '@mariozechner/pi-ai';
@@ -429,14 +456,15 @@ for await (const event of s) {
   if (event.type === 'text_delta') {
     process.stdout.write(event.delta);
   } else if (event.type === 'error') {
-    console.log('Error:', event.error);
+    // event.reason tells you if it was "error" or "aborted"
+    console.log(`${event.reason === 'aborted' ? 'Aborted' : 'Error'}:`, event.error.errorMessage);
   }
 }
 
 // Get results (may be partial if aborted)
 const response = await s.result();
-if (response.stopReason === 'error') {
-  console.log('Error:', response.error);
+if (response.stopReason === 'aborted') {
+  console.log('Request was aborted:', response.errorMessage);
   console.log('Partial content received:', response.content);
   console.log('Tokens used:', response.usage);
 }
@@ -1,11 +1,11 @@
-import { EventStream } from "../event-stream.js";
 import { streamSimple } from "../stream.js";
 import type { AssistantMessage, Context, Message, ToolResultMessage, UserMessage } from "../types.js";
-import { validateToolArguments } from "../validation.js";
+import { EventStream } from "../utils/event-stream.js";
+import { validateToolArguments } from "../utils/validation.js";
 import type { AgentContext, AgentEvent, AgentTool, AgentToolResult, PromptConfig } from "./types.js";
 
 // Main prompt function - returns a stream of events
-export function prompt(
+export function agentLoop(
 	prompt: UserMessage,
 	context: AgentContext,
 	config: PromptConfig,
@@ -46,21 +46,29 @@ export function prompt(
 			firstTurn = false;
 		}
 		// Stream assistant response
-		const assistantMessage = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
-		newMessages.push(assistantMessage);
+		const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
+		newMessages.push(message);
+
+		if (message.stopReason === "error" || message.stopReason === "aborted") {
+			// Stop the loop on error or abort
+			stream.push({ type: "turn_end", message, toolResults: [] });
+			stream.push({ type: "agent_end", messages: newMessages });
+			stream.end(newMessages);
+			return;
+		}
+
 		// Check for tool calls
-		const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
+		const toolCalls = message.content.filter((c) => c.type === "toolCall");
 		hasMoreToolCalls = toolCalls.length > 0;
 
 		const toolResults: ToolResultMessage[] = [];
 		if (hasMoreToolCalls) {
 			// Execute tool calls
-			toolResults.push(...(await executeToolCalls(currentContext.tools, assistantMessage, signal, stream)));
+			toolResults.push(...(await executeToolCalls(currentContext.tools, message, signal, stream)));
 			currentContext.messages.push(...toolResults);
 			newMessages.push(...toolResults);
 		}
-		stream.push({ type: "turn_end", assistantMessage, toolResults: toolResults });
+		stream.push({ type: "turn_end", message, toolResults: toolResults });
 	}
 	stream.push({ type: "agent_end", messages: newMessages });
 	stream.end(newMessages);
@@ -1,3 +1,3 @@
-export { prompt } from "./agent.js";
+export { agentLoop } from "./agent-loop.js";
 export * from "./tools/index.js";
 export type { AgentContext, AgentEvent, AgentTool, PromptConfig } from "./types.js";
@@ -57,7 +57,7 @@ export type AgentEvent =
 			isError: boolean;
 	  }
 	// Emitted when a full turn completes
-	| { type: "turn_end"; assistantMessage: AssistantMessage; toolResults: ToolResultMessage[] }
+	| { type: "turn_end"; message: AssistantMessage; toolResults: ToolResultMessage[] }
 	// Emitted when the agent has completed all its turns. All messages from every turn are
 	// contained in messages, which can be appended to the context
 	| { type: "agent_end"; messages: AgentContext["messages"] };
@@ -5,5 +5,5 @@ export * from "./providers/google.js";
 export * from "./providers/openai-completions.js";
 export * from "./providers/openai-responses.js";
 export * from "./stream.js";
-export * from "./typebox-helpers.js";
 export * from "./types.js";
+export * from "./utils/typebox-helpers.js";
@@ -2994,23 +2994,6 @@ export const MODELS = {
 		contextWindow: 32768,
 		maxTokens: 4096,
 	} satisfies Model<"openai-completions">,
-	"cohere/command-r-plus-08-2024": {
-		id: "cohere/command-r-plus-08-2024",
-		name: "Cohere: Command R+ (08-2024)",
-		api: "openai-completions",
-		provider: "openrouter",
-		baseUrl: "https://openrouter.ai/api/v1",
-		reasoning: false,
-		input: ["text"],
-		cost: {
-			input: 2.5,
-			output: 10,
-			cacheRead: 0,
-			cacheWrite: 0,
-		},
-		contextWindow: 128000,
-		maxTokens: 4000,
-	} satisfies Model<"openai-completions">,
 	"cohere/command-r-08-2024": {
 		id: "cohere/command-r-08-2024",
 		name: "Cohere: Command R (08-2024)",
@@ -3028,6 +3011,23 @@ export const MODELS = {
 		contextWindow: 128000,
 		maxTokens: 4000,
 	} satisfies Model<"openai-completions">,
+	"cohere/command-r-plus-08-2024": {
+		id: "cohere/command-r-plus-08-2024",
+		name: "Cohere: Command R+ (08-2024)",
+		api: "openai-completions",
+		provider: "openrouter",
+		baseUrl: "https://openrouter.ai/api/v1",
+		reasoning: false,
+		input: ["text"],
+		cost: {
+			input: 2.5,
+			output: 10,
+			cacheRead: 0,
+			cacheWrite: 0,
+		},
+		contextWindow: 128000,
+		maxTokens: 4000,
+	} satisfies Model<"openai-completions">,
 	"microsoft/phi-3.5-mini-128k-instruct": {
 		id: "microsoft/phi-3.5-mini-128k-instruct",
 		name: "Microsoft: Phi-3.5 Mini 128K Instruct",
@@ -3130,6 +3130,23 @@ export const MODELS = {
 		contextWindow: 131072,
 		maxTokens: 16384,
 	} satisfies Model<"openai-completions">,
+	"mistralai/mistral-7b-instruct-v0.3": {
+		id: "mistralai/mistral-7b-instruct-v0.3",
+		name: "Mistral: Mistral 7B Instruct v0.3",
+		api: "openai-completions",
+		provider: "openrouter",
+		baseUrl: "https://openrouter.ai/api/v1",
+		reasoning: false,
+		input: ["text"],
+		cost: {
+			input: 0.028,
+			output: 0.054,
+			cacheRead: 0,
+			cacheWrite: 0,
+		},
+		contextWindow: 32768,
+		maxTokens: 16384,
+	} satisfies Model<"openai-completions">,
 	"mistralai/mistral-7b-instruct:free": {
 		id: "mistralai/mistral-7b-instruct:free",
 		name: "Mistral: Mistral 7B Instruct (free)",
@@ -3164,23 +3181,6 @@ export const MODELS = {
 		contextWindow: 32768,
 		maxTokens: 16384,
 	} satisfies Model<"openai-completions">,
-	"mistralai/mistral-7b-instruct-v0.3": {
-		id: "mistralai/mistral-7b-instruct-v0.3",
-		name: "Mistral: Mistral 7B Instruct v0.3",
-		api: "openai-completions",
-		provider: "openrouter",
-		baseUrl: "https://openrouter.ai/api/v1",
-		reasoning: false,
-		input: ["text"],
-		cost: {
-			input: 0.028,
-			output: 0.054,
-			cacheRead: 0,
-			cacheWrite: 0,
-		},
-		contextWindow: 32768,
-		maxTokens: 16384,
-	} satisfies Model<"openai-completions">,
 	"microsoft/phi-3-mini-128k-instruct": {
 		id: "microsoft/phi-3-mini-128k-instruct",
 		name: "Microsoft: Phi-3 Mini 128K Instruct",
@@ -3351,23 +3351,6 @@ export const MODELS = {
 		contextWindow: 128000,
 		maxTokens: 4096,
 	} satisfies Model<"openai-completions">,
-	"mistralai/mistral-small": {
-		id: "mistralai/mistral-small",
-		name: "Mistral Small",
-		api: "openai-completions",
-		provider: "openrouter",
-		baseUrl: "https://openrouter.ai/api/v1",
-		reasoning: false,
-		input: ["text"],
-		cost: {
-			input: 0.19999999999999998,
-			output: 0.6,
-			cacheRead: 0,
-			cacheWrite: 0,
-		},
-		contextWindow: 32768,
-		maxTokens: 4096,
-	} satisfies Model<"openai-completions">,
 	"mistralai/mistral-tiny": {
 		id: "mistralai/mistral-tiny",
 		name: "Mistral Tiny",
@@ -3385,6 +3368,23 @@ export const MODELS = {
 		contextWindow: 32768,
 		maxTokens: 4096,
 	} satisfies Model<"openai-completions">,
+	"mistralai/mistral-small": {
+		id: "mistralai/mistral-small",
+		name: "Mistral Small",
+		api: "openai-completions",
+		provider: "openrouter",
+		baseUrl: "https://openrouter.ai/api/v1",
+		reasoning: false,
+		input: ["text"],
+		cost: {
+			input: 0.19999999999999998,
+			output: 0.6,
+			cacheRead: 0,
+			cacheWrite: 0,
+		},
+		contextWindow: 32768,
+		maxTokens: 4096,
+	} satisfies Model<"openai-completions">,
 	"mistralai/mixtral-8x7b-instruct": {
 		id: "mistralai/mixtral-8x7b-instruct",
 		name: "Mistral: Mixtral 8x7B Instruct",
@@ -4,8 +4,6 @@ import type {
 	MessageCreateParamsStreaming,
 	MessageParam,
 } from "@anthropic-ai/sdk/resources/messages.js";
-import { AssistantMessageEventStream } from "../event-stream.js";
-import { parseStreamingJson } from "../json-parse.js";
 import { calculateCost } from "../models.js";
 import type {
 	Api,
@@ -22,7 +20,9 @@ import type {
 	ToolCall,
 	ToolResultMessage,
 } from "../types.js";
-import { validateToolArguments } from "../validation.js";
+import { AssistantMessageEventStream } from "../utils/event-stream.js";
+import { parseStreamingJson } from "../utils/json-parse.js";
+import { validateToolArguments } from "../utils/validation.js";
 import { transformMessages } from "./transorm-messages.js";
 
 export interface AnthropicOptions extends StreamOptions {
@@ -196,13 +196,17 @@ export const streamAnthropic: StreamFunction<"anthropic-messages"> = (
 				throw new Error("Request was aborted");
 			}
 
+			if (output.stopReason === "aborted" || output.stopReason === "error") {
+				throw new Error("An unknown error occurred");
+			}
+
 			stream.push({ type: "done", reason: output.stopReason, message: output });
 			stream.end();
 		} catch (error) {
-			output.stopReason = "error";
-			output.error = error instanceof Error ? error.message : JSON.stringify(error);
-			stream.push({ type: "error", error: output.error, partial: output });
+			for (const block of output.content) delete (block as any).index;
+			output.stopReason = options?.signal?.aborted ? "aborted" : "error";
+			output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error);
+			stream.push({ type: "error", reason: output.stopReason, error: output });
 			stream.end();
 		}
 	})();
@@ -466,7 +470,7 @@ function mapStopReason(reason: Anthropic.Messages.StopReason): StopReason {
 		case "tool_use":
 			return "toolUse";
 		case "refusal":
-			return "safety";
+			return "error";
 		case "pause_turn": // Stop is good enough -> resubmit
 			return "stop";
 		case "stop_sequence":
@@ -7,7 +7,6 @@ import {
 	GoogleGenAI,
 	type Part,
 } from "@google/genai";
-import { AssistantMessageEventStream } from "../event-stream.js";
 import { calculateCost } from "../models.js";
 import type {
 	Api,
@@ -22,7 +21,8 @@ import type {
 	Tool,
 	ToolCall,
 } from "../types.js";
-import { validateToolArguments } from "../validation.js";
+import { AssistantMessageEventStream } from "../utils/event-stream.js";
+import { validateToolArguments } from "../utils/validation.js";
 import { transformMessages } from "./transorm-messages.js";
 
 export interface GoogleOptions extends StreamOptions {
@@ -226,12 +226,21 @@ export const streamGoogle: StreamFunction<"google-generative-ai"> = (
 				}
 			}
 
+			if (options?.signal?.aborted) {
+				throw new Error("Request was aborted");
+			}
+
+			if (output.stopReason === "aborted" || output.stopReason === "error") {
+				throw new Error("An unknown error occurred");
+			}
+
 			stream.push({ type: "done", reason: output.stopReason, message: output });
 			stream.end();
 		} catch (error) {
-			output.stopReason = "error";
-			output.error = error instanceof Error ? error.message : JSON.stringify(error);
-			stream.push({ type: "error", error: output.error, partial: output });
+			for (const block of output.content) delete (block as any).index;
+			output.stopReason = options?.signal?.aborted ? "aborted" : "error";
+			output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error);
+			stream.push({ type: "error", reason: output.stopReason, error: output });
 			stream.end();
 		}
 	})();
@@ -424,7 +433,7 @@ function mapStopReason(reason: FinishReason): StopReason {
 		case FinishReason.SAFETY:
 		case FinishReason.IMAGE_SAFETY:
 		case FinishReason.RECITATION:
-			return "safety";
+			return "error";
 		case FinishReason.FINISH_REASON_UNSPECIFIED:
 		case FinishReason.OTHER:
 		case FinishReason.LANGUAGE:
@@ -7,8 +7,6 @@ import type {
 	ChatCompletionContentPartText,
 	ChatCompletionMessageParam,
 } from "openai/resources/chat/completions.js";
-import { AssistantMessageEventStream } from "../event-stream.js";
-import { parseStreamingJson } from "../json-parse.js";
 import { calculateCost } from "../models.js";
 import type {
 	AssistantMessage,
@@ -22,7 +20,9 @@ import type {
 	Tool,
 	ToolCall,
 } from "../types.js";
-import { validateToolArguments } from "../validation.js";
+import { AssistantMessageEventStream } from "../utils/event-stream.js";
+import { parseStreamingJson } from "../utils/json-parse.js";
+import { validateToolArguments } from "../utils/validation.js";
 import { transformMessages } from "./transorm-messages.js";
 
 export interface OpenAICompletionsOptions extends StreamOptions {
@@ -231,13 +231,17 @@ export const streamOpenAICompletions: StreamFunction<"openai-completions"> = (
 				throw new Error("Request was aborted");
 			}
 
+			if (output.stopReason === "aborted" || output.stopReason === "error") {
+				throw new Error("An unknown error occurred");
+			}
+
 			stream.push({ type: "done", reason: output.stopReason, message: output });
 			stream.end();
 			return output;
 		} catch (error) {
-			output.stopReason = "error";
-			output.error = error instanceof Error ? error.message : String(error);
-			stream.push({ type: "error", error: output.error, partial: output });
+			for (const block of output.content) delete (block as any).index;
+			output.stopReason = options?.signal?.aborted ? "aborted" : "error";
+			output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error);
+			stream.push({ type: "error", reason: output.stopReason, error: output });
 			stream.end();
 		}
 	})();
@@ -413,7 +417,7 @@ function mapStopReason(reason: ChatCompletionChunk.Choice["finish_reason"]): Sto
 		case "tool_calls":
 			return "toolUse";
 		case "content_filter":
-			return "safety";
+			return "error";
 		default: {
 			const _exhaustive: never = reason;
 			throw new Error(`Unhandled stop reason: ${_exhaustive}`);
@@ -10,8 +10,6 @@ import type {
 	ResponseOutputMessage,
 	ResponseReasoningItem,
 } from "openai/resources/responses/responses.js";
-import { AssistantMessageEventStream } from "../event-stream.js";
-import { parseStreamingJson } from "../json-parse.js";
 import { calculateCost } from "../models.js";
 import type {
 	Api,
@@ -26,7 +24,9 @@ import type {
 	Tool,
 	ToolCall,
 } from "../types.js";
-import { validateToolArguments } from "../validation.js";
+import { AssistantMessageEventStream } from "../utils/event-stream.js";
+import { parseStreamingJson } from "../utils/json-parse.js";
+import { validateToolArguments } from "../utils/validation.js";
 import { transformMessages } from "./transorm-messages.js";
 
 // OpenAI Responses-specific options
@@ -268,17 +268,9 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = (
 				}
 				// Handle errors
 				else if (event.type === "error") {
-					output.stopReason = "error";
-					output.error = `Code ${event.code}: ${event.message}` || "Unknown error";
-					stream.push({ type: "error", error: output.error, partial: output });
-					stream.end();
-					return output;
+					throw new Error(`Error Code ${event.code}: ${event.message}` || "Unknown error");
 				} else if (event.type === "response.failed") {
-					output.stopReason = "error";
-					output.error = "Unknown error";
-					stream.push({ type: "error", error: output.error, partial: output });
-					stream.end();
-					return output;
+					throw new Error("Unknown error");
 				}
 			}
 
@@ -286,12 +278,17 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses"> = (
 				throw new Error("Request was aborted");
 			}
 
+			if (output.stopReason === "aborted" || output.stopReason === "error") {
+				throw new Error("An unknown error occurred");
+			}
+
 			stream.push({ type: "done", reason: output.stopReason, message: output });
 			stream.end();
 		} catch (error) {
-			output.stopReason = "error";
-			output.error = error instanceof Error ? error.message : JSON.stringify(error);
-			stream.push({ type: "error", error: output.error, partial: output });
+			for (const block of output.content) delete (block as any).index;
+			output.stopReason = options?.signal?.aborted ? "aborted" : "error";
+			output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error);
+			stream.push({ type: "error", reason: output.stopReason, error: output });
 			stream.end();
 		}
 	})();
@@ -1,10 +1,10 @@
-import type { AssistantMessageEventStream } from "./event-stream.js";
 import type { AnthropicOptions } from "./providers/anthropic.js";
 import type { GoogleOptions } from "./providers/google.js";
 import type { OpenAICompletionsOptions } from "./providers/openai-completions.js";
 import type { OpenAIResponsesOptions } from "./providers/openai-responses.js";
+import type { AssistantMessageEventStream } from "./utils/event-stream.js";
 
-export type { AssistantMessageEventStream } from "./event-stream.js";
+export type { AssistantMessageEventStream } from "./utils/event-stream.js";
 
 export type Api = "openai-completions" | "openai-responses" | "anthropic-messages" | "google-generative-ai";
 
@@ -90,7 +90,7 @@ export interface Usage {
 	};
 }
 
-export type StopReason = "stop" | "length" | "toolUse" | "safety" | "error";
+export type StopReason = "stop" | "length" | "toolUse" | "error" | "aborted";
 
 export interface UserMessage {
 	role: "user";
@@ -105,7 +105,7 @@ export interface AssistantMessage {
 	model: string;
 	usage: Usage;
 	stopReason: StopReason;
-	error?: string;
+	errorMessage?: string;
 }
 
 export interface ToolResultMessage<TDetails = any> {
@@ -144,8 +144,8 @@ export type AssistantMessageEvent =
 	| { type: "toolcall_start"; contentIndex: number; partial: AssistantMessage }
 	| { type: "toolcall_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
 	| { type: "toolcall_end"; contentIndex: number; toolCall: ToolCall; partial: AssistantMessage }
-	| { type: "done"; reason: StopReason; message: AssistantMessage }
-	| { type: "error"; error: string; partial: AssistantMessage };
+	| { type: "done"; reason: Extract<StopReason, "stop" | "length" | "toolUse">; message: AssistantMessage }
+	| { type: "error"; reason: Extract<StopReason, "aborted" | "error">; error: AssistantMessage };
 
 // Model interface for the unified model system
 export interface Model<TApi extends Api> {
@@ -1,4 +1,4 @@
-import type { AssistantMessage, AssistantMessageEvent } from "./types.js";
+import type { AssistantMessage, AssistantMessageEvent } from "../types.js";
 
 // Generic event stream class for async iteration
 export class EventStream<T, R = T> implements AsyncIterable<T> {
@@ -73,7 +73,7 @@ export class AssistantMessageEventStream extends EventStream<AssistantMessageEve
 			if (event.type === "done") {
 				return event.message;
 			} else if (event.type === "error") {
-				return event.partial;
+				return event.error;
 			}
 			throw new Error("Unexpected event type for final result");
 		},
@@ -5,7 +5,7 @@ import addFormatsModule from "ajv-formats";
 const Ajv = (AjvModule as any).default || AjvModule;
 const addFormats = (addFormatsModule as any).default || addFormatsModule;
 
-import type { Tool, ToolCall } from "./types.js";
+import type { Tool, ToolCall } from "../types.js";
 
 // Create a singleton AJV instance with formats
 const ajv = new Ajv({ allErrors: true, strict: false });
@@ -25,7 +25,7 @@ async function testAbortSignal<TApi extends Api>(llm: Model<TApi>, options: Opti
 	const msg = await response.result();
 
 	// If we get here without throwing, the abort didn't work
-	expect(msg.stopReason).toBe("error");
+	expect(msg.stopReason).toBe("aborted");
 	expect(msg.content.length).toBeGreaterThan(0);
 
 	context.messages.push(msg);
@@ -46,7 +46,7 @@ async function testImmediateAbort<TApi extends Api>(llm: Model<TApi>, options: O
 	};
 
 	const response = await complete(llm, context, { ...options, signal: controller.signal });
-	expect(response.stopReason).toBe("error");
+	expect(response.stopReason).toBe("aborted");
 }
 
 describe("AI Providers Abort Tests", () => {
@@ -1,5 +1,5 @@
 import { describe, expect, it } from "vitest";
-import { prompt } from "../src/agent/agent.js";
+import { agentLoop } from "../src/agent/agent-loop.js";
 import { calculateTool } from "../src/agent/tools/calculate.js";
 import type { AgentContext, AgentEvent, PromptConfig } from "../src/agent/types.js";
 import { getModel } from "../src/models.js";
@@ -42,7 +42,7 @@ async function calculateTest<TApi extends Api>(model: Model<TApi>, options: Opti
 	let finalAnswer: number | undefined;
 
 	// Execute the prompt
-	const stream = prompt(userPrompt, context, config);
+	const stream = agentLoop(userPrompt, context, config);
 
 	for await (const event of stream) {
 		events.push(event);
@@ -55,7 +55,7 @@ async function calculateTest<TApi extends Api>(model: Model<TApi>, options: Opti
 
 			case "turn_end":
 				console.log(`=== Turn ${turns} ended with ${event.toolResults.length} tool results ===`);
-				console.log(event.assistantMessage);
+				console.log(event.message);
 				break;
 
 			case "tool_execution_end":
@@ -188,7 +188,7 @@ async function abortTest<TApi extends Api>(model: Model<TApi>, options: OptionsF
 	let finalMessages: Message[] | undefined;
 
 	// Execute the prompt
-	const stream = prompt(userPrompt, context, config, abortController.signal);
+	const stream = agentLoop(userPrompt, context, config, abortController.signal);
 
 	// Abort after first tool execution
 	const abortPromise = (async () => {
@@ -222,7 +222,7 @@ async function abortTest<TApi extends Api>(model: Model<TApi>, options: OptionsF
 
 	// Should have executed 1 tool call before abort
 	expect(toolCallCount).toBeGreaterThanOrEqual(1);
-	expect(assistantMessage.stopReason).toBe("error");
+	expect(assistantMessage.stopReason).toBe("aborted");
 
 	return {
 		toolCallCount,
@@ -21,7 +21,7 @@ async function testEmptyMessage<TApi extends Api>(llm: Model<TApi>, options: Opt
 	expect(response.role).toBe("assistant");
 	// Should handle empty string gracefully
 	if (response.stopReason === "error") {
-		expect(response.error).toBeDefined();
+		expect(response.errorMessage).toBeDefined();
 	} else {
 		expect(response.content).toBeDefined();
 	}
@@ -45,7 +45,7 @@ async function testEmptyStringMessage<TApi extends Api>(llm: Model<TApi>, option
 
 	// Should handle empty string gracefully
 	if (response.stopReason === "error") {
-		expect(response.error).toBeDefined();
+		expect(response.errorMessage).toBeDefined();
 	} else {
 		expect(response.content).toBeDefined();
 	}
@@ -69,7 +69,7 @@ async function testWhitespaceOnlyMessage<TApi extends Api>(llm: Model<TApi>, opt
 
 	// Should handle whitespace-only gracefully
 	if (response.stopReason === "error") {
-		expect(response.error).toBeDefined();
+		expect(response.errorMessage).toBeDefined();
 	} else {
 		expect(response.content).toBeDefined();
 	}
@@ -115,7 +115,7 @@ async function testEmptyAssistantMessage<TApi extends Api>(llm: Model<TApi>, opt
 
 	// Should handle empty assistant message in context gracefully
 	if (response.stopReason === "error") {
-		expect(response.error).toBeDefined();
+		expect(response.errorMessage).toBeDefined();
 	} else {
 		expect(response.content).toBeDefined();
 		expect(response.content.length).toBeGreaterThan(0);
@@ -1,7 +1,7 @@
 import { Type } from "@sinclair/typebox";
 import { z } from "zod";
 import { zodToJsonSchema } from "zod-to-json-schema";
-import { StringEnum } from "../src/typebox-helpers.js";
+import { StringEnum } from "../src/utils/typebox-helpers.js";
 
 // Zod version
 const zodSchema = z.object({
@@ -238,7 +238,7 @@ const providerContexts = {
 			cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
 		},
 		stopReason: "error",
-		error: "Request was aborted",
+		errorMessage: "Request was aborted",
 	} satisfies AssistantMessage,
 	toolResult: null,
 	facts: {
@@ -293,7 +293,7 @@ async function testProviderHandoff<TApi extends Api>(
 
 	// Check for error
 	if (response.stopReason === "error") {
-		console.log(`[${sourceLabel} → ${targetModel.provider}] Failed with error: ${response.error}`);
+		console.log(`[${sourceLabel} → ${targetModel.provider}] Failed with error: ${response.errorMessage}`);
 		return false;
 	}
 
@@ -6,8 +6,8 @@ import { fileURLToPath } from "url";
 import { afterAll, beforeAll, describe, expect, it } from "vitest";
 import { getModel } from "../src/models.js";
 import { complete, stream } from "../src/stream.js";
-import { StringEnum } from "../src/typebox-helpers.js";
 import type { Api, Context, ImageContent, Model, OptionsForApi, Tool, ToolResultMessage } from "../src/types.js";
+import { StringEnum } from "../src/utils/typebox-helpers.js";
 
 const __filename = fileURLToPath(import.meta.url);
 const __dirname = dirname(__filename);
@ -40,7 +40,7 @@ async function basicTextGeneration<TApi extends Api>(model: Model<TApi>, options
|
|||
expect(response.content).toBeTruthy();
|
||||
expect(response.usage.input + response.usage.cacheRead).toBeGreaterThan(0);
|
||||
expect(response.usage.output).toBeGreaterThan(0);
|
||||
expect(response.error).toBeFalsy();
|
||||
expect(response.errorMessage).toBeFalsy();
|
||||
expect(response.content.map((b) => (b.type === "text" ? b.text : "")).join("")).toContain("Hello test successful");
|
||||
|
||||
context.messages.push(response);
|
||||
|
|
@ -52,7 +52,7 @@ async function basicTextGeneration<TApi extends Api>(model: Model<TApi>, options
|
|||
expect(secondResponse.content).toBeTruthy();
|
||||
expect(secondResponse.usage.input + secondResponse.usage.cacheRead).toBeGreaterThan(0);
|
||||
expect(secondResponse.usage.output).toBeGreaterThan(0);
|
||||
expect(secondResponse.error).toBeFalsy();
|
||||
expect(secondResponse.errorMessage).toBeFalsy();
|
||||
expect(secondResponse.content.map((b) => (b.type === "text" ? b.text : "")).join("")).toContain(
|
||||
"Goodbye test successful",
|
||||
);
|
||||
|
|
@ -192,7 +192,7 @@ async function handleThinking<TApi extends Api>(model: Model<TApi>, options?: Op
|
|||
|
||||
const response = await s.result();
|
||||
|
||||
expect(response.stopReason, `Error: ${response.error}`).toBe("stop");
|
||||
expect(response.stopReason, `Error: ${response.errorMessage}`).toBe("stop");
|
||||
expect(thinkingStarted).toBe(true);
|
||||
expect(thinkingChunks.length).toBeGreaterThan(0);
|
||||
expect(thinkingCompleted).toBe(true);
|
||||
plan.md — new file, 514 lines

@@ -0,0 +1,514 @@
# Agent API Rework Plan

## Executive Summary

Complete rewrite of the agent API, with no backward-compatibility constraints, to support:

1. **Granular streaming events** - Fine-grained deltas (`text_delta`, `thinking_delta`, `toolcall_delta`) with clear message boundaries
2. **Streaming tool execution** - Tools can progressively stream output via `AsyncIterable<ToolExecutionEvent>`
3. **Remote tool execution** - Automatically detect remote tools (missing `execute` function) and suspend/resume execution
4. **HTTP-friendly transport** - Minimize data transfer with server-managed sessions and SSE streaming

Key design:
- `AgentEvent` is completely redesigned for granular streaming
- `AgentTool.execute` always returns an `AsyncIterable<ToolExecutionEvent>`; simple Promise-based handlers are wrapped via a helper
- Clean session-based API: `createSession()` and `execute()`
- `ExecuteStream` with a `result()` method for getting pending tool calls
- Server manages all state; the client is stateless
- No legacy code or compatibility layers
## Problem Statement

The current agent API has several limitations that prevent it from being used effectively in distributed environments:

1. **Coarse Event Granularity**: `message_update` events send the full message instead of deltas, making HTTP streaming inefficient
2. **Blocking Tool Execution**: Tools must complete execution before any results can be streamed
3. **Local-Only Tools**: Tools must run in the same environment as the `prompt()` call, preventing remote tool execution
4. **No Suspension/Resume**: The API cannot pause execution for external tool handling and resume later
## Requirements

### R1: Fine-Grained Streaming Events
- Stream text deltas, not full messages
- Stream thinking/reasoning deltas
- Stream tool call argument deltas
- Stream tool execution output progressively
- Clear message boundaries with start/end events

### R2: Remote Tool Execution
- Detect remote tools by absence of `execute` function (`JSON.stringify` naturally omits functions)
- Pause execution when a remote tool is called
- Resume execution when the remote tool completes
- Support both synchronous (local) and asynchronous (remote) tools

### R3: HTTP-Friendly
- Events should be efficiently serializable for Server-Sent Events (SSE)
- Minimize data transfer - no large context objects in events
- Server manages session state; client sends only deltas

### R4: Minimal API Surface
- Keep the API simple and intuitive
- No backward compatibility constraints - clean-slate design
- Avoid complex state management on the client side
## Proposed Solution

### Core Concepts

#### 1. Agent Sessions
Sessions that can be suspended and resumed for remote tool execution:

```typescript
interface AgentSession {
  id: string;
  status: 'running' | 'awaiting_tool_execution' | 'completed' | 'error';
  model: Model;                  // Last used model
  messages: Message[];           // Full conversation context
  totalCost: number;             // Accumulated cost across all turns
  pendingToolCalls?: ToolCall[]; // When status is 'awaiting_tool_execution'
}
```
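The `status` field drives the suspend/resume flow. A minimal sketch of the legal transitions (these are assumptions inferred from the lifecycle described here, not spelled out in the plan):

```typescript
type SessionStatus = "running" | "awaiting_tool_execution" | "completed" | "error";

// Assumed legal transitions: a running session can suspend for remote
// tools, finish, or fail; a suspended session resumes to running when
// tool results arrive. Terminal states have no outgoing transitions.
const transitions: Record<SessionStatus, SessionStatus[]> = {
  running: ["awaiting_tool_execution", "completed", "error"],
  awaiting_tool_execution: ["running", "error"],
  completed: [],
  error: [],
};

function canTransition(from: SessionStatus, to: SessionStatus): boolean {
  return transitions[from].includes(to);
}
```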
#### 2. Tool Execution
Tools always return an async iterable stream, making the API consistent:

```typescript
type ToolExecutionEventStream = AsyncIterable<ToolExecutionEvent>;

interface AgentTool<TParams = any> extends Tool {
  label: string; // Already exists
  // If execute is defined -> local tool
  // If execute is undefined (omitted during JSON serialization) -> remote tool
  execute?: (id: string, params: TParams) => ToolExecutionEventStream;
}

// Tool execution event for streaming
type ToolExecutionEvent =
  | { type: 'delta'; delta: string }
  | { type: 'complete'; output: string; details?: any };

// Detection logic remains simple
function isLocalTool(tool: AgentTool): boolean {
  return tool.execute !== undefined;
}
```
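Because every local tool yields the same event shape, the agent loop can drain any tool with one helper. A sketch under the types above (`runTool` and `echoTool` are illustrative names, not part of the proposed API):

```typescript
type ToolExecutionEvent =
  | { type: "delta"; delta: string }
  | { type: "complete"; output: string; details?: unknown };

// Drain a tool's event stream: forward deltas as they arrive and
// return the full output carried by the final 'complete' event.
async function runTool(
  stream: AsyncIterable<ToolExecutionEvent>,
  onDelta: (delta: string) => void,
): Promise<string> {
  let output = "";
  for await (const event of stream) {
    if (event.type === "delta") onDelta(event.delta);
    else output = event.output;
  }
  return output;
}

// Illustrative local tool
async function* echoTool(): AsyncGenerator<ToolExecutionEvent> {
  yield { type: "delta", delta: "echo: " };
  yield { type: "complete", output: "echo: hi" };
}
```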
#### 3. Message Types
Extend the Message union to handle errors and aborts cleanly:

```typescript
// Extended Message type for cleaner error handling
type Message =
  | UserMessage
  | AssistantMessage   // stopReason: 'stop' | 'length' | 'tool_calls'
  | ToolResultMessage
  | ErrorMessage       // New: explicit error messages
  | AbortedMessage;    // New: explicit abort messages

interface ErrorMessage {
  role: 'error';
  error: string;
  partial?: AssistantMessage; // Partial message before error
}

interface AbortedMessage {
  role: 'aborted';
  partial: AssistantMessage; // Partial message before abort
}
```
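If the explicit `ErrorMessage`/`AbortedMessage` variants are adopted, consumers can narrow the union with a small guard. A sketch using simplified stand-ins for the message interfaces (the real `AssistantMessage` has more fields):

```typescript
// Simplified stand-ins for the proposed message types
interface AssistantStub { role: "assistant"; content: string }
interface ErrorMessage { role: "error"; error: string; partial?: AssistantStub }
interface AbortedMessage { role: "aborted"; partial: AssistantStub }
type Message =
  | { role: "user"; content: string }
  | AssistantStub
  | ErrorMessage
  | AbortedMessage;

// A terminal message ends the turn without a normal assistant reply.
function isTerminal(m: Message): m is ErrorMessage | AbortedMessage {
  return m.role === "error" || m.role === "aborted";
}

// Recover whatever content was produced before the failure, if any.
function partialText(m: ErrorMessage | AbortedMessage): string | undefined {
  return m.partial?.content;
}
```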
#### 4. Granular Event System
Fine-grained events with message boundaries:

```typescript
type AgentEvent =
  // Session lifecycle
  | { type: 'session_start'; sessionId: string }
  | { type: 'session_end'; sessionId: string; messages: Message[] } // Only NEW messages
  | { type: 'awaiting_tool_execution'; sessionId: string; toolCalls: ToolCall[] }

  // Message boundaries (role determined by message.role)
  | { type: 'message_start'; role: Message['role'] }
  | { type: 'message_end'; message: Message } // Includes usage/cost for assistant messages

  // Assistant streaming (only during assistant messages)
  | { type: 'text_start' }
  | { type: 'text_delta'; delta: string }
  | { type: 'text_end'; text: string }

  | { type: 'thinking_start' }
  | { type: 'thinking_delta'; delta: string }
  | { type: 'thinking_end'; thinking: string }

  | { type: 'toolcall_start'; index: number }
  | { type: 'toolcall_delta'; index: number; delta: string }
  | { type: 'toolcall_end'; index: number; toolCall: ToolCall }

  // Tool execution (local tools only)
  | { type: 'tool_execution_start'; toolCallId: string; toolName: string; args: any }
  | { type: 'tool_execution_delta'; toolCallId: string; delta: string }
  | { type: 'tool_execution_end'; toolCallId: string; output: string; details?: any }

  // Errors
  | { type: 'error'; error: string; message?: Message };
```
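A UI consuming this stream only needs a tiny reducer per content block; the deltas concatenate to the text carried by the `_end` event. A sketch over the text events only (the reducer name is illustrative):

```typescript
type TextEvent =
  | { type: "text_start" }
  | { type: "text_delta"; delta: string }
  | { type: "text_end"; text: string };

// Fold text events into the current block's text. text_end carries the
// authoritative full text, which should equal the concatenated deltas.
function reduceText(events: TextEvent[]): string {
  let current = "";
  for (const e of events) {
    if (e.type === "text_start") current = "";
    else if (e.type === "text_delta") current += e.delta;
    else current = e.text;
  }
  return current;
}
```

The same shape works for `thinking_*` and `toolcall_*` events, keyed by `index` for parallel tool calls.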
### New API Design

```typescript
// Create or restore a session
function createSession(
  sessionId: string,
  context: AgentContext,
  options?: PromptOptions // Model, temperature, etc.
): AgentSession;

// Execute with event streaming
interface ExecuteResult {
  status: 'completed' | 'awaiting_tool_execution';
  messages: Message[];           // New messages added during execution
  pendingToolCalls?: ToolCall[]; // Present when status is 'awaiting_tool_execution'
  cost: number;                  // Cost for this execution
}

interface ExecuteStream extends AsyncIterable<AgentEvent> {
  result(): Promise<ExecuteResult>;
}

function execute(
  session: AgentSession,
  input: UserMessage | ToolResultMessage[]
): ExecuteStream;

// Example usage - clean flow without nested loops
const session = createSession('session-123', {
  messages: [],
  tools: [localTool, remoteTool]
});

// Execute and handle events
const stream = execute(session, { role: 'user', content: 'Hello' });
for await (const event of stream) {
  // Handle events for UI updates
  console.log(event);
}

// Check result after stream completes
const result = await stream.result();
if (result.status === 'awaiting_tool_execution') {
  // Execute remote tools externally
  const toolResults = await executeRemoteTools(result.pendingToolCalls);

  // Resume execution with tool results
  const resumeStream = execute(session, toolResults);
  for await (const event of resumeStream) {
    console.log(event);
  }

  const finalResult = await resumeStream.result();
  // Continue until no more pending tools...
}
```
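The "continue until no more pending tools" comment generalizes to a driver loop. A sketch with minimal stand-in types; `runRemoteTools` is a hypothetical application callback, and `execute` is passed in rather than imported:

```typescript
// Minimal stand-ins for the proposed API shapes (illustrative only)
type ToolCall = { id: string; name: string };
type ToolResultMessage = { role: "toolResult"; toolCallId: string; output: string };
type ExecuteResult = {
  status: "completed" | "awaiting_tool_execution";
  pendingToolCalls?: ToolCall[];
};
interface ExecuteStream extends AsyncIterable<unknown> {
  result(): Promise<ExecuteResult>;
}

// Drive a session until no remote tool calls remain: stream events,
// check the result, run pending tools externally, resume with results.
// Returns the number of execute() rounds for illustration.
async function runToCompletion(
  execute: (input: unknown) => ExecuteStream,
  firstInput: unknown,
  runRemoteTools: (calls: ToolCall[]) => Promise<ToolResultMessage[]>,
): Promise<number> {
  let input = firstInput;
  let turns = 0;
  while (true) {
    const stream = execute(input);
    for await (const _event of stream) { /* update UI here */ }
    turns++;
    const result = await stream.result();
    if (result.status === "completed") return turns;
    input = await runRemoteTools(result.pendingToolCalls ?? []);
  }
}
```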
### Tool Execution Streaming

All tools return a `ToolExecutionEventStream` for consistency:

```typescript
// Example streaming tool with progressive output
const streamingSearchTool: AgentTool = {
  name: 'search',
  label: 'Search',
  parameters: Type.Object({ query: Type.String() }),
  async *execute(id, { query }) {
    // Accumulate the full output while streaming deltas
    let fullOutput = `Searching for: ${query}\nResults:\n`;

    // Stream deltas as they become available
    yield { type: 'delta', delta: 'Searching for: ' };
    yield { type: 'delta', delta: query };
    yield { type: 'delta', delta: '\nResults:\n' };

    for (const result of await search(query)) {
      const line = `- ${result.title}\n`;
      fullOutput += line;
      yield { type: 'delta', delta: line };
    }

    // Final complete event with full output and optional details
    yield {
      type: 'complete',
      output: fullOutput,
      details: { resultCount: 10 }
    };
  }
};

// Example simple tool (still returns AsyncIterable for consistency)
const simpleTool: AgentTool = {
  name: 'calculate',
  label: 'Calculate',
  parameters: Type.Object({ expression: Type.String() }),
  async *execute(id, { expression }) {
    const result = eval(expression); // Don't do this in production!

    // Even simple tools emit a complete event
    yield {
      type: 'complete',
      output: `${expression} = ${result}`,
      details: { result }
    };
  }
};

// Helper for simple tools that don't need streaming
function createSimpleTool<TParams>(
  config: Omit<AgentTool<TParams>, 'execute'>,
  handler: (id: string, params: TParams) => Promise<{ output: string; details?: any }>
): AgentTool<TParams> {
  return {
    ...config,
    async *execute(id, params) {
      const result = await handler(id, params);
      yield { type: 'complete', ...result };
    }
  };
}

// Usage with helper
const calculateTool = createSimpleTool(
  {
    name: 'calculate',
    label: 'Calculate',
    parameters: Type.Object({ expression: Type.String() })
  },
  async (id, { expression }) => {
    const result = eval(expression);
    return {
      output: `${expression} = ${result}`,
      details: { result }
    };
  }
);
```
### HTTP Transport Layer

Server manages session state, client only sends inputs:

```typescript
// Server-side session store
class SessionStore {
  private sessions = new Map<string, AgentSession>();

  create(sessionId: string, context: AgentContext, options?: PromptOptions): AgentSession {
    const session = createSession(sessionId, context, options);
    this.sessions.set(sessionId, session);
    return session;
  }

  get(sessionId: string): AgentSession | undefined {
    return this.sessions.get(sessionId);
  }

  // Optional: Get messages for UI sync
  getMessages(sessionId: string): Message[] | undefined {
    return this.sessions.get(sessionId)?.messages;
  }
}

// API Endpoints
interface ExecuteRequest {
  sessionId?: string;      // Optional, server generates if not provided
  input: UserMessage | ToolResultMessage[];
  context?: AgentContext;  // Only for new sessions
  options?: PromptOptions; // Model, temperature, etc.
}

interface ExecuteResponse {
  sessionId: string;
  // SSE stream of AgentEvents
}

// Server implementation
app.post('/api/agent/execute', async (req, res) => {
  const { sessionId = generateId(), input, context, options } = req.body;

  // Get or create session
  let session = sessionStore.get(sessionId);
  if (!session) {
    if (!context) {
      return res.status(400).json({ error: 'Context required for new session' });
    }
    session = sessionStore.create(sessionId, context, options);
  }

  // Set up SSE
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Session-Id': sessionId, // Return session ID in header
  });

  // Stream events
  const stream = execute(session, input);
  for await (const event of stream) {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }

  // Get final result
  const result = await stream.result();

  // Send final event with status
  res.write(`data: ${JSON.stringify({
    type: 'execute_complete',
    status: result.status,
    pendingToolCalls: result.pendingToolCalls
  })}\n\n`);

  res.end();
});

// Optional: Get session messages for UI sync
app.get('/api/agent/session/:sessionId', (req, res) => {
  const messages = sessionStore.getMessages(req.params.sessionId);
  if (!messages) {
    return res.status(404).json({ error: 'Session not found' });
  }
  res.json({ messages });
});

// Client implementation
async function executeOnServer(input: UserMessage, tools: AgentTool[]) {
  // Note: tools with execute functions will have them stripped during JSON.stringify
  const response = await fetch('/api/agent/execute', {
    method: 'POST',
    body: JSON.stringify({
      input,
      context: { messages: [], tools }, // Tools without execute = remote
      options: { model: 'gpt-4o-mini' }
    })
  });

  const sessionId = response.headers.get('X-Session-Id');
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let pendingToolCalls: ToolCall[] = [];
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // SSE frames can be split across chunks - buffer until lines are complete
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const event = JSON.parse(line.slice(6));

        // Update UI with event
        handleEvent(event);

        if (event.type === 'execute_complete' && event.status === 'awaiting_tool_execution') {
          pendingToolCalls = event.pendingToolCalls;
        }
      }
    }
  }

  // Execute remote tools if needed
  if (pendingToolCalls.length > 0) {
    const toolResults = await executeLocalTools(pendingToolCalls);

    // Resume on server
    await fetch('/api/agent/execute', {
      method: 'POST',
      body: JSON.stringify({
        sessionId,
        input: toolResults
      })
    });
  }
}
```
## Implementation Plan

### Phase 1: Core Event System Refactor
1. Define new granular `AgentEvent` types with message boundaries
2. Update agent loop to detect local vs remote tools (via `execute` presence)
3. Implement `ExecuteStream` with `result()` method
4. Map underlying streaming events to granular agent events

### Phase 2: Session Management
1. Implement `AgentSession` type with status tracking
2. Create `createSession()` and `execute()` functions
3. Handle suspension when remote tools are detected
4. Track cost and model per session

### Phase 3: Streaming Tool Execution
1. Define `ToolExecutionEventStream` type alias
2. Update all tools to return `ToolExecutionEventStream`
3. Emit `tool_execution_delta` events during streaming
4. Provide `createSimpleTool` helper for non-streaming tools

### Phase 4: HTTP Transport
1. Implement server-side `SessionStore`
2. Create `/api/agent/execute` endpoint with SSE
3. Add `/api/agent/session/:id` for context sync
4. Build client SDK for SSE consumption

### Phase 5: Documentation & Examples
1. Create comprehensive documentation for the new API
2. Build example applications demonstrating all features
3. Add TypeScript types and JSDoc comments
4. Create cookbook for common patterns
## Key Design Decisions

### Why AgentSession Instead of ExecutionSession?
- Clearer naming that matches the agent concept
- Consistent with `AgentContext`, `AgentTool`, `AgentEvent`

### Why Detect Remote Tools via Missing execute?
- `JSON.stringify` naturally omits functions
- No need for an explicit `mode` field
- Zero configuration for remote tools

### Why ExecuteStream with result()?
- Follows the existing EventStream pattern
- Separates streaming (UI updates) from final state
- Clean way to get pending tool calls after streaming

### Why awaiting_tool_execution Instead of suspended?
- More descriptive of the actual state
- Clear indication that client action is required
- Better for debugging and logging
## Clean Break Strategy

Since we're not maintaining backward compatibility, we can:

1. **Remove all legacy code** - No need for wrapper functions or event mappers
2. **Simplify type definitions** - No union types for old/new formats
3. **Optimize for the new use cases** - Design purely for distributed execution
4. **Clear naming** - No need to avoid conflicts with existing names
5. **Breaking changes allowed** - Can restructure packages and exports as needed

This is a complete rewrite of the agent module with a new, cleaner API that fully supports:
- Granular streaming events
- Progressive tool execution
- Remote tool suspension/resumption
- Efficient HTTP transport
## Open Questions

1. **Tool Timeout**: How long should we wait for remote tool execution before timing out?
2. **Session Persistence**: Should sessions be stored in memory or persisted (Redis/DB)?
3. **Tool Chaining**: Can a remote tool trigger another LLM call that uses more tools?
4. **Error Recovery**: How do we handle partial tool execution failures?
5. **Rate Limiting**: How do we prevent abuse of the HTTP API?
6. **Error/Abort Messages**: Should we use explicit `ErrorMessage`/`AbortedMessage` types or keep errors in `AssistantMessage` with `stopReason: 'error'`? Explicit types are cleaner but add complexity.
## Next Steps

1. Implement Phase 1 - core event system with granular events
2. Test with existing tools to ensure compatibility
3. Add streaming support to one built-in tool as a proof of concept
4. Build a minimal HTTP server to validate the design
5. Gather feedback and iterate before full implementation