From 124fdce1a0e13cf0b079666905f6572e7fcfbb58 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Thu, 12 Mar 2026 20:21:58 -0400 Subject: [PATCH] sse heartbeat and ui --- .../coding-agent/src/core/gateway/runtime.ts | 19 +++++++++++++++++++ .../src/core/gateway/vercel-ai-stream.ts | 10 ++++++---- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index bb1b45a..81ec27b 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -60,6 +60,7 @@ export type { export type { HistoryPart } from "./types.js"; let activeGatewayRuntime: GatewayRuntime | null = null; +const SSE_HEARTBEAT_MS = 15_000; type JsonRecord = Record; type AssistantAgentMessage = Extract; @@ -87,6 +88,18 @@ function mergeRecords(base: JsonRecord, overrides: JsonRecord): JsonRecord { return merged; } +function startSseHeartbeat(response: ServerResponse): () => void { + const timer = setInterval(() => { + if (response.writableEnded) { + clearInterval(timer); + return; + } + response.write(":\n\n"); + }, SSE_HEARTBEAT_MS); + timer.unref?.(); + return () => clearInterval(timer); +} + function readString(value: unknown): string | undefined { if (typeof value !== "string") { return undefined; @@ -1178,11 +1191,13 @@ export class GatewayRuntime { Connection: "keep-alive", }); response.write("\n"); + const stopHeartbeat = startSseHeartbeat(response); const unsubscribe = await this.addSubscriber(sessionKey, (event) => { response.write(`data: ${JSON.stringify(event)}\n\n`); }); request.on("close", () => { + stopHeartbeat(); unsubscribe(); }); } @@ -1210,6 +1225,7 @@ export class GatewayRuntime { "x-vercel-ai-ui-message-stream": "v1", }); response.write("\n"); + const stopHeartbeat = startSseHeartbeat(response); const listener = createVercelStreamListener(response); const structuredPartListener = @@ -1231,6 +1247,7 @@ export class GatewayRuntime { let clientDisconnected = false; request.on("close", () => { clientDisconnected = true; + stopHeartbeat(); stopStreaming(); }); @@ -1258,6 +1275,7 @@ export class GatewayRuntime { }); if (!clientDisconnected) { stopStreaming(); + stopHeartbeat(); if (result.ok) { this.log(`[chat] session=${sessionKey} completed ok`); finishVercelStream(response, "stop"); @@ -1278,6 +1296,7 @@ export class GatewayRuntime { } catch (error) { if (!clientDisconnected) { stopStreaming(); + stopHeartbeat(); const message = error instanceof Error ? error.message : String(error); this.log(`[chat] session=${sessionKey} exception: ${message}`); errorVercelStream(response, message); diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts index a1a7262..31d930c 100644 --- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts @@ -25,7 +25,10 @@ function isTextContent( function getAssistantTextParts( event: Extract, ): Array<{ contentIndex: number; text: string }> { - if (event.message.role !== "assistant" || !Array.isArray(event.message.content)) { + if ( + event.message.role !== "assistant" || + !Array.isArray(event.message.content) + ) { return []; } @@ -338,9 +341,8 @@ export function createGatewayStructuredPartListener( if (response.writableEnded) return; if (event.type !== "structured_part") return; writeChunk(response, { - type: "structured-part", - partType: event.partType, - payload: event.payload, + type: `data-${event.partType}`, + data: event.payload, }); }; }