sse heartbeat and ui

This commit is contained in:
Harivansh Rathi 2026-03-12 20:21:58 -04:00
parent 970e40a215
commit 124fdce1a0
2 changed files with 25 additions and 4 deletions

View file

@ -60,6 +60,7 @@ export type {
export type { HistoryPart } from "./types.js"; export type { HistoryPart } from "./types.js";
let activeGatewayRuntime: GatewayRuntime | null = null; let activeGatewayRuntime: GatewayRuntime | null = null;
const SSE_HEARTBEAT_MS = 15_000;
type JsonRecord = Record<string, unknown>; type JsonRecord = Record<string, unknown>;
type AssistantAgentMessage = Extract<AgentMessage, { role: "assistant" }>; type AssistantAgentMessage = Extract<AgentMessage, { role: "assistant" }>;
@ -87,6 +88,18 @@ function mergeRecords(base: JsonRecord, overrides: JsonRecord): JsonRecord {
return merged; return merged;
} }
function startSseHeartbeat(response: ServerResponse): () => void {
const timer = setInterval(() => {
if (response.writableEnded) {
clearInterval(timer);
return;
}
response.write(":\n\n");
}, SSE_HEARTBEAT_MS);
timer.unref?.();
return () => clearInterval(timer);
}
function readString(value: unknown): string | undefined { function readString(value: unknown): string | undefined {
if (typeof value !== "string") { if (typeof value !== "string") {
return undefined; return undefined;
@ -1178,11 +1191,13 @@ export class GatewayRuntime {
Connection: "keep-alive", Connection: "keep-alive",
}); });
response.write("\n"); response.write("\n");
const stopHeartbeat = startSseHeartbeat(response);
const unsubscribe = await this.addSubscriber(sessionKey, (event) => { const unsubscribe = await this.addSubscriber(sessionKey, (event) => {
response.write(`data: ${JSON.stringify(event)}\n\n`); response.write(`data: ${JSON.stringify(event)}\n\n`);
}); });
request.on("close", () => { request.on("close", () => {
stopHeartbeat();
unsubscribe(); unsubscribe();
}); });
} }
@ -1210,6 +1225,7 @@ export class GatewayRuntime {
"x-vercel-ai-ui-message-stream": "v1", "x-vercel-ai-ui-message-stream": "v1",
}); });
response.write("\n"); response.write("\n");
const stopHeartbeat = startSseHeartbeat(response);
const listener = createVercelStreamListener(response); const listener = createVercelStreamListener(response);
const structuredPartListener = const structuredPartListener =
@ -1231,6 +1247,7 @@ export class GatewayRuntime {
let clientDisconnected = false; let clientDisconnected = false;
request.on("close", () => { request.on("close", () => {
clientDisconnected = true; clientDisconnected = true;
stopHeartbeat();
stopStreaming(); stopStreaming();
}); });
@ -1258,6 +1275,7 @@ export class GatewayRuntime {
}); });
if (!clientDisconnected) { if (!clientDisconnected) {
stopStreaming(); stopStreaming();
stopHeartbeat();
if (result.ok) { if (result.ok) {
this.log(`[chat] session=${sessionKey} completed ok`); this.log(`[chat] session=${sessionKey} completed ok`);
finishVercelStream(response, "stop"); finishVercelStream(response, "stop");
@ -1278,6 +1296,7 @@ export class GatewayRuntime {
} catch (error) { } catch (error) {
if (!clientDisconnected) { if (!clientDisconnected) {
stopStreaming(); stopStreaming();
stopHeartbeat();
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
this.log(`[chat] session=${sessionKey} exception: ${message}`); this.log(`[chat] session=${sessionKey} exception: ${message}`);
errorVercelStream(response, message); errorVercelStream(response, message);

View file

@ -25,7 +25,10 @@ function isTextContent(
function getAssistantTextParts( function getAssistantTextParts(
event: Extract<AgentSessionEvent, { type: "message_end" }>, event: Extract<AgentSessionEvent, { type: "message_end" }>,
): Array<{ contentIndex: number; text: string }> { ): Array<{ contentIndex: number; text: string }> {
if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) { if (
event.message.role !== "assistant" ||
!Array.isArray(event.message.content)
) {
return []; return [];
} }
@ -338,9 +341,8 @@ export function createGatewayStructuredPartListener(
if (response.writableEnded) return; if (response.writableEnded) return;
if (event.type !== "structured_part") return; if (event.type !== "structured_part") return;
writeChunk(response, { writeChunk(response, {
type: "structured-part", type: `data-${event.partType}`,
partType: event.partType, data: event.payload,
payload: event.payload,
}); });
}; };
} }