From 6b2a639fb6f77995ac7420563d23cab7ae76fc6e Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Mon, 9 Mar 2026 10:46:43 -0700 Subject: [PATCH 1/2] fix chat --- .../coding-agent/src/core/agent-session.ts | 11 ++ .../src/core/gateway/vercel-ai-stream.ts | 130 ++++++++++-- .../test/vercel-ai-stream.test.ts | 186 +++++++++++++++--- 3 files changed, 282 insertions(+), 45 deletions(-) diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index c3c0696..b382bdb 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -913,6 +913,14 @@ export class AgentSession { await this._memoryDisposePromise; } + private async _awaitAgentEventProcessing(): Promise { + try { + await this._agentEventQueue; + } catch { + // Agent event failures are surfaced through normal listener paths. + } + } + private _enqueueMemoryPromotion(messages: AgentMessage[]): void { this._memoryWriteQueue = this._memoryWriteQueue .catch(() => undefined) @@ -1161,6 +1169,7 @@ export class AgentSession { await this.agent.prompt(messages); await this.waitForRetry(); + await this._awaitAgentEventProcessing(); } /** @@ -1369,6 +1378,8 @@ export class AgentSession { } } else if (options?.triggerTurn) { await this.agent.prompt(appMessage); + await this.waitForRetry(); + await this._awaitAgentEventProcessing(); } else { this.agent.appendMessage(appMessage); this.sessionManager.appendCustomMessageEntry( diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts index cbc20e6..2cd8fc7 100644 --- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts @@ -2,6 +2,43 @@ import { randomUUID } from "node:crypto"; import type { ServerResponse } from "node:http"; import type { AgentSessionEvent } from "../agent-session.js"; +type TextStreamState = { + started: boolean; + ended: boolean; + streamedText: string; +}; + +function isTextContent( + value: unknown, +): value is { type: "text"; text: string } { + return ( + typeof value === "object" && + value !== null && + "type" in value && + "text" in value && + (value as { type: unknown }).type === "text" && + typeof (value as { text: unknown }).text === "string" + ); +} + +function getAssistantTextParts( + event: Extract, +): Array<{ contentIndex: number; text: string }> { + if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) { + return []; + } + + const textParts: Array<{ contentIndex: number; text: string }> = []; + for (const [contentIndex, content] of event.message.content.entries()) { + if (!isTextContent(content) || content.text.length === 0) { + continue; + } + textParts.push({ contentIndex, text: content.text }); + } + + return textParts; +} + /** * Write a single Vercel AI SDK v5+ SSE chunk to the response. * Format: `data: \n\n` @@ -63,6 +100,76 @@ export function createVercelStreamListener( // so these guards only need to bound the stream to that prompt's event span. let active = false; const msgId = messageId ?? randomUUID(); + let textStates = new Map(); + + const getTextState = (contentIndex: number): TextStreamState => { + const existing = textStates.get(contentIndex); + if (existing) { + return existing; + } + const initial: TextStreamState = { + started: false, + ended: false, + streamedText: "", + }; + textStates.set(contentIndex, initial); + return initial; + }; + + const emitTextStart = (contentIndex: number): void => { + const state = getTextState(contentIndex); + if (state.started) { + return; + } + writeChunk(response, { + type: "text-start", + id: `text_${contentIndex}`, + }); + state.started = true; + }; + + const emitTextDelta = (contentIndex: number, delta: string): void => { + if (delta.length === 0) { + return; + } + emitTextStart(contentIndex); + writeChunk(response, { + type: "text-delta", + id: `text_${contentIndex}`, + delta, + }); + getTextState(contentIndex).streamedText += delta; + }; + + const emitTextEnd = (contentIndex: number): void => { + const state = getTextState(contentIndex); + if (state.ended) { + return; + } + emitTextStart(contentIndex); + writeChunk(response, { + type: "text-end", + id: `text_${contentIndex}`, + }); + state.ended = true; + }; + + const flushAssistantMessageText = ( + event: Extract, + ): void => { + for (const part of getAssistantTextParts(event)) { + const state = getTextState(part.contentIndex); + if (!part.text.startsWith(state.streamedText)) { + continue; + } + + const suffix = part.text.slice(state.streamedText.length); + if (suffix.length > 0) { + emitTextDelta(part.contentIndex, suffix); + } + emitTextEnd(part.contentIndex); + } + }; return (event: AgentSessionEvent) => { if (response.writableEnded) return; @@ -85,6 +192,7 @@ export function createVercelStreamListener( switch (event.type) { case "turn_start": + textStates = new Map(); writeChunk(response, { type: "start-step" }); return; @@ -92,23 +200,13 @@ export function createVercelStreamListener( const inner = event.assistantMessageEvent; switch (inner.type) { case "text_start": - writeChunk(response, { - type: "text-start", - id: `text_${inner.contentIndex}`, - }); + emitTextStart(inner.contentIndex); return; case "text_delta": - writeChunk(response, { - type: "text-delta", - id: `text_${inner.contentIndex}`, - delta: inner.delta, - }); + emitTextDelta(inner.contentIndex, inner.delta); return; case "text_end": - writeChunk(response, { - type: "text-end", - id: `text_${inner.contentIndex}`, - }); + emitTextEnd(inner.contentIndex); return; case "toolcall_start": { const content = inner.partial.content[inner.contentIndex]; @@ -163,6 +261,12 @@ export function createVercelStreamListener( return; } + case "message_end": + if (event.message.role === "assistant") { + flushAssistantMessageText(event); + } + return; + case "turn_end": writeChunk(response, { type: "finish-step" }); return; diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index ab98c69..c237ab6 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -1,3 +1,5 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import type { AgentSessionEvent } from "../src/core/agent-session.js"; import { @@ -5,6 +7,24 @@ import { extractUserText, } from "../src/core/gateway/vercel-ai-stream.js"; +type MessageUpdateSessionEvent = Extract< + AgentSessionEvent, + { type: "message_update" } +>; +type MessageEndSessionEvent = Extract< + AgentSessionEvent, + { type: "message_end" } +>; +type TurnEndSessionEvent = Extract; +type TextAssistantMessageEvent = Extract< + MessageUpdateSessionEvent["assistantMessageEvent"], + { type: "text_start" | "text_delta" | "text_end" } +>; +type MockResponse = ServerResponse & { + chunks: string[]; + ended: boolean; +}; + describe("extractUserText", () => { it("extracts text from useChat v5+ format with parts", () => { const body = { @@ -61,24 +81,73 @@ describe("extractUserText", () => { }); describe("createVercelStreamListener", () => { - function createMockResponse() { + function createAssistantMessage(text: string): AssistantMessage { + return { + role: "assistant", + content: [{ type: "text", text }], + api: "anthropic-messages", + provider: "anthropic", + model: "mock", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: Date.now(), + }; + } + + function createMockResponse(): MockResponse { const chunks: string[] = []; let ended = false; - return { - writableEnded: false, + const response = { + get writableEnded() { + return ended; + }, write(data: string) { chunks.push(data); return true; }, end() { ended = true; - this.writableEnded = true; }, chunks, get ended() { return ended; }, - } as any; + } as unknown as MockResponse; + return response; + } + + function createMessageUpdateEvent( + assistantMessageEvent: TextAssistantMessageEvent, + ): MessageUpdateSessionEvent { + return { + type: "message_update", + message: createAssistantMessage(""), + assistantMessageEvent, + }; + } + + function createAssistantMessageEndEvent( + text: string, + ): MessageEndSessionEvent { + return { + type: "message_end", + message: createAssistantMessage(text), + }; + } + + function createTurnEndEvent(): TurnEndSessionEvent { + return { + type: "turn_end", + message: createAssistantMessage(""), + toolResults: [], + }; } function parseChunks(chunks: string[]): Array { @@ -104,41 +173,30 @@ describe("createVercelStreamListener", () => { turnIndex: 0, timestamp: Date.now(), } as AgentSessionEvent); - listener({ - type: "message_update", - message: {} as any, - assistantMessageEvent: { + listener( + createMessageUpdateEvent({ type: "text_start", contentIndex: 0, - partial: {} as any, - }, - } as AgentSessionEvent); - listener({ - type: "message_update", - message: {} as any, - assistantMessageEvent: { + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ type: "text_delta", contentIndex: 0, delta: "hello", - partial: {} as any, - }, - } as AgentSessionEvent); - listener({ - type: "message_update", - message: {} as any, - assistantMessageEvent: { + partial: createAssistantMessage("hello"), + }), + ); + listener( + createMessageUpdateEvent({ type: "text_end", contentIndex: 0, content: "hello", - partial: {} as any, - }, - } as AgentSessionEvent); - listener({ - type: "turn_end", - turnIndex: 0, - message: {} as any, - toolResults: [], - } as AgentSessionEvent); + partial: createAssistantMessage("hello"), + }), + ); + listener(createTurnEndEvent()); const parsed = parseChunks(response.chunks); expect(parsed).toEqual([ @@ -151,6 +209,70 @@ describe("createVercelStreamListener", () => { ]); }); + it("flushes final assistant text from message_end when no deltas streamed", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener(createAssistantMessageEndEvent("final answer")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "final answer" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + + it("flushes the missing text suffix on message_end", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener( + createMessageUpdateEvent({ + type: "text_start", + contentIndex: 0, + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ + type: "text_delta", + contentIndex: 0, + delta: "hel", + partial: createAssistantMessage("hel"), + }), + ); + listener(createAssistantMessageEndEvent("hello")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hel" }, + { type: "text-delta", id: "text_0", delta: "lo" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + it("does not write after response has ended", () => { const response = createMockResponse(); const listener = createVercelStreamListener(response, "test-msg-id"); From 3c0f74c1dccf92f324b97e64fb73dc5dac5385d2 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Mon, 9 Mar 2026 12:48:21 -0700 Subject: [PATCH 2/2] fix(coding-agent): harden chat stream completion Flush final text before closing each AI SDK text block, surface event-processing failures to chat callers, and clear the remaining Companion OS check blockers. fixes #273 Co-authored-by: Codex --- .../coding-agent/src/core/agent-session.ts | 22 ++++-- .../coding-agent/src/core/gateway/runtime.ts | 3 +- .../src/core/gateway/vercel-ai-stream.ts | 43 ++++++---- .../coding-agent/src/core/system-prompt.ts | 2 +- .../test/vercel-ai-stream.test.ts | 78 +++++++++++++++++++ 5 files changed, 124 insertions(+), 24 deletions(-) diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index b382bdb..0ca72cb 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -291,6 +291,7 @@ export class AgentSession { private _unsubscribeAgent?: () => void; private _eventListeners: AgentSessionEventListener[] = []; private _agentEventQueue: Promise = Promise.resolve(); + private _agentEventFailure: Error | undefined = undefined; /** Tracks pending steering messages for UI display. Removed when delivered. */ private _steeringMessages: string[] = []; @@ -408,10 +409,12 @@ export class AgentSession { this._agentEventQueue = this._agentEventQueue.then( () => this._processAgentEvent(event), () => this._processAgentEvent(event), - ); - - // Keep queue alive if an event handler fails - this._agentEventQueue.catch(() => {}); + ).catch((error: unknown) => { + if (!this._agentEventFailure) { + this._agentEventFailure = + error instanceof Error ? error : new Error(String(error)); + } + }); }; private _createRetryPromiseForAgentEnd(event: AgentEvent): void { @@ -914,10 +917,11 @@ export class AgentSession { } private async _awaitAgentEventProcessing(): Promise { - try { - await this._agentEventQueue; - } catch { - // Agent event failures are surfaced through normal listener paths. + await this._agentEventQueue; + if (this._agentEventFailure) { + const error = this._agentEventFailure; + this._agentEventFailure = undefined; + throw error; } } @@ -1167,6 +1171,7 @@ export class AgentSession { } } + this._agentEventFailure = undefined; await this.agent.prompt(messages); await this.waitForRetry(); await this._awaitAgentEventProcessing(); @@ -1377,6 +1382,7 @@ export class AgentSession { this.agent.steer(appMessage); } } else if (options?.triggerTurn) { + this._agentEventFailure = undefined; await this.agent.prompt(appMessage); await this.waitForRetry(); await this._awaitAgentEventProcessing(); diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index a509fba..33994f7 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -26,7 +26,6 @@ import type { GatewaySessionState, GatewaySessionSnapshot, HistoryMessage, - HistoryPart, ModelInfo, } from "./types.js"; import { @@ -54,9 +53,9 @@ export type { GatewaySessionState, GatewaySessionSnapshot, HistoryMessage, - HistoryPart, ModelInfo, } from "./types.js"; +export type { HistoryPart } from "./types.js"; let activeGatewayRuntime: GatewayRuntime | null = null; diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts index 2cd8fc7..3fcaab7 100644 --- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts @@ -129,7 +129,8 @@ export function createVercelStreamListener( }; const emitTextDelta = (contentIndex: number, delta: string): void => { - if (delta.length === 0) { + const state = getTextState(contentIndex); + if (delta.length === 0 || state.ended) { return; } emitTextStart(contentIndex); @@ -138,7 +139,7 @@ export function createVercelStreamListener( id: `text_${contentIndex}`, delta, }); - getTextState(contentIndex).streamedText += delta; + state.streamedText += delta; }; const emitTextEnd = (contentIndex: number): void => { @@ -154,20 +155,36 @@ export function createVercelStreamListener( state.ended = true; }; + const flushTextPart = ( + contentIndex: number, + fullText: string, + close: boolean, + ): void => { + const state = getTextState(contentIndex); + if (state.ended) { + return; + } + if (!fullText.startsWith(state.streamedText)) { + if (close && state.started) { + emitTextEnd(contentIndex); + } + return; + } + + const suffix = fullText.slice(state.streamedText.length); + if (suffix.length > 0) { + emitTextDelta(contentIndex, suffix); + } + if (close) { + emitTextEnd(contentIndex); + } + }; + const flushAssistantMessageText = ( event: Extract, ): void => { for (const part of getAssistantTextParts(event)) { - const state = getTextState(part.contentIndex); - if (!part.text.startsWith(state.streamedText)) { - continue; - } - - const suffix = part.text.slice(state.streamedText.length); - if (suffix.length > 0) { - emitTextDelta(part.contentIndex, suffix); - } - emitTextEnd(part.contentIndex); + flushTextPart(part.contentIndex, part.text, true); } }; @@ -206,7 +223,7 @@ export function createVercelStreamListener( emitTextDelta(inner.contentIndex, inner.delta); return; case "text_end": - emitTextEnd(inner.contentIndex); + flushTextPart(inner.contentIndex, inner.content, true); return; case "toolcall_start": { const content = inner.partial.content[inner.contentIndex]; diff --git a/packages/coding-agent/src/core/system-prompt.ts b/packages/coding-agent/src/core/system-prompt.ts index cd9e860..b66d5e2 100644 --- a/packages/coding-agent/src/core/system-prompt.ts +++ b/packages/coding-agent/src/core/system-prompt.ts @@ -84,7 +84,7 @@ function buildProjectContextSection( } if (guides.length > 0) { - section += "\n" + guides.map((g) => `- ${g}`).join("\n") + "\n"; + section += `\n${guides.map((g) => `- ${g}`).join("\n")}\n`; } section += "\n"; diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index c237ab6..db4f0eb 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -273,6 +273,84 @@ describe("createVercelStreamListener", () => { ]); }); + it("flushes text_end content before closing the block", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener( + createMessageUpdateEvent({ + type: "text_start", + contentIndex: 0, + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ + type: "text_end", + contentIndex: 0, + content: "hello", + partial: createAssistantMessage("hello"), + }), + ); + listener(createAssistantMessageEndEvent("hello")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hello" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + + it("closes an open text block when final text mismatches the streamed prefix", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ + type: "turn_start", + turnIndex: 0, + timestamp: Date.now(), + } as AgentSessionEvent); + listener( + createMessageUpdateEvent({ + type: "text_start", + contentIndex: 0, + partial: createAssistantMessage(""), + }), + ); + listener( + createMessageUpdateEvent({ + type: "text_delta", + contentIndex: 0, + delta: "hello", + partial: createAssistantMessage("hello"), + }), + ); + listener(createAssistantMessageEndEvent("goodbye")); + listener(createTurnEndEvent()); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start", messageId: "test-msg-id" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hello" }, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + it("does not write after response has ended", () => { const response = createMockResponse(); const listener = createVercelStreamListener(response, "test-msg-id");