Merge pull request #273 from getcompanion-ai/grind

fix chat
Hari 2026-03-09 16:02:14 -04:00 committed by GitHub
commit d880d2740e
5 changed files with 386 additions and 49 deletions

View file

@ -291,6 +291,7 @@ export class AgentSession {
private _unsubscribeAgent?: () => void;
private _eventListeners: AgentSessionEventListener[] = [];
private _agentEventQueue: Promise<void> = Promise.resolve();
private _agentEventFailure: Error | undefined = undefined;
/** Tracks pending steering messages for UI display. Removed when delivered. */
private _steeringMessages: string[] = [];
@ -408,10 +409,12 @@ export class AgentSession {
this._agentEventQueue = this._agentEventQueue.then(
() => this._processAgentEvent(event),
() => this._processAgentEvent(event),
- );
- // Keep queue alive if an event handler fails
- this._agentEventQueue.catch(() => {});
+ ).catch((error: unknown) => {
+ if (!this._agentEventFailure) {
+ this._agentEventFailure =
+ error instanceof Error ? error : new Error(String(error));
+ }
+ });
};
private _createRetryPromiseForAgentEnd(event: AgentEvent): void {
@ -913,6 +916,15 @@ export class AgentSession {
await this._memoryDisposePromise;
}
private async _awaitAgentEventProcessing(): Promise<void> {
await this._agentEventQueue;
if (this._agentEventFailure) {
const error = this._agentEventFailure;
this._agentEventFailure = undefined;
throw error;
}
}
private _enqueueMemoryPromotion(messages: AgentMessage[]): void {
this._memoryWriteQueue = this._memoryWriteQueue
.catch(() => undefined)
@ -1159,8 +1171,10 @@ export class AgentSession {
}
}
this._agentEventFailure = undefined;
await this.agent.prompt(messages);
await this.waitForRetry();
await this._awaitAgentEventProcessing();
}
/**
@ -1368,7 +1382,10 @@ export class AgentSession {
this.agent.steer(appMessage);
}
} else if (options?.triggerTurn) {
this._agentEventFailure = undefined;
await this.agent.prompt(appMessage);
await this.waitForRetry();
await this._awaitAgentEventProcessing();
} else {
this.agent.appendMessage(appMessage);
this.sessionManager.appendCustomMessageEntry(

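The change above serializes agent events on a promise chain and, instead of swallowing handler errors with a bare catch, records the first failure and rethrows it once the prompt and retry wait have finished. A minimal, self-contained sketch of that pattern (the SerialEventQueue name is hypothetical and not part of this repo):

// Hypothetical sketch of the pattern used above: run handlers strictly in order,
// keep the chain alive on failure, remember only the first error, and surface it
// once the caller has finished waiting on the queue.
class SerialEventQueue<E> {
  private chain: Promise<void> = Promise.resolve();
  private failure: Error | undefined;

  constructor(private readonly handler: (event: E) => Promise<void>) {}

  enqueue(event: E): void {
    this.chain = this.chain
      .then(() => this.handler(event))
      .catch((error: unknown) => {
        // Swallow here so later events still run, but record the first failure.
        if (!this.failure) {
          this.failure =
            error instanceof Error ? error : new Error(String(error));
        }
      });
  }

  // Await everything queued so far, then rethrow a recorded failure exactly once.
  async flush(): Promise<void> {
    await this.chain;
    if (this.failure) {
      const error = this.failure;
      this.failure = undefined;
      throw error;
    }
  }
}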
View file

@ -26,7 +26,6 @@ import type {
GatewaySessionState,
GatewaySessionSnapshot,
HistoryMessage,
- HistoryPart,
ModelInfo,
} from "./types.js";
import {
@ -54,9 +53,9 @@ export type {
GatewaySessionState,
GatewaySessionSnapshot,
HistoryMessage,
- HistoryPart,
ModelInfo,
} from "./types.js";
+ export type { HistoryPart } from "./types.js";
let activeGatewayRuntime: GatewayRuntime | null = null;
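This file moves HistoryPart out of the grouped re-export and onto its own type-only re-export line, likely so the symbol is erased at compile time: with TypeScript's isolatedModules, re-exporting a type without the `type` keyword is an error, since each file is transpiled in isolation and the compiler cannot know the export has no runtime value. A small illustration with hypothetical file names:

// types.ts (hypothetical)
export type HistoryPart = { type: "text"; text: string };

// index.ts (hypothetical) — type-only re-export, fully erased from the emitted JS:
export type { HistoryPart } from "./types.js";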

View file

@ -2,6 +2,43 @@ import { randomUUID } from "node:crypto";
import type { ServerResponse } from "node:http";
import type { AgentSessionEvent } from "../agent-session.js";
type TextStreamState = {
started: boolean;
ended: boolean;
streamedText: string;
};
function isTextContent(
value: unknown,
): value is { type: "text"; text: string } {
return (
typeof value === "object" &&
value !== null &&
"type" in value &&
"text" in value &&
(value as { type: unknown }).type === "text" &&
typeof (value as { text: unknown }).text === "string"
);
}
function getAssistantTextParts(
event: Extract<AgentSessionEvent, { type: "message_end" }>,
): Array<{ contentIndex: number; text: string }> {
if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) {
return [];
}
const textParts: Array<{ contentIndex: number; text: string }> = [];
for (const [contentIndex, content] of event.message.content.entries()) {
if (!isTextContent(content) || content.text.length === 0) {
continue;
}
textParts.push({ contentIndex, text: content.text });
}
return textParts;
}
/**
* Write a single Vercel AI SDK v5+ SSE chunk to the response.
* Format: `data: <JSON>\n\n`
@ -63,6 +100,93 @@ export function createVercelStreamListener(
// so these guards only need to bound the stream to that prompt's event span.
let active = false;
const msgId = messageId ?? randomUUID();
let textStates = new Map<number, TextStreamState>();
const getTextState = (contentIndex: number): TextStreamState => {
const existing = textStates.get(contentIndex);
if (existing) {
return existing;
}
const initial: TextStreamState = {
started: false,
ended: false,
streamedText: "",
};
textStates.set(contentIndex, initial);
return initial;
};
const emitTextStart = (contentIndex: number): void => {
const state = getTextState(contentIndex);
if (state.started) {
return;
}
writeChunk(response, {
type: "text-start",
id: `text_${contentIndex}`,
});
state.started = true;
};
const emitTextDelta = (contentIndex: number, delta: string): void => {
const state = getTextState(contentIndex);
if (delta.length === 0 || state.ended) {
return;
}
emitTextStart(contentIndex);
writeChunk(response, {
type: "text-delta",
id: `text_${contentIndex}`,
delta,
});
state.streamedText += delta;
};
const emitTextEnd = (contentIndex: number): void => {
const state = getTextState(contentIndex);
if (state.ended) {
return;
}
emitTextStart(contentIndex);
writeChunk(response, {
type: "text-end",
id: `text_${contentIndex}`,
});
state.ended = true;
};
const flushTextPart = (
contentIndex: number,
fullText: string,
close: boolean,
): void => {
const state = getTextState(contentIndex);
if (state.ended) {
return;
}
if (!fullText.startsWith(state.streamedText)) {
if (close && state.started) {
emitTextEnd(contentIndex);
}
return;
}
const suffix = fullText.slice(state.streamedText.length);
if (suffix.length > 0) {
emitTextDelta(contentIndex, suffix);
}
if (close) {
emitTextEnd(contentIndex);
}
};
const flushAssistantMessageText = (
event: Extract<AgentSessionEvent, { type: "message_end" }>,
): void => {
for (const part of getAssistantTextParts(event)) {
flushTextPart(part.contentIndex, part.text, true);
}
};
return (event: AgentSessionEvent) => {
if (response.writableEnded) return;
@ -85,6 +209,7 @@ export function createVercelStreamListener(
switch (event.type) {
case "turn_start":
textStates = new Map();
writeChunk(response, { type: "start-step" });
return;
@ -92,23 +217,13 @@ export function createVercelStreamListener(
const inner = event.assistantMessageEvent;
switch (inner.type) {
case "text_start":
- writeChunk(response, {
- type: "text-start",
- id: `text_${inner.contentIndex}`,
- });
+ emitTextStart(inner.contentIndex);
return;
case "text_delta":
- writeChunk(response, {
- type: "text-delta",
- id: `text_${inner.contentIndex}`,
- delta: inner.delta,
- });
+ emitTextDelta(inner.contentIndex, inner.delta);
return;
case "text_end":
- writeChunk(response, {
- type: "text-end",
- id: `text_${inner.contentIndex}`,
- });
+ flushTextPart(inner.contentIndex, inner.content, true);
return;
case "toolcall_start": {
const content = inner.partial.content[inner.contentIndex];
@ -163,6 +278,12 @@ export function createVercelStreamListener(
return;
}
case "message_end":
if (event.message.role === "assistant") {
flushAssistantMessageText(event);
}
return;
case "turn_end":
writeChunk(response, { type: "finish-step" });
return;
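The listener above now keeps per-contentIndex text state so the final assistant message can repair the stream: if message_end (or text_end) carries text that was never streamed, or only partially streamed, just the missing suffix is emitted before the block is closed, while a mismatched snapshot only closes an already-open block. A trimmed-down sketch of that idea, assuming only what the diff shows (makeTextFlusher and send are made-up names; writeChunk writes each object as a Vercel AI SDK v5 SSE frame, `data: <JSON>\n\n`):

// Hypothetical, simplified version of the flushTextPart/emitText* helpers above.
type Chunk =
  | { type: "text-start"; id: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "text-end"; id: string };

function makeTextFlusher(send: (chunk: Chunk) => void) {
  // Text already emitted for each content index of the assistant message.
  const streamed = new Map<number, string>();

  return function flush(contentIndex: number, fullText: string): void {
    const id = `text_${contentIndex}`;
    const prior = streamed.get(contentIndex) ?? "";
    if (!fullText.startsWith(prior)) {
      // Final text diverged from what was streamed; close an open block and stop.
      if (prior.length > 0) send({ type: "text-end", id });
      return;
    }
    if (prior.length === 0) send({ type: "text-start", id });
    const suffix = fullText.slice(prior.length);
    if (suffix.length > 0) send({ type: "text-delta", id, delta: suffix });
    streamed.set(contentIndex, fullText);
    send({ type: "text-end", id });
  };
}

// Usage sketch: each chunk becomes one SSE frame on the HTTP response.
// const flush = makeTextFlusher((c) => response.write(`data: ${JSON.stringify(c)}\n\n`));
// flush(0, "final answer"); // emits text-start, text-delta "final answer", text-end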

View file

@ -68,7 +68,7 @@ function buildProjectContextSection(
}
if (guides.length > 0) {
- section += "\n" + guides.map((g) => `- ${g}`).join("\n") + "\n";
+ section += `\n${guides.map((g) => `- ${g}`).join("\n")}\n`;
}
section += "\n";

View file

@ -1,3 +1,5 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import type { AgentSessionEvent } from "../src/core/agent-session.js";
import {
@ -5,6 +7,24 @@ import {
extractUserText,
} from "../src/core/gateway/vercel-ai-stream.js";
type MessageUpdateSessionEvent = Extract<
AgentSessionEvent,
{ type: "message_update" }
>;
type MessageEndSessionEvent = Extract<
AgentSessionEvent,
{ type: "message_end" }
>;
type TurnEndSessionEvent = Extract<AgentSessionEvent, { type: "turn_end" }>;
type TextAssistantMessageEvent = Extract<
MessageUpdateSessionEvent["assistantMessageEvent"],
{ type: "text_start" | "text_delta" | "text_end" }
>;
type MockResponse = ServerResponse<IncomingMessage> & {
chunks: string[];
ended: boolean;
};
describe("extractUserText", () => {
it("extracts text from useChat v5+ format with parts", () => {
const body = {
@ -61,24 +81,73 @@ describe("extractUserText", () => {
});
describe("createVercelStreamListener", () => {
- function createMockResponse() {
+ function createAssistantMessage(text: string): AssistantMessage {
+ return {
+ role: "assistant",
+ content: [{ type: "text", text }],
+ api: "anthropic-messages",
+ provider: "anthropic",
+ model: "mock",
+ usage: {
+ input: 0,
+ output: 0,
+ cacheRead: 0,
+ cacheWrite: 0,
+ totalTokens: 0,
+ cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+ },
+ stopReason: "stop",
+ timestamp: Date.now(),
+ };
+ }
+ function createMockResponse(): MockResponse {
const chunks: string[] = [];
+ let ended = false;
- return {
- writableEnded: false,
+ const response = {
+ get writableEnded() {
+ return ended;
+ },
write(data: string) {
chunks.push(data);
return true;
},
end() {
- this.writableEnded = true;
+ ended = true;
},
chunks,
+ get ended() {
+ return ended;
+ },
- } as any;
+ } as unknown as MockResponse;
+ return response;
}
function createMessageUpdateEvent(
assistantMessageEvent: TextAssistantMessageEvent,
): MessageUpdateSessionEvent {
return {
type: "message_update",
message: createAssistantMessage(""),
assistantMessageEvent,
};
}
function createAssistantMessageEndEvent(
text: string,
): MessageEndSessionEvent {
return {
type: "message_end",
message: createAssistantMessage(text),
};
}
function createTurnEndEvent(): TurnEndSessionEvent {
return {
type: "turn_end",
message: createAssistantMessage(""),
toolResults: [],
};
}
function parseChunks(chunks: string[]): Array<object | string> {
@ -104,41 +173,172 @@ describe("createVercelStreamListener", () => {
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
- listener({
- type: "message_update",
- message: {} as any,
- assistantMessageEvent: {
+ listener(
+ createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
- partial: {} as any,
- },
- } as AgentSessionEvent);
- listener({
- type: "message_update",
- message: {} as any,
- assistantMessageEvent: {
+ partial: createAssistantMessage(""),
+ }),
+ );
+ listener(
+ createMessageUpdateEvent({
type: "text_delta",
contentIndex: 0,
delta: "hello",
- partial: {} as any,
- },
- } as AgentSessionEvent);
- listener({
- type: "message_update",
- message: {} as any,
- assistantMessageEvent: {
+ partial: createAssistantMessage("hello"),
+ }),
+ );
+ listener(
+ createMessageUpdateEvent({
type: "text_end",
contentIndex: 0,
content: "hello",
- partial: {} as any,
- },
- } as AgentSessionEvent);
+ partial: createAssistantMessage("hello"),
+ }),
+ );
+ listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "hello" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("flushes final assistant text from message_end when no deltas streamed", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_end",
type: "turn_start",
turnIndex: 0,
message: {} as any,
toolResults: [],
timestamp: Date.now(),
} as AgentSessionEvent);
listener(createAssistantMessageEndEvent("final answer"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "final answer" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("flushes the missing text suffix on message_end", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_start",
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
listener(
createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
partial: createAssistantMessage(""),
}),
);
listener(
createMessageUpdateEvent({
type: "text_delta",
contentIndex: 0,
delta: "hel",
partial: createAssistantMessage("hel"),
}),
);
listener(createAssistantMessageEndEvent("hello"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "hel" },
{ type: "text-delta", id: "text_0", delta: "lo" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("flushes text_end content before closing the block", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_start",
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
listener(
createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
partial: createAssistantMessage(""),
}),
);
listener(
createMessageUpdateEvent({
type: "text_end",
contentIndex: 0,
content: "hello",
partial: createAssistantMessage("hello"),
}),
);
listener(createAssistantMessageEndEvent("hello"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([
{ type: "start", messageId: "test-msg-id" },
{ type: "start-step" },
{ type: "text-start", id: "text_0" },
{ type: "text-delta", id: "text_0", delta: "hello" },
{ type: "text-end", id: "text_0" },
{ type: "finish-step" },
]);
});
it("closes an open text block when final text mismatches the streamed prefix", () => {
const response = createMockResponse();
const listener = createVercelStreamListener(response, "test-msg-id");
listener({ type: "agent_start" } as AgentSessionEvent);
listener({
type: "turn_start",
turnIndex: 0,
timestamp: Date.now(),
} as AgentSessionEvent);
listener(
createMessageUpdateEvent({
type: "text_start",
contentIndex: 0,
partial: createAssistantMessage(""),
}),
);
listener(
createMessageUpdateEvent({
type: "text_delta",
contentIndex: 0,
delta: "hello",
partial: createAssistantMessage("hello"),
}),
);
listener(createAssistantMessageEndEvent("goodbye"));
listener(createTurnEndEvent());
const parsed = parseChunks(response.chunks);
expect(parsed).toEqual([