feat(ai): Add start event emission to all providers

- Emit start event with model and provider info after creating stream
- Add abort signal tests for all providers
- Update README abort signal section to reflect non-throwing API
- Fix model references in README examples
This commit is contained in:
Mario Zechner 2025-08-31 23:09:14 +02:00
parent 8d4edf6458
commit a132b8140c
6 changed files with 167 additions and 20 deletions

View file

@ -101,7 +101,7 @@ if (toolCalls.length > 0) {
// Continue conversation with tool results
const followUp = await llm.complete({ messages, tools });
messages.push(followUp);
// Print text blocks from the response
for (const block of followUp.content) {
if (block.type === 'text') {
@ -160,24 +160,25 @@ const response = await llm.complete({
```typescript
const controller = new AbortController();
// Abort after 5 seconds
setTimeout(() => controller.abort(), 5000);
// Abort after 2 seconds
setTimeout(() => controller.abort(), 2000);
try {
const response = await llm.complete({
messages: [{ role: 'user', content: 'Write a long story' }]
}, {
signal: controller.signal,
onEvent: (event) => {
if (event.type === 'text_delta') {
process.stdout.write(event.delta);
}
const response = await llm.complete({
messages: [{ role: 'user', content: 'Write a long story' }]
}, {
signal: controller.signal,
onEvent: (event) => {
if (event.type === 'text_delta') {
process.stdout.write(event.delta);
}
});
} catch (error) {
if (error.name === 'AbortError') {
console.log('Request was aborted');
}
});
// Check if the request was aborted
if (response.stopReason === 'error' && response.error) {
console.log('Request was aborted:', response.error);
} else {
console.log('Request completed successfully');
}
```
@ -206,7 +207,7 @@ await llm.complete(context, {
### Google Gemini Thinking
```typescript
const llm = createLLM('google', 'gemini-2.0-flash-thinking-exp');
const llm = createLLM('google', 'gemini-2.5-pro');
await llm.complete(context, {
thinking: { enabled: true }
@ -220,14 +221,14 @@ await llm.complete(context, {
import { OpenAICompletionsLLM } from '@mariozechner/pi-ai';
const model = {
id: 'llama3.1:8b',
id: 'gpt-oss:20b',
provider: 'ollama',
baseUrl: 'http://localhost:11434/v1',
reasoning: false,
input: ['text'],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 8192,
maxTokens: 4096,
contextWindow: 126000,
maxTokens: 32000,
name: 'GPT-OSS 20B'
};

View file

@ -129,6 +129,8 @@ export class AnthropicLLM implements LLM<AnthropicLLMOptions> {
},
);
options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider });
let blockType: "text" | "thinking" | "toolUse" | "other" = "other";
let blockContent = "";
let toolCall: (ToolCall & { partialJson: string }) | null = null;

View file

@ -89,6 +89,14 @@ export class GoogleLLM implements LLM<GoogleLLMOptions> {
};
}
// Abort signal
if (options?.signal) {
if (options.signal.aborted) {
throw new Error("Request aborted");
}
config.abortSignal = options.signal;
}
// Build the request parameters
const params: GenerateContentParameters = {
model: this.model.id,
@ -98,6 +106,8 @@ export class GoogleLLM implements LLM<GoogleLLMOptions> {
const stream = await this.client.models.generateContentStream(params);
options?.onEvent?.({ type: "start", model: this.model.id, provider: this.model.provider });
const blocks: AssistantMessage["content"] = [];
let currentBlock: TextContent | ThinkingContent | null = null;
let usage: Usage = {

View file

@ -92,6 +92,8 @@ export class OpenAICompletionsLLM implements LLM<OpenAICompletionsLLMOptions> {
signal: options?.signal,
});
options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider });
const blocks: AssistantMessage["content"] = [];
let currentBlock: TextContent | ThinkingContent | (ToolCall & { partialArgs?: string }) | null = null;
let usage: Usage = {

View file

@ -85,6 +85,8 @@ export class OpenAIResponsesLLM implements LLM<OpenAIResponsesLLMOptions> {
signal: options?.signal,
});
options?.onEvent?.({ type: "start", model: this.modelInfo.id, provider: this.modelInfo.provider });
const outputItems: (ResponseReasoningItem | ResponseOutputMessage | ResponseFunctionToolCall)[] = []; // any for function_call items
let currentTextAccum = ""; // For delta accumulation
let currentThinkingAccum = ""; // For delta accumulation
@ -184,9 +186,25 @@ export class OpenAIResponsesLLM implements LLM<OpenAIResponsesLLMOptions> {
} satisfies AssistantMessage;
options?.onEvent?.({ type: "error", error: errorOutput.error || "Unknown error" });
return errorOutput;
} else if (event.type === "response.failed") {
const errorOutput = {
role: "assistant",
content: [],
provider: this.modelInfo.provider,
model: this.modelInfo.id,
usage,
stopReason: "error",
error: "Unknown error",
} satisfies AssistantMessage;
options?.onEvent?.({ type: "error", error: errorOutput.error || "Unknown error" });
return errorOutput;
}
}
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
}
// Convert output items to blocks
const blocks: AssistantMessage["content"] = [];

View file

@ -0,0 +1,114 @@
import { describe, it, beforeAll, expect } from "vitest";
import { GoogleLLM } from "../src/providers/google.js";
import { OpenAICompletionsLLM } from "../src/providers/openai-completions.js";
import { OpenAIResponsesLLM } from "../src/providers/openai-responses.js";
import { AnthropicLLM } from "../src/providers/anthropic.js";
import type { LLM, LLMOptions, Context } from "../src/types.js";
import { getModel } from "../src/models.js";
/**
 * Checks that aborting a request some time after it was started makes
 * `complete()` resolve with an error result.
 *
 * A completion is started for a deliberately long prompt, the associated
 * `AbortController` is aborted one second later, and the resolved message is
 * asserted to carry `stopReason === "error"`.
 *
 * @param llm - Provider instance under test.
 */
async function testAbortSignal<T extends LLMOptions>(llm: LLM<T>) {
	const controller = new AbortController();

	// Abort after 1 second. NOTE(review): presumably chosen so the abort lands
	// while the response is still streaming — confirm against provider latency.
	const abortTimer = setTimeout(() => controller.abort(), 1000);

	const context: Context = {
		messages: [{
			role: "user",
			content: "Write a very long story about a dragon that lives in a mountain. Include lots of details about the dragon's appearance, its daily life, the treasures it guards, and its interactions with nearby villages. Make it at least 1000 words long."
		}]
	};

	try {
		const response = await llm.complete(context, {
			signal: controller.signal
		} as T);

		// The abort is expected to surface through the resolved message rather
		// than through a rejected promise, so the stop reason is what we assert on.
		expect(response.stopReason).toBe("error");
	} finally {
		// Do not leave the timer pending if complete() settles before it fires.
		clearTimeout(abortTimer);
	}
}
/**
 * Checks that a request issued with an already-aborted signal resolves with
 * an error result.
 *
 * The controller is aborted before `complete()` is called, and the resolved
 * message is asserted to carry `stopReason === "error"`.
 *
 * @param llm - Provider instance under test.
 */
async function testImmediateAbort<T extends LLMOptions>(llm: LLM<T>) {
	// The signal is already in the aborted state when the request is made.
	const abortController = new AbortController();
	abortController.abort();

	const prompt: Context = {
		messages: [{ role: "user", content: "Hello" }]
	};
	const requestOptions = { signal: abortController.signal } as T;

	const result = await llm.complete(prompt, requestOptions);
	expect(result.stopReason).toBe("error");
}
/**
 * Abort-signal tests for every provider.
 *
 * Each provider suite is skipped when its API key environment variable is not
 * set. Every suite runs the same two checks: an abort issued after the request
 * has started, and a request made with an already-aborted signal.
 */
describe("AI Providers Abort Tests", () => {
	/**
	 * Returns `value` unchanged, or throws a descriptive error when it is
	 * `null`/`undefined`. Used instead of non-null assertions so a missing
	 * model or key fails in `beforeAll` with a clear message.
	 *
	 * @param value - Value that must be present.
	 * @param what - Human-readable name used in the error message.
	 * @returns The given value, narrowed to exclude `null` and `undefined`.
	 * @throws Error when `value` is `null` or `undefined`.
	 */
	function required<V>(value: V | null | undefined, what: string): V {
		if (value == null) {
			throw new Error(`${what} not found`);
		}
		return value;
	}

	describe.skipIf(!process.env.GEMINI_API_KEY)("Google Provider Abort", () => {
		let llm: GoogleLLM;

		beforeAll(() => {
			llm = new GoogleLLM(
				required(getModel("google", "gemini-2.5-flash"), "Model google/gemini-2.5-flash"),
				required(process.env.GEMINI_API_KEY, "GEMINI_API_KEY"),
			);
		});

		it("should abort mid-stream", async () => {
			await testAbortSignal(llm);
		});

		it("should handle immediate abort", async () => {
			await testImmediateAbort(llm);
		});
	});

	describe.skipIf(!process.env.OPENAI_API_KEY)("OpenAI Completions Provider Abort", () => {
		let llm: OpenAICompletionsLLM;

		beforeAll(() => {
			llm = new OpenAICompletionsLLM(
				required(getModel("openai", "gpt-4o-mini"), "Model openai/gpt-4o-mini"),
				required(process.env.OPENAI_API_KEY, "OPENAI_API_KEY"),
			);
		});

		it("should abort mid-stream", async () => {
			await testAbortSignal(llm);
		});

		it("should handle immediate abort", async () => {
			await testImmediateAbort(llm);
		});
	});

	describe.skipIf(!process.env.OPENAI_API_KEY)("OpenAI Responses Provider Abort", () => {
		let llm: OpenAIResponsesLLM;

		beforeAll(() => {
			llm = new OpenAIResponsesLLM(
				required(getModel("openai", "gpt-5-mini"), "Model openai/gpt-5-mini"),
				required(process.env.OPENAI_API_KEY, "OPENAI_API_KEY"),
			);
		});

		it("should abort mid-stream", async () => {
			await testAbortSignal(llm);
		});

		it("should handle immediate abort", async () => {
			await testImmediateAbort(llm);
		});
	});

	describe.skipIf(!process.env.ANTHROPIC_API_KEY)("Anthropic Provider Abort", () => {
		let llm: AnthropicLLM;

		beforeAll(() => {
			llm = new AnthropicLLM(
				required(getModel("anthropic", "claude-3-5-haiku-latest"), "Model anthropic/claude-3-5-haiku-latest"),
				required(process.env.ANTHROPIC_API_KEY, "ANTHROPIC_API_KEY"),
			);
		});

		it("should abort mid-stream", async () => {
			await testAbortSignal(llm);
		});

		it("should handle immediate abort", async () => {
			await testImmediateAbort(llm);
		});
	});
});