Merge pull request #317 from getcompanion-ai/ssebeat

sse heartbeat and ui
This commit is contained in:
Hari 2026-03-12 21:29:49 -04:00 committed by GitHub
commit 9b9a0170d6
2 changed files with 25 additions and 4 deletions

View file

@ -60,6 +60,7 @@ export type {
export type { HistoryPart } from "./types.js";
let activeGatewayRuntime: GatewayRuntime | null = null;
const SSE_HEARTBEAT_MS = 15_000;
type JsonRecord = Record<string, unknown>;
type AssistantAgentMessage = Extract<AgentMessage, { role: "assistant" }>;
@ -87,6 +88,18 @@ function mergeRecords(base: JsonRecord, overrides: JsonRecord): JsonRecord {
return merged;
}
function startSseHeartbeat(response: ServerResponse): () => void {
const timer = setInterval(() => {
if (response.writableEnded) {
clearInterval(timer);
return;
}
response.write(":\n\n");
}, SSE_HEARTBEAT_MS);
timer.unref?.();
return () => clearInterval(timer);
}
function readString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
@ -1178,11 +1191,13 @@ export class GatewayRuntime {
Connection: "keep-alive",
});
response.write("\n");
const stopHeartbeat = startSseHeartbeat(response);
const unsubscribe = await this.addSubscriber(sessionKey, (event) => {
response.write(`data: ${JSON.stringify(event)}\n\n`);
});
request.on("close", () => {
stopHeartbeat();
unsubscribe();
});
}
@ -1210,6 +1225,7 @@ export class GatewayRuntime {
"x-vercel-ai-ui-message-stream": "v1",
});
response.write("\n");
const stopHeartbeat = startSseHeartbeat(response);
const listener = createVercelStreamListener(response);
const structuredPartListener =
@ -1231,6 +1247,7 @@ export class GatewayRuntime {
let clientDisconnected = false;
request.on("close", () => {
clientDisconnected = true;
stopHeartbeat();
stopStreaming();
});
@ -1258,6 +1275,7 @@ export class GatewayRuntime {
});
if (!clientDisconnected) {
stopStreaming();
stopHeartbeat();
if (result.ok) {
this.log(`[chat] session=${sessionKey} completed ok`);
finishVercelStream(response, "stop");
@ -1278,6 +1296,7 @@ export class GatewayRuntime {
} catch (error) {
if (!clientDisconnected) {
stopStreaming();
stopHeartbeat();
const message = error instanceof Error ? error.message : String(error);
this.log(`[chat] session=${sessionKey} exception: ${message}`);
errorVercelStream(response, message);

View file

@ -25,7 +25,10 @@ function isTextContent(
function getAssistantTextParts(
event: Extract<AgentSessionEvent, { type: "message_end" }>,
): Array<{ contentIndex: number; text: string }> {
if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) {
if (
event.message.role !== "assistant" ||
!Array.isArray(event.message.content)
) {
return [];
}
@ -338,9 +341,8 @@ export function createGatewayStructuredPartListener(
if (response.writableEnded) return;
if (event.type !== "structured_part") return;
writeChunk(response, {
type: "structured-part",
partType: event.partType,
payload: event.payload,
type: `data-${event.partType}`,
data: event.payload,
});
};
}