diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index b382bdb..0ca72cb 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -291,6 +291,7 @@ export class AgentSession { private _unsubscribeAgent?: () => void; private _eventListeners: AgentSessionEventListener[] = []; private _agentEventQueue: Promise = Promise.resolve(); + private _agentEventFailure: Error | undefined = undefined; /** Tracks pending steering messages for UI display. Removed when delivered. */ private _steeringMessages: string[] = []; @@ -408,10 +409,12 @@ export class AgentSession { this._agentEventQueue = this._agentEventQueue.then( () => this._processAgentEvent(event), () => this._processAgentEvent(event), - ); - - // Keep queue alive if an event handler fails - this._agentEventQueue.catch(() => {}); + ).catch((error: unknown) => { + if (!this._agentEventFailure) { + this._agentEventFailure = + error instanceof Error ? error : new Error(String(error)); + } + }); }; private _createRetryPromiseForAgentEnd(event: AgentEvent): void { @@ -914,10 +917,11 @@ export class AgentSession { } private async _awaitAgentEventProcessing(): Promise { - try { - await this._agentEventQueue; - } catch { - // Agent event failures are surfaced through normal listener paths. + await this._agentEventQueue; + if (this._agentEventFailure) { + const error = this._agentEventFailure; + this._agentEventFailure = undefined; + throw error; } } @@ -1167,6 +1171,7 @@ export class AgentSession { } } + this._agentEventFailure = undefined; await this.agent.prompt(messages); await this.waitForRetry(); await this._awaitAgentEventProcessing(); @@ -1377,6 +1382,7 @@ export class AgentSession { this.agent.steer(appMessage); } } else if (options?.triggerTurn) { + this._agentEventFailure = undefined; await this.agent.prompt(appMessage); await this.waitForRetry(); await this._awaitAgentEventProcessing(); diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index a509fba..33994f7 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -26,7 +26,6 @@ import type { GatewaySessionState, GatewaySessionSnapshot, HistoryMessage, - HistoryPart, ModelInfo, } from "./types.js"; import { @@ -54,9 +53,9 @@ export type { GatewaySessionState, GatewaySessionSnapshot, HistoryMessage, - HistoryPart, ModelInfo, } from "./types.js"; +export type { HistoryPart } from "./types.js"; let activeGatewayRuntime: GatewayRuntime | null = null; diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts index 2cd8fc7..3fcaab7 100644 --- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts @@ -129,7 +129,8 @@ export function createVercelStreamListener( }; const emitTextDelta = (contentIndex: number, delta: string): void => { - if (delta.length === 0) { + const state = getTextState(contentIndex); + if (delta.length === 0 || state.ended) { return; } emitTextStart(contentIndex); @@ -138,7 +139,7 @@ export function createVercelStreamListener( id: `text_${contentIndex}`, delta, }); - getTextState(contentIndex).streamedText += delta; + state.streamedText += delta; }; const emitTextEnd = (contentIndex: number): void => { @@ -154,20 +155,36 @@ export function createVercelStreamListener( state.ended = true; }; + const flushTextPart = ( + contentIndex: number, + fullText: string, + close: boolean, + ): void => { + const state = getTextState(contentIndex); + if (state.ended) { + return; + } + if (!fullText.startsWith(state.streamedText)) { + if (close && state.started) { + emitTextEnd(contentIndex); + } + return; + } + + const suffix = fullText.slice(state.streamedText.length); + if (suffix.length > 0) { + emitTextDelta(contentIndex, suffix); + } + if (close) { + emitTextEnd(contentIndex); + } + }; + const flushAssistantMessageText = ( event: Extract, ): void => { for (const part of getAssistantTextParts(event)) { - const state = getTextState(part.contentIndex); - if (!part.text.startsWith(state.streamedText)) { - continue; - } - - const suffix = part.text.slice(state.streamedText.length); - if (suffix.length > 0) { - emitTextDelta(part.contentIndex, suffix); - } - emitTextEnd(part.contentIndex); + flushTextPart(part.contentIndex, part.text, true); } }; @@ -206,7 +223,7 @@ export function createVercelStreamListener( emitTextDelta(inner.contentIndex, inner.delta); return; case "text_end": - emitTextEnd(inner.contentIndex); + flushTextPart(inner.contentIndex, inner.content, true); return; case "toolcall_start": { const content = inner.partial.content[inner.contentIndex]; diff --git a/packages/coding-agent/src/core/system-prompt.ts b/packages/coding-agent/src/core/system-prompt.ts index cd9e860..b66d5e2 100644 --- a/packages/coding-agent/src/core/system-prompt.ts +++ b/packages/coding-agent/src/core/system-prompt.ts @@ -84,7 +84,7 @@ function buildProjectContextSection( } if (guides.length > 0) { - section += "\n" + guides.map((g) => `- ${g}`).join("\n") + "\n"; + section += `\n${guides.map((g) => `- ${g}`).join("\n")}\n`; } section += "\n"; diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index c237ab6..db4f0eb 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -273,6 +273,84 @@ describe("createVercelStreamListener", () => { ]); }); + it("flushes text_end content before closing the block", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener( + createMessageUpdateEvent({ + type: "text_start", + contentIndex: 0, + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ + type: "text_end", + contentIndex: 0, + content: "hello", + partial: createAssistantMessage("hello"), + }), + ); + listener(createAssistantMessageEndEvent("hello")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hello" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + + it("closes an open text block when final text mismatches the streamed prefix", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener( + createMessageUpdateEvent({ + type: "text_start", + contentIndex: 0, + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ + type: "text_delta", + contentIndex: 0, + delta: "hello", + partial: createAssistantMessage("hello"), + }), + ); + listener(createAssistantMessageEndEvent("goodbye")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hello" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + it("does not write after response has ended", () => { const response = createMockResponse(); const listener = createVercelStreamListener(response, "test-msg-id");