fix(coding-agent): harden chat stream completion

Flush final text before closing each AI SDK text block, surface event-processing failures to chat callers, and clear the remaining Companion OS check blockers.

fixes #273

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Harivansh Rathi 2026-03-09 12:48:21 -07:00
parent 6b2a639fb6
commit 3c0f74c1dc
5 changed files with 124 additions and 24 deletions

View file

@ -291,6 +291,7 @@ export class AgentSession {
private _unsubscribeAgent?: () => void; private _unsubscribeAgent?: () => void;
private _eventListeners: AgentSessionEventListener[] = []; private _eventListeners: AgentSessionEventListener[] = [];
private _agentEventQueue: Promise<void> = Promise.resolve(); private _agentEventQueue: Promise<void> = Promise.resolve();
private _agentEventFailure: Error | undefined = undefined;
/** Tracks pending steering messages for UI display. Removed when delivered. */ /** Tracks pending steering messages for UI display. Removed when delivered. */
private _steeringMessages: string[] = []; private _steeringMessages: string[] = [];
@ -408,10 +409,12 @@ export class AgentSession {
this._agentEventQueue = this._agentEventQueue.then( this._agentEventQueue = this._agentEventQueue.then(
() => this._processAgentEvent(event), () => this._processAgentEvent(event),
() => this._processAgentEvent(event), () => this._processAgentEvent(event),
); ).catch((error: unknown) => {
if (!this._agentEventFailure) {
// Keep queue alive if an event handler fails this._agentEventFailure =
this._agentEventQueue.catch(() => {}); error instanceof Error ? error : new Error(String(error));
}
});
}; };
private _createRetryPromiseForAgentEnd(event: AgentEvent): void { private _createRetryPromiseForAgentEnd(event: AgentEvent): void {
@ -914,10 +917,11 @@ export class AgentSession {
} }
private async _awaitAgentEventProcessing(): Promise<void> { private async _awaitAgentEventProcessing(): Promise<void> {
try { await this._agentEventQueue;
await this._agentEventQueue; if (this._agentEventFailure) {
} catch { const error = this._agentEventFailure;
// Agent event failures are surfaced through normal listener paths. this._agentEventFailure = undefined;
throw error;
} }
} }
@ -1167,6 +1171,7 @@ export class AgentSession {
} }
} }
this._agentEventFailure = undefined;
await this.agent.prompt(messages); await this.agent.prompt(messages);
await this.waitForRetry(); await this.waitForRetry();
await this._awaitAgentEventProcessing(); await this._awaitAgentEventProcessing();
@ -1377,6 +1382,7 @@ export class AgentSession {
this.agent.steer(appMessage); this.agent.steer(appMessage);
} }
} else if (options?.triggerTurn) { } else if (options?.triggerTurn) {
this._agentEventFailure = undefined;
await this.agent.prompt(appMessage); await this.agent.prompt(appMessage);
await this.waitForRetry(); await this.waitForRetry();
await this._awaitAgentEventProcessing(); await this._awaitAgentEventProcessing();

View file

@ -26,7 +26,6 @@ import type {
GatewaySessionState, GatewaySessionState,
GatewaySessionSnapshot, GatewaySessionSnapshot,
HistoryMessage, HistoryMessage,
HistoryPart,
ModelInfo, ModelInfo,
} from "./types.js"; } from "./types.js";
import { import {
@ -54,9 +53,9 @@ export type {
GatewaySessionState, GatewaySessionState,
GatewaySessionSnapshot, GatewaySessionSnapshot,
HistoryMessage, HistoryMessage,
HistoryPart,
ModelInfo, ModelInfo,
} from "./types.js"; } from "./types.js";
export type { HistoryPart } from "./types.js";
let activeGatewayRuntime: GatewayRuntime | null = null; let activeGatewayRuntime: GatewayRuntime | null = null;

View file

@ -129,7 +129,8 @@ export function createVercelStreamListener(
}; };
const emitTextDelta = (contentIndex: number, delta: string): void => { const emitTextDelta = (contentIndex: number, delta: string): void => {
if (delta.length === 0) { const state = getTextState(contentIndex);
if (delta.length === 0 || state.ended) {
return; return;
} }
emitTextStart(contentIndex); emitTextStart(contentIndex);
@ -138,7 +139,7 @@ export function createVercelStreamListener(
id: `text_${contentIndex}`, id: `text_${contentIndex}`,
delta, delta,
}); });
getTextState(contentIndex).streamedText += delta; state.streamedText += delta;
}; };
const emitTextEnd = (contentIndex: number): void => { const emitTextEnd = (contentIndex: number): void => {
@ -154,20 +155,36 @@ export function createVercelStreamListener(
state.ended = true; state.ended = true;
}; };
// Emit any not-yet-streamed suffix of `fullText` for the text part at
// `contentIndex`, then (when `close` is set) terminate the AI SDK text block.
// Used both on text_end events and on final message flush so the client
// always receives the complete text before the block is closed.
const flushTextPart = (
contentIndex: number,
fullText: string,
close: boolean,
): void => {
const state = getTextState(contentIndex);
// A block that has already ended must receive no further deltas or ends.
if (state.ended) {
return;
}
// If the final text is not an extension of what was already streamed, a
// suffix delta cannot be computed; at most close the block if it is open.
// NOTE(review): the mismatched text is dropped rather than re-emitted —
// confirm this is the intended recovery behavior.
if (!fullText.startsWith(state.streamedText)) {
if (close && state.started) {
emitTextEnd(contentIndex);
}
return;
}
// Stream only the portion the client has not yet seen.
const suffix = fullText.slice(state.streamedText.length);
if (suffix.length > 0) {
emitTextDelta(contentIndex, suffix);
}
if (close) {
emitTextEnd(contentIndex);
}
};
const flushAssistantMessageText = ( const flushAssistantMessageText = (
event: Extract<AgentSessionEvent, { type: "message_end" }>, event: Extract<AgentSessionEvent, { type: "message_end" }>,
): void => { ): void => {
for (const part of getAssistantTextParts(event)) { for (const part of getAssistantTextParts(event)) {
const state = getTextState(part.contentIndex); flushTextPart(part.contentIndex, part.text, true);
if (!part.text.startsWith(state.streamedText)) {
continue;
}
const suffix = part.text.slice(state.streamedText.length);
if (suffix.length > 0) {
emitTextDelta(part.contentIndex, suffix);
}
emitTextEnd(part.contentIndex);
} }
}; };
@ -206,7 +223,7 @@ export function createVercelStreamListener(
emitTextDelta(inner.contentIndex, inner.delta); emitTextDelta(inner.contentIndex, inner.delta);
return; return;
case "text_end": case "text_end":
emitTextEnd(inner.contentIndex); flushTextPart(inner.contentIndex, inner.content, true);
return; return;
case "toolcall_start": { case "toolcall_start": {
const content = inner.partial.content[inner.contentIndex]; const content = inner.partial.content[inner.contentIndex];

View file

@ -84,7 +84,7 @@ function buildProjectContextSection(
} }
if (guides.length > 0) { if (guides.length > 0) {
section += "\n" + guides.map((g) => `- ${g}`).join("\n") + "\n"; section += `\n${guides.map((g) => `- ${g}`).join("\n")}\n`;
} }
section += "\n"; section += "\n";

View file

@ -273,6 +273,84 @@ describe("createVercelStreamListener", () => {
]); ]);
}); });
it("flushes text_end content before closing the block", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_start",
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
listener(
createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
partial: createAssistantMessage(""),
}),
);
listener(
createMessageUpdateEvent({
type: "text_end",
contentIndex: 0,
content: "hello",
partial: createAssistantMessage("hello"),
}),
);
listener(createAssistantMessageEndEvent("hello"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "hello" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("closes an open text block when final text mismatches the streamed prefix", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_start",
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
listener(
createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
partial: createAssistantMessage(""),
}),
);
listener(
createMessageUpdateEvent({
type: "text_delta",
contentIndex: 0,
delta: "hello",
partial: createAssistantMessage("hello"),
}),
);
listener(createAssistantMessageEndEvent("goodbye"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "hello" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("does not write after response has ended", () => { it("does not write after response has ended", () => {
const response = createMockResponse(); const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id"); const listener = createVercelStreamListener(response, "test-msg-id");