From a132b8140cd7c9bd87b637baac4e6fd58415a5f1 Mon Sep 17 00:00:00 2001 From: Mario Zechner Date: Sun, 31 Aug 2025 23:09:14 +0200 Subject: [PATCH] feat(ai): Add start event emission to all providers - Emit start event with model and provider info after creating stream - Add abort signal tests for all providers - Update README abort signal section to reflect non-throwing API - Fix model references in README examples --- packages/ai/README.md | 41 ++++--- packages/ai/src/providers/anthropic.ts | 2 + packages/ai/src/providers/google.ts | 10 ++ .../ai/src/providers/openai-completions.ts | 2 + packages/ai/src/providers/openai-responses.ts | 18 +++ packages/ai/test/abort.test.ts | 114 ++++++++++++++++++ 6 files changed, 167 insertions(+), 20 deletions(-) create mode 100644 packages/ai/test/abort.test.ts diff --git a/packages/ai/README.md b/packages/ai/README.md index 3f828245..747371e3 100644 --- a/packages/ai/README.md +++ b/packages/ai/README.md @@ -101,7 +101,7 @@ if (toolCalls.length > 0) { // Continue conversation with tool results const followUp = await llm.complete({ messages, tools }); messages.push(followUp); - + // Print text blocks from the response for (const block of followUp.content) { if (block.type === 'text') { @@ -160,24 +160,25 @@ const response = await llm.complete({ ```typescript const controller = new AbortController(); -// Abort after 5 seconds -setTimeout(() => controller.abort(), 5000); +// Abort after 2 seconds +setTimeout(() => controller.abort(), 2000); -try { - const response = await llm.complete({ - messages: [{ role: 'user', content: 'Write a long story' }] - }, { - signal: controller.signal, - onEvent: (event) => { - if (event.type === 'text_delta') { - process.stdout.write(event.delta); - } +const response = await llm.complete({ + messages: [{ role: 'user', content: 'Write a long story' }] +}, { + signal: controller.signal, + onEvent: (event) => { + if (event.type === 'text_delta') { + process.stdout.write(event.delta); } - }); -} catch (error) { - if (error.name === 'AbortError') { - console.log('Request was aborted'); } +}); + +// Check if the request was aborted +if (response.stopReason === 'error' && response.error) { + console.log('Request was aborted:', response.error); +} else { + console.log('Request completed successfully'); } ``` @@ -206,7 +207,7 @@ await llm.complete(context, { ### Google Gemini Thinking ```typescript -const llm = createLLM('google', 'gemini-2.0-flash-thinking-exp'); +const llm = createLLM('google', 'gemini-2.5-pro'); await llm.complete(context, { thinking: { enabled: true } @@ -220,14 +221,14 @@ await llm.complete(context, { import { OpenAICompletionsLLM } from '@mariozechner/pi-ai'; const model = { - id: 'llama3.1:8b', + id: 'gpt-oss:20b', provider: 'ollama', baseUrl: 'http://localhost:11434/v1', reasoning: false, input: ['text'], cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 8192, - maxTokens: 4096, + contextWindow: 126000, + maxTokens: 32000, name: 'Llama 3.1 8B' }; diff --git a/packages/ai/src/providers/anthropic.ts b/packages/ai/src/providers/anthropic.ts index 8439891d..7705ea30 100644 --- a/packages/ai/src/providers/anthropic.ts +++ b/packages/ai/src/providers/anthropic.ts @@ -129,6 +129,8 @@ export class AnthropicLLM implements LLM { }, ); + options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider }); + let blockType: "text" | "thinking" | "toolUse" | "other" = "other"; let blockContent = ""; let toolCall: (ToolCall & { partialJson: string }) | null = null; diff --git a/packages/ai/src/providers/google.ts b/packages/ai/src/providers/google.ts index 36f5aef6..1876f22c 100644 --- a/packages/ai/src/providers/google.ts +++ b/packages/ai/src/providers/google.ts @@ -89,6 +89,14 @@ export class GoogleLLM implements LLM { }; } + // Abort signal + if (options?.signal) { + if (options.signal.aborted) { + throw new Error("Request aborted"); + } + config.abortSignal = options.signal; + } + // Build the request parameters const params: GenerateContentParameters = { model: this.model.id, @@ -98,6 +106,8 @@ export class GoogleLLM implements LLM { const stream = await this.client.models.generateContentStream(params); + options?.onEvent?.({ type: "start", model: this.model.id, provider: this.model.provider }); + const blocks: AssistantMessage["content"] = []; let currentBlock: TextContent | ThinkingContent | null = null; let usage: Usage = { diff --git a/packages/ai/src/providers/openai-completions.ts b/packages/ai/src/providers/openai-completions.ts index 0979002e..68ced3c5 100644 --- a/packages/ai/src/providers/openai-completions.ts +++ b/packages/ai/src/providers/openai-completions.ts @@ -92,6 +92,8 @@ export class OpenAICompletionsLLM implements LLM { signal: options?.signal, }); + options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider }); + const blocks: AssistantMessage["content"] = []; let currentBlock: TextContent | ThinkingContent | (ToolCall & { partialArgs?: string }) | null = null; let usage: Usage = { diff --git a/packages/ai/src/providers/openai-responses.ts b/packages/ai/src/providers/openai-responses.ts index a0a34991..e494f325 100644 --- a/packages/ai/src/providers/openai-responses.ts +++ b/packages/ai/src/providers/openai-responses.ts @@ -85,6 +85,8 @@ export class OpenAIResponsesLLM implements LLM { signal: options?.signal, }); + options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider }); + const outputItems: (ResponseReasoningItem | ResponseOutputMessage | ResponseFunctionToolCall)[] = []; // any for function_call items let currentTextAccum = ""; // For delta accumulation let currentThinkingAccum = ""; // For delta accumulation @@ -184,9 +186,25 @@ export class OpenAIResponsesLLM implements LLM { } satisfies AssistantMessage; options?.onEvent?.({ type: "error", error: errorOutput.error || "Unknown error" }); return errorOutput; + } else if (event.type === "response.failed") { + const errorOutput = { + role: "assistant", + content: [], + provider: this.modelInfo.provider, + model: this.modelInfo.id, + usage, + stopReason: "error", + error: "Unknown error", + } satisfies AssistantMessage; + options?.onEvent?.({ type: "error", error: errorOutput.error || "Unknown error" }); + return errorOutput; } } + if (options?.signal?.aborted) { + throw new Error("Request was aborted"); + } + // Convert output items to blocks const blocks: AssistantMessage["content"] = []; diff --git a/packages/ai/test/abort.test.ts b/packages/ai/test/abort.test.ts new file mode 100644 index 00000000..ab4e32f8 --- /dev/null +++ b/packages/ai/test/abort.test.ts @@ -0,0 +1,114 @@ +import { describe, it, beforeAll, expect } from "vitest"; +import { GoogleLLM } from "../src/providers/google.js"; +import { OpenAICompletionsLLM } from "../src/providers/openai-completions.js"; +import { OpenAIResponsesLLM } from "../src/providers/openai-responses.js"; +import { AnthropicLLM } from "../src/providers/anthropic.js"; +import type { LLM, LLMOptions, Context } from "../src/types.js"; +import { getModel } from "../src/models.js"; + +async function testAbortSignal(llm: LLM) { + const controller = new AbortController(); + + // Abort after 100ms + setTimeout(() => controller.abort(), 1000); + + const context: Context = { + messages: [{ + role: "user", + content: "Write a very long story about a dragon that lives in a mountain. Include lots of details about the dragon's appearance, its daily life, the treasures it guards, and its interactions with nearby villages. Make it at least 1000 words long." + }] + }; + + const response = await llm.complete(context, { + signal: controller.signal + } as T); + + // If we get here without throwing, the abort didn't work + expect(response.stopReason).toBe("error"); +} + +async function testImmediateAbort(llm: LLM) { + const controller = new AbortController(); + + // Abort immediately + controller.abort(); + + const context: Context = { + messages: [{ role: "user", content: "Hello" }] + }; + + const response = await llm.complete(context, { + signal: controller.signal + } as T); + expect(response.stopReason).toBe("error"); +} + +describe("AI Providers Abort Tests", () => { + describe.skipIf(!process.env.GEMINI_API_KEY)("Google Provider Abort", () => { + let llm: GoogleLLM; + + beforeAll(() => { + llm = new GoogleLLM(getModel("google", "gemini-2.5-flash")!, process.env.GEMINI_API_KEY!); + }); + + it("should abort mid-stream", async () => { + await testAbortSignal(llm); + }); + + it("should handle immediate abort", async () => { + await testImmediateAbort(llm); + }); + }); + + describe.skipIf(!process.env.OPENAI_API_KEY)("OpenAI Completions Provider Abort", () => { + let llm: OpenAICompletionsLLM; + + beforeAll(() => { + llm = new OpenAICompletionsLLM(getModel("openai", "gpt-4o-mini")!, process.env.OPENAI_API_KEY!); + }); + + it("should abort mid-stream", async () => { + await testAbortSignal(llm); + }); + + it("should handle immediate abort", async () => { + await testImmediateAbort(llm); + }); + }); + + describe.skipIf(!process.env.OPENAI_API_KEY)("OpenAI Responses Provider Abort", () => { + let llm: OpenAIResponsesLLM; + + beforeAll(() => { + const model = getModel("openai", "gpt-5-mini"); + if (!model) { + throw new Error("Model not found"); + } + llm = new OpenAIResponsesLLM(model, process.env.OPENAI_API_KEY!); + }); + + it("should abort mid-stream", async () => { + await testAbortSignal(llm); + }); + + it("should handle immediate abort", async () => { + await testImmediateAbort(llm); + }); + }); + + describe.skipIf(!process.env.ANTHROPIC_API_KEY)("Anthropic Provider Abort", () => { + let llm: AnthropicLLM; + + beforeAll(() => { + llm = new AnthropicLLM(getModel("anthropic", "claude-3-5-haiku-latest")!, process.env.ANTHROPIC_API_KEY!); + }); + + it("should abort mid-stream", async () => { + await testAbortSignal(llm); + }); + + it("should handle immediate abort", async () => { + await testImmediateAbort(llm); + }); + }); +}); \ No newline at end of file