co-mono/packages/ai/src/utils/event-stream.ts
Mario Zechner 177c694406 feat: custom provider support with streamSimple
- Add resetApiProviders() to clear and re-register built-in providers
- Add createAssistantMessageEventStream() factory for extensions
- Add streamSimple support in ProviderConfig for custom API implementations
- Call resetApiProviders() on /reload to clean up extension providers
- Add custom-provider.md documentation
- Add custom-provider.ts example with full Anthropic implementation
- Update extensions.md with streamSimple config option
2026-01-24 23:15:11 +01:00

87 lines
2.2 KiB
TypeScript

import type { AssistantMessage, AssistantMessageEvent } from "../types.js";
// Generic event stream class for async iteration
export class EventStream<T, R = T> implements AsyncIterable<T> {
private queue: T[] = [];
private waiting: ((value: IteratorResult<T>) => void)[] = [];
private done = false;
private finalResultPromise: Promise<R>;
private resolveFinalResult!: (result: R) => void;
constructor(
private isComplete: (event: T) => boolean,
private extractResult: (event: T) => R,
) {
this.finalResultPromise = new Promise((resolve) => {
this.resolveFinalResult = resolve;
});
}
push(event: T): void {
if (this.done) return;
if (this.isComplete(event)) {
this.done = true;
this.resolveFinalResult(this.extractResult(event));
}
// Deliver to waiting consumer or queue it
const waiter = this.waiting.shift();
if (waiter) {
waiter({ value: event, done: false });
} else {
this.queue.push(event);
}
}
end(result?: R): void {
this.done = true;
if (result !== undefined) {
this.resolveFinalResult(result);
}
// Notify all waiting consumers that we're done
while (this.waiting.length > 0) {
const waiter = this.waiting.shift()!;
waiter({ value: undefined as any, done: true });
}
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
while (true) {
if (this.queue.length > 0) {
yield this.queue.shift()!;
} else if (this.done) {
return;
} else {
const result = await new Promise<IteratorResult<T>>((resolve) => this.waiting.push(resolve));
if (result.done) return;
yield result.value;
}
}
}
result(): Promise<R> {
return this.finalResultPromise;
}
}
export class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") {
return event.message;
} else if (event.type === "error") {
return event.error;
}
throw new Error("Unexpected event type for final result");
},
);
}
}
/** Factory function for AssistantMessageEventStream (for use in extensions) */
export function createAssistantMessageEventStream(): AssistantMessageEventStream {
return new AssistantMessageEventStream();
}