diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts
index c3c0696..b382bdb 100644
--- a/packages/coding-agent/src/core/agent-session.ts
+++ b/packages/coding-agent/src/core/agent-session.ts
@@ -913,6 +913,14 @@ export class AgentSession {
 		await this._memoryDisposePromise;
 	}
 
+	private async _awaitAgentEventProcessing(): Promise<void> {
+		try {
+			await this._agentEventQueue;
+		} catch {
+			// Agent event failures are surfaced through normal listener paths.
+		}
+	}
+
 	private _enqueueMemoryPromotion(messages: AgentMessage[]): void {
 		this._memoryWriteQueue = this._memoryWriteQueue
 			.catch(() => undefined)
@@ -1161,6 +1169,7 @@ export class AgentSession {
 
 		await this.agent.prompt(messages);
 		await this.waitForRetry();
+		await this._awaitAgentEventProcessing();
 	}
 
 	/**
@@ -1369,6 +1378,8 @@ export class AgentSession {
 			}
 		} else if (options?.triggerTurn) {
 			await this.agent.prompt(appMessage);
+			await this.waitForRetry();
+			await this._awaitAgentEventProcessing();
 		} else {
 			this.agent.appendMessage(appMessage);
 			this.sessionManager.appendCustomMessageEntry(
diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts
index cbc20e6..2cd8fc7 100644
--- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts
+++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts
@@ -2,6 +2,43 @@ import { randomUUID } from "node:crypto";
 import type { ServerResponse } from "node:http";
 import type { AgentSessionEvent } from "../agent-session.js";
 
+type TextStreamState = {
+	started: boolean;
+	ended: boolean;
+	streamedText: string;
+};
+
+function isTextContent(
+	value: unknown,
+): value is { type: "text"; text: string } {
+	return (
+		typeof value === "object" &&
+		value !== null &&
+		"type" in value &&
+		"text" in value &&
+		(value as { type: unknown }).type === "text" &&
+		typeof (value as { text: unknown }).text === "string"
+	);
+}
+
+function getAssistantTextParts(
+	event: Extract<AgentSessionEvent, { type: "message_end" }>,
+): Array<{ contentIndex: number; text: string }> {
+	if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) {
+		return [];
+	}
+
+	const textParts: Array<{ contentIndex: number; text: string }> = [];
+	for (const [contentIndex, content] of event.message.content.entries()) {
+		if (!isTextContent(content) || content.text.length === 0) {
+			continue;
+		}
+		textParts.push({ contentIndex, text: content.text });
+	}
+
+	return textParts;
+}
+
 /**
  * Write a single Vercel AI SDK v5+ SSE chunk to the response.
  * Format: `data: <json>\n\n`
  */
@@ -63,6 +100,76 @@ export function createVercelStreamListener(
 	// so these guards only need to bound the stream to that prompt's event span.
 	let active = false;
 	const msgId = messageId ?? randomUUID();
+	let textStates = new Map<number, TextStreamState>();
+
+	const getTextState = (contentIndex: number): TextStreamState => {
+		const existing = textStates.get(contentIndex);
+		if (existing) {
+			return existing;
+		}
+		const initial: TextStreamState = {
+			started: false,
+			ended: false,
+			streamedText: "",
+		};
+		textStates.set(contentIndex, initial);
+		return initial;
+	};
+
+	const emitTextStart = (contentIndex: number): void => {
+		const state = getTextState(contentIndex);
+		if (state.started) {
+			return;
+		}
+		writeChunk(response, {
+			type: "text-start",
+			id: `text_${contentIndex}`,
+		});
+		state.started = true;
+	};
+
+	const emitTextDelta = (contentIndex: number, delta: string): void => {
+		if (delta.length === 0) {
+			return;
+		}
+		emitTextStart(contentIndex);
+		writeChunk(response, {
+			type: "text-delta",
+			id: `text_${contentIndex}`,
+			delta,
+		});
+		getTextState(contentIndex).streamedText += delta;
+	};
+
+	const emitTextEnd = (contentIndex: number): void => {
+		const state = getTextState(contentIndex);
+		if (state.ended) {
+			return;
+		}
+		emitTextStart(contentIndex);
+		writeChunk(response, {
+			type: "text-end",
+			id: `text_${contentIndex}`,
+		});
+		state.ended = true;
+	};
+
+	const flushAssistantMessageText = (
+		event: Extract<AgentSessionEvent, { type: "message_end" }>,
+	): void => {
+		for (const part of getAssistantTextParts(event)) {
+			const state = getTextState(part.contentIndex);
+			if (!part.text.startsWith(state.streamedText)) {
+				continue;
+			}
+
+			const suffix = part.text.slice(state.streamedText.length);
+			if (suffix.length > 0) {
+				emitTextDelta(part.contentIndex, suffix);
+			}
+			emitTextEnd(part.contentIndex);
+		}
+	};
+
 	return (event: AgentSessionEvent) => {
 		if (response.writableEnded) return;
 
@@ -85,6 +192,7 @@ export function createVercelStreamListener(
 		switch (event.type) {
 			case "turn_start":
+				textStates = new Map();
 				writeChunk(response, { type: "start-step" });
 				return;
 
 			case "message_update": {
@@ -92,23 +200,13 @@ export function createVercelStreamListener(
 				const inner = event.assistantMessageEvent;
 				switch (inner.type) {
 					case "text_start":
-						writeChunk(response, {
-							type: "text-start",
-							id: `text_${inner.contentIndex}`,
-						});
+						emitTextStart(inner.contentIndex);
 						return;
 					case "text_delta":
-						writeChunk(response, {
-							type: "text-delta",
-							id: `text_${inner.contentIndex}`,
-							delta: inner.delta,
-						});
+						emitTextDelta(inner.contentIndex, inner.delta);
 						return;
 					case "text_end":
-						writeChunk(response, {
-							type: "text-end",
-							id: `text_${inner.contentIndex}`,
-						});
+						emitTextEnd(inner.contentIndex);
 						return;
 					case "toolcall_start": {
 						const content = inner.partial.content[inner.contentIndex];
@@ -163,6 +261,12 @@ export function createVercelStreamListener(
 				return;
 			}
 
+			case "message_end":
+				if (event.message.role === "assistant") {
+					flushAssistantMessageText(event);
+				}
+				return;
+
 			case "turn_end":
 				writeChunk(response, { type: "finish-step" });
 				return;
diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts
index ab98c69..c237ab6 100644
--- a/packages/coding-agent/test/vercel-ai-stream.test.ts
+++ b/packages/coding-agent/test/vercel-ai-stream.test.ts
@@ -1,3 +1,5 @@
+import type { IncomingMessage, ServerResponse } from "node:http";
+import type { AssistantMessage } from "@mariozechner/pi-ai";
 import { describe, expect, it } from "vitest";
 import type { AgentSessionEvent } from "../src/core/agent-session.js";
 import {
@@ -5,6 +7,24 @@ import {
 	extractUserText,
 } from "../src/core/gateway/vercel-ai-stream.js";
 
+type MessageUpdateSessionEvent = Extract<
+	AgentSessionEvent,
+	{ type: "message_update" }
+>;
+type MessageEndSessionEvent = Extract<
+	AgentSessionEvent,
+	{ type: "message_end" }
+>;
+type TurnEndSessionEvent = Extract<AgentSessionEvent, { type: "turn_end" }>;
+type TextAssistantMessageEvent = Extract<
+	MessageUpdateSessionEvent["assistantMessageEvent"],
+	{ type: "text_start" | "text_delta" | "text_end" }
+>;
+type MockResponse = ServerResponse & {
+	chunks: string[];
+	ended: boolean;
+};
+
 describe("extractUserText", () => {
 	it("extracts text from useChat v5+ format with parts", () => {
 		const body = {
@@ -61,24 +81,73 @@ describe("extractUserText", () => {
 });
 
 describe("createVercelStreamListener", () => {
-	function createMockResponse() {
+	function createAssistantMessage(text: string): AssistantMessage {
+		return {
+			role: "assistant",
+			content: [{ type: "text", text }],
+			api: "anthropic-messages",
+			provider: "anthropic",
+			model: "mock",
+			usage: {
+				input: 0,
+				output: 0,
+				cacheRead: 0,
+				cacheWrite: 0,
+				totalTokens: 0,
+				cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+			},
+			stopReason: "stop",
+			timestamp: Date.now(),
+		};
+	}
+
+	function createMockResponse(): MockResponse {
 		const chunks: string[] = [];
 		let ended = false;
-		return {
-			writableEnded: false,
+		const response = {
+			get writableEnded() {
+				return ended;
+			},
 			write(data: string) {
 				chunks.push(data);
 				return true;
 			},
 			end() {
 				ended = true;
-				this.writableEnded = true;
 			},
 			chunks,
 			get ended() {
 				return ended;
 			},
-		} as any;
+		} as unknown as MockResponse;
+		return response;
+	}
+
+	function createMessageUpdateEvent(
+		assistantMessageEvent: TextAssistantMessageEvent,
+	): MessageUpdateSessionEvent {
+		return {
+			type: "message_update",
+			message: createAssistantMessage(""),
+			assistantMessageEvent,
+		};
+	}
+
+	function createAssistantMessageEndEvent(
+		text: string,
+	): MessageEndSessionEvent {
+		return {
+			type: "message_end",
+			message: createAssistantMessage(text),
+		};
+	}
+
+	function createTurnEndEvent(): TurnEndSessionEvent {
+		return {
+			type: "turn_end",
+			message: createAssistantMessage(""),
+			toolResults: [],
+		};
 	}
 
 	function parseChunks(chunks: string[]): Array<unknown> {
@@ -104,41 +173,30 @@ describe("createVercelStreamListener", () => {
 			turnIndex: 0,
 			timestamp: Date.now(),
 		} as AgentSessionEvent);
-		listener({
-			type: "message_update",
-			message: {} as any,
-			assistantMessageEvent: {
+		listener(
+			createMessageUpdateEvent({
 				type: "text_start",
 				contentIndex: 0,
-				partial: {} as any,
-			},
-		} as AgentSessionEvent);
-		listener({
-			type: "message_update",
-			message: {} as any,
-			assistantMessageEvent: {
+				partial: createAssistantMessage(""),
+			}),
+		);
+		listener(
+			createMessageUpdateEvent({
 				type: "text_delta",
 				contentIndex: 0,
 				delta: "hello",
-				partial: {} as any,
-			},
-		} as AgentSessionEvent);
-		listener({
-			type: "message_update",
-			message: {} as any,
-			assistantMessageEvent: {
+				partial: createAssistantMessage("hello"),
+			}),
+		);
+		listener(
+			createMessageUpdateEvent({
 				type: "text_end",
 				contentIndex: 0,
 				content: "hello",
-				partial: {} as any,
-			},
-		} as AgentSessionEvent);
-		listener({
-			type: "turn_end",
-			turnIndex: 0,
-			message: {} as any,
-			toolResults: [],
-		} as AgentSessionEvent);
+				partial: createAssistantMessage("hello"),
+			}),
+		);
+		listener(createTurnEndEvent());
 
 		const parsed = parseChunks(response.chunks);
 		expect(parsed).toEqual([
@@ -151,6 +209,70 @@ describe("createVercelStreamListener", () => {
 		]);
 	});
 
+	it("flushes final assistant text from message_end when no deltas streamed", () => {
+		const response = createMockResponse();
+		const listener = createVercelStreamListener(response, "test-msg-id");
+
+		listener({ type: "agent_start" } as AgentSessionEvent);
+		listener({
+			type: "turn_start",
+			turnIndex: 0,
+			timestamp: Date.now(),
+		} as AgentSessionEvent);
+		listener(createAssistantMessageEndEvent("final answer"));
+		listener(createTurnEndEvent());
+
+		const parsed = parseChunks(response.chunks);
+		expect(parsed).toEqual([
+			{ type: "start", messageId: "test-msg-id" },
+			{ type: "start-step" },
+			{ type: "text-start", id: "text_0" },
+			{ type: "text-delta", id: "text_0", delta: "final answer" },
+			{ type: "text-end", id: "text_0" },
+			{ type: "finish-step" },
+		]);
+	});
+
+	it("flushes the missing text suffix on message_end", () => {
+		const response = createMockResponse();
+		const listener = createVercelStreamListener(response, "test-msg-id");
+
+		listener({ type: "agent_start" } as AgentSessionEvent);
+		listener({
+			type: "turn_start",
+			turnIndex: 0,
+			timestamp: Date.now(),
+		} as AgentSessionEvent);
+		listener(
+			createMessageUpdateEvent({
+				type: "text_start",
+				contentIndex: 0,
+				partial: createAssistantMessage(""),
+			}),
+		);
+		listener(
+			createMessageUpdateEvent({
+				type: "text_delta",
+				contentIndex: 0,
+				delta: "hel",
+				partial: createAssistantMessage("hel"),
+			}),
+		);
+		listener(createAssistantMessageEndEvent("hello"));
+		listener(createTurnEndEvent());
+
+		const parsed = parseChunks(response.chunks);
+		expect(parsed).toEqual([
+			{ type: "start", messageId: "test-msg-id" },
+			{ type: "start-step" },
+			{ type: "text-start", id: "text_0" },
+			{ type: "text-delta", id: "text_0", delta: "hel" },
+			{ type: "text-delta", id: "text_0", delta: "lo" },
+			{ type: "text-end", id: "text_0" },
+			{ type: "finish-step" },
+		]);
+	});
+
 	it("does not write after response has ended", () => {
 		const response = createMockResponse();
 		const listener = createVercelStreamListener(response, "test-msg-id");