From f83648c5c52759636faf13a8a41e3816e25752d1 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 01:17:51 -0800 Subject: [PATCH 1/7] Add Vercel AI SDK v5+ chat endpoint with text streaming New POST /sessions/:key/chat endpoint that speaks the Vercel AI SDK UI message SSE protocol (x-vercel-ai-ui-message-stream: v1). Accepts both useChat format ({ messages: UIMessage[] }) and simple gateway format ({ text: string }). Streams text-start, text-delta, text-end events through the existing session infrastructure. --- .../coding-agent/src/core/gateway-runtime.ts | 65 ++++++++- .../coding-agent/src/core/vercel-ai-stream.ts | 133 ++++++++++++++++++ 2 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 packages/coding-agent/src/core/vercel-ai-stream.ts diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index c8b15990..71d919c4 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -4,6 +4,7 @@ import { URL } from "node:url"; import type { ImageContent } from "@mariozechner/pi-ai"; import type { AgentSession, AgentSessionEvent } from "./agent-session.js"; import { SessionManager } from "./session-manager.js"; +import { createVercelStreamListener, errorVercelStream, extractUserText, finishVercelStream } from "./vercel-ai-stream.js"; export interface GatewayConfig { bind: string; @@ -491,7 +492,7 @@ export class GatewayRuntime { return; } - const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset))?$/); + const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat))?$/); if (!sessionMatch) { this.writeJson(response, 404, { error: "Not found" }); return; @@ -511,6 +512,11 @@ export class GatewayRuntime { return; } + if (action === "chat" && method === "POST") { + await this.handleChat(sessionKey, request, response); + return; + } + if (action === "messages" 
&& method === "POST") { const body = await this.readJsonBody(request); const text = typeof body.text === "string" ? body.text : ""; @@ -587,6 +593,63 @@ export class GatewayRuntime { }); } + private async handleChat( + sessionKey: string, + request: IncomingMessage, + response: ServerResponse, + ): Promise { + const body = await this.readJsonBody(request); + const text = extractUserText(body); + if (!text) { + this.writeJson(response, 400, { error: "Missing user message text" }); + return; + } + + // Set up SSE response headers + response.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "x-vercel-ai-ui-message-stream": "v1", + }); + response.write("\n"); + + // Subscribe to session events for Vercel AI SDK translation + const managedSession = await this.ensureSession(sessionKey); + const listener = createVercelStreamListener(response); + const unsubscribe = managedSession.session.subscribe(listener); + + // Clean up on client disconnect + let clientDisconnected = false; + request.on("close", () => { + clientDisconnected = true; + unsubscribe(); + }); + + // Drive the session through the existing queue infrastructure + try { + const result = await this.enqueueMessage({ + sessionKey, + text, + source: "extension", + }); + if (!clientDisconnected) { + unsubscribe(); + if (result.ok) { + finishVercelStream(response, "stop"); + } else { + errorVercelStream(response, result.error ?? "Unknown error"); + } + } + } catch (error) { + if (!clientDisconnected) { + unsubscribe(); + const message = error instanceof Error ? 
error.message : String(error); + errorVercelStream(response, message); + } + } + } + private requireAuth(request: IncomingMessage, response: ServerResponse): void { if (!this.config.bearerToken) { return; diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts new file mode 100644 index 00000000..f087b253 --- /dev/null +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -0,0 +1,133 @@ +import type { ServerResponse } from "node:http"; +import type { AgentSessionEvent } from "./agent-session.js"; + +/** + * Write a single Vercel AI SDK v5+ SSE chunk to the response. + * Format: `data: <json>\n\n` + * For the terminal [DONE] sentinel: `data: [DONE]\n\n` + */ +function writeChunk(response: ServerResponse, chunk: object | string): void { + if (response.writableEnded) return; + const payload = typeof chunk === "string" ? chunk : JSON.stringify(chunk); + response.write(`data: ${payload}\n\n`); +} + +/** + * Extract the user's text from the request body. + * Supports both useChat format ({ messages: UIMessage[] }) and simple gateway format ({ text: string }). 
+ */ +export function extractUserText(body: Record): string | null { + // Simple gateway format + if (typeof body.text === "string" && body.text.trim()) { + return body.text; + } + // Convenience format + if (typeof body.prompt === "string" && body.prompt.trim()) { + return body.prompt; + } + // Vercel AI SDK useChat format - extract last user message + if (Array.isArray(body.messages)) { + for (let i = body.messages.length - 1; i >= 0; i--) { + const msg = body.messages[i] as Record; + if (msg.role !== "user") continue; + // v5+ format with parts array + if (Array.isArray(msg.parts)) { + for (const part of msg.parts as Array>) { + if (part.type === "text" && typeof part.text === "string") { + return part.text; + } + } + } + // v4 format with content string + if (typeof msg.content === "string" && msg.content.trim()) { + return msg.content; + } + } + } + return null; +} + +/** + * Create an AgentSessionEvent listener that translates events to Vercel AI SDK v5+ SSE + * chunks and writes them to the HTTP response. + * + * Returns the listener function. The caller is responsible for subscribing/unsubscribing. 
+ */ +export function createVercelStreamListener( + response: ServerResponse, +): (event: AgentSessionEvent) => void { + let started = false; + + return (event: AgentSessionEvent) => { + if (response.writableEnded) return; + + switch (event.type) { + case "agent_start": + if (!started) { + writeChunk(response, { type: "start" }); + started = true; + } + return; + + case "turn_start": + writeChunk(response, { type: "start-step" }); + return; + + case "message_update": { + const inner = event.assistantMessageEvent; + switch (inner.type) { + case "text_start": + writeChunk(response, { + type: "text-start", + id: `text_${inner.contentIndex}`, + }); + return; + case "text_delta": + writeChunk(response, { + type: "text-delta", + id: `text_${inner.contentIndex}`, + delta: inner.delta, + }); + return; + case "text_end": + writeChunk(response, { + type: "text-end", + id: `text_${inner.contentIndex}`, + }); + return; + } + return; + } + + case "turn_end": + writeChunk(response, { type: "finish-step" }); + return; + } + }; +} + +/** + * Write the terminal finish sequence and end the response. + */ +export function finishVercelStream( + response: ServerResponse, + finishReason: string = "stop", +): void { + if (response.writableEnded) return; + writeChunk(response, { type: "finish", finishReason }); + writeChunk(response, "[DONE]"); + response.end(); +} + +/** + * Write an error chunk and end the response. 
+ */ +export function errorVercelStream( + response: ServerResponse, + errorText: string, +): void { + if (response.writableEnded) return; + writeChunk(response, { type: "error", errorText }); + writeChunk(response, "[DONE]"); + response.end(); +} From fcd51005e2fbd1147e9d8e46a8055d3c00e24aa1 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 01:21:40 -0800 Subject: [PATCH 2/7] Add tool call streaming and tool execution results to chat endpoint Extend the Vercel stream listener to handle toolcall_start, toolcall_delta, toolcall_end, and tool_execution_end events. Maps to tool-input-start, tool-input-delta, tool-input-available, and tool-output-available/tool-output-error Vercel SDK chunk types. --- .../coding-agent/src/core/vercel-ai-stream.ts | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts index f087b253..728b5eb7 100644 --- a/packages/coding-agent/src/core/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -95,6 +95,36 @@ export function createVercelStreamListener( id: `text_${inner.contentIndex}`, }); return; + case "toolcall_start": { + const content = inner.partial.content[inner.contentIndex]; + if (content?.type === "toolCall") { + writeChunk(response, { + type: "tool-input-start", + toolCallId: content.id, + toolName: content.name, + }); + } + return; + } + case "toolcall_delta": { + const content = inner.partial.content[inner.contentIndex]; + if (content?.type === "toolCall") { + writeChunk(response, { + type: "tool-input-delta", + toolCallId: content.id, + inputTextDelta: inner.delta, + }); + } + return; + } + case "toolcall_end": + writeChunk(response, { + type: "tool-input-available", + toolCallId: inner.toolCall.id, + toolName: inner.toolCall.name, + input: inner.toolCall.arguments, + }); + return; } return; } @@ -102,6 +132,22 @@ export function createVercelStreamListener( case 
"turn_end": writeChunk(response, { type: "finish-step" }); return; + + case "tool_execution_end": + if (event.isError) { + writeChunk(response, { + type: "tool-output-error", + toolCallId: event.toolCallId, + errorText: typeof event.result === "string" ? event.result : JSON.stringify(event.result), + }); + } else { + writeChunk(response, { + type: "tool-output-available", + toolCallId: event.toolCallId, + output: typeof event.result === "string" ? event.result : JSON.stringify(event.result), + }); + } + return; } }; } From 8a61de15fa1c0647c8f6668b5d0136e37355944d Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 01:25:22 -0800 Subject: [PATCH 3/7] Add reasoning events and abort-aware finish reason to chat endpoint Map thinking_start/delta/end to Vercel AI SDK reasoning-start/delta/end chunk types. Derive finish reason from enqueueMessage result - aborted sessions get a clean finish with reason "error" instead of an error chunk. --- .../coding-agent/src/core/gateway-runtime.ts | 7 ++++++- .../coding-agent/src/core/vercel-ai-stream.ts | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index 71d919c4..84cd28bc 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -638,7 +638,12 @@ export class GatewayRuntime { if (result.ok) { finishVercelStream(response, "stop"); } else { - errorVercelStream(response, result.error ?? "Unknown error"); + const isAbort = result.error?.includes("aborted"); + if (isAbort) { + finishVercelStream(response, "error"); + } else { + errorVercelStream(response, result.error ?? 
"Unknown error"); + } } } } catch (error) { diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts index 728b5eb7..5ff7dd01 100644 --- a/packages/coding-agent/src/core/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -125,6 +125,25 @@ export function createVercelStreamListener( input: inner.toolCall.arguments, }); return; + case "thinking_start": + writeChunk(response, { + type: "reasoning-start", + id: `reasoning_${inner.contentIndex}`, + }); + return; + case "thinking_delta": + writeChunk(response, { + type: "reasoning-delta", + id: `reasoning_${inner.contentIndex}`, + delta: inner.delta, + }); + return; + case "thinking_end": + writeChunk(response, { + type: "reasoning-end", + id: `reasoning_${inner.contentIndex}`, + }); + return; } return; } From 998945afe590455485b9600a9fd561154f1a8eeb Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 01:27:43 -0800 Subject: [PATCH 4/7] Add unit tests for vercel-ai-stream extractUserText and stream listener Tests cover extractUserText with v5+ parts format, v4 content string, last-user-message extraction, simple text/prompt fields, null cases, and preference ordering. Stream listener tests verify text event translation and the writableEnded guard. 
--- .../test/vercel-ai-stream.test.ts | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 packages/coding-agent/test/vercel-ai-stream.test.ts diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts new file mode 100644 index 00000000..248fa658 --- /dev/null +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -0,0 +1,132 @@ +import { describe, it, expect } from "vitest"; +import type { AgentSessionEvent } from "../src/core/agent-session.js"; +import { extractUserText, createVercelStreamListener, finishVercelStream } from "../src/core/vercel-ai-stream.js"; + +describe("extractUserText", () => { + it("extracts text from useChat v5+ format with parts", () => { + const body = { + messages: [ + { role: "user", parts: [{ type: "text", text: "hello world" }] }, + ], + }; + expect(extractUserText(body)).toBe("hello world"); + }); + + it("extracts text from useChat v4 format with content string", () => { + const body = { + messages: [{ role: "user", content: "hello world" }], + }; + expect(extractUserText(body)).toBe("hello world"); + }); + + it("extracts last user message when multiple messages present", () => { + const body = { + messages: [ + { role: "user", parts: [{ type: "text", text: "first" }] }, + { role: "assistant", parts: [{ type: "text", text: "response" }] }, + { role: "user", parts: [{ type: "text", text: "second" }] }, + ], + }; + expect(extractUserText(body)).toBe("second"); + }); + + it("extracts text from simple gateway format", () => { + expect(extractUserText({ text: "hello" })).toBe("hello"); + }); + + it("extracts text from prompt format", () => { + expect(extractUserText({ prompt: "hello" })).toBe("hello"); + }); + + it("returns null for empty body", () => { + expect(extractUserText({})).toBeNull(); + }); + + it("returns null for empty messages array", () => { + expect(extractUserText({ messages: [] })).toBeNull(); + }); + + it("prefers text field over 
messages", () => { + const body = { + text: "direct", + messages: [{ role: "user", parts: [{ type: "text", text: "from messages" }] }], + }; + expect(extractUserText(body)).toBe("direct"); + }); +}); + +describe("createVercelStreamListener", () => { + function createMockResponse() { + const chunks: string[] = []; + let ended = false; + return { + writableEnded: false, + write(data: string) { + chunks.push(data); + return true; + }, + end() { + ended = true; + this.writableEnded = true; + }, + chunks, + get ended() { return ended; }, + } as any; + } + + function parseChunks(chunks: string[]): Array { + return chunks + .filter((c) => c.startsWith("data: ")) + .map((c) => { + const payload = c.replace(/^data: /, "").replace(/\n\n$/, ""); + try { return JSON.parse(payload); } + catch { return payload; } + }); + } + + it("translates text streaming events", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response); + + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + listener({ + type: "message_update", + message: {} as any, + assistantMessageEvent: { type: "text_start", contentIndex: 0, partial: {} as any }, + } as AgentSessionEvent); + listener({ + type: "message_update", + message: {} as any, + assistantMessageEvent: { type: "text_delta", contentIndex: 0, delta: "hello", partial: {} as any }, + } as AgentSessionEvent); + listener({ + type: "message_update", + message: {} as any, + assistantMessageEvent: { type: "text_end", contentIndex: 0, content: "hello", partial: {} as any }, + } as AgentSessionEvent); + listener({ type: "turn_end", turnIndex: 0, message: {} as any, toolResults: [] } as AgentSessionEvent); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "text_0" }, + { type: "text-delta", id: "text_0", delta: "hello" 
}, + { type: "text-end", id: "text_0" }, + { type: "finish-step" }, + ]); + }); + + it("does not write after response has ended", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response); + + listener({ type: "agent_start" } as AgentSessionEvent); + response.end(); + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([{ type: "start" }]); + }); +}); From ca0861400dff6f267dcf36fc3d1bd336fc51b81d Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 01:36:19 -0800 Subject: [PATCH 5/7] Fix Vercel AI SDK v6 protocol compliance - Add messageId to start chunk (required by useChat) - Remove undocumented tool-output-error wire type, use tool-output-available for all tool results - Pass structured tool output through instead of JSON-stringifying --- .../coding-agent/src/core/vercel-ai-stream.ts | 23 ++++++++----------- .../test/vercel-ai-stream.test.ts | 8 +++---- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts index 5ff7dd01..3f6dfdea 100644 --- a/packages/coding-agent/src/core/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import type { ServerResponse } from "node:http"; import type { AgentSessionEvent } from "./agent-session.js"; @@ -55,8 +56,10 @@ export function extractUserText(body: Record): string | null { */ export function createVercelStreamListener( response: ServerResponse, + messageId?: string, ): (event: AgentSessionEvent) => void { let started = false; + const msgId = messageId ?? 
randomUUID(); return (event: AgentSessionEvent) => { if (response.writableEnded) return; @@ -64,7 +67,7 @@ export function createVercelStreamListener( switch (event.type) { case "agent_start": if (!started) { - writeChunk(response, { type: "start" }); + writeChunk(response, { type: "start", messageId: msgId }); started = true; } return; @@ -153,19 +156,11 @@ export function createVercelStreamListener( return; case "tool_execution_end": - if (event.isError) { - writeChunk(response, { - type: "tool-output-error", - toolCallId: event.toolCallId, - errorText: typeof event.result === "string" ? event.result : JSON.stringify(event.result), - }); - } else { - writeChunk(response, { - type: "tool-output-available", - toolCallId: event.toolCallId, - output: typeof event.result === "string" ? event.result : JSON.stringify(event.result), - }); - } + writeChunk(response, { + type: "tool-output-available", + toolCallId: event.toolCallId, + output: event.result, + }); return; } }; diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index 248fa658..c90ca646 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -86,7 +86,7 @@ describe("createVercelStreamListener", () => { it("translates text streaming events", () => { const response = createMockResponse(); - const listener = createVercelStreamListener(response); + const listener = createVercelStreamListener(response, "test-msg-id"); listener({ type: "agent_start" } as AgentSessionEvent); listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); @@ -109,7 +109,7 @@ describe("createVercelStreamListener", () => { const parsed = parseChunks(response.chunks); expect(parsed).toEqual([ - { type: "start" }, + { type: "start", messageId: "test-msg-id" }, { type: "start-step" }, { type: "text-start", id: "text_0" }, { type: "text-delta", id: "text_0", delta: "hello" }, 
@@ -120,13 +120,13 @@ describe("createVercelStreamListener", () => { it("does not write after response has ended", () => { const response = createMockResponse(); - const listener = createVercelStreamListener(response); + const listener = createVercelStreamListener(response, "test-msg-id"); listener({ type: "agent_start" } as AgentSessionEvent); response.end(); listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); const parsed = parseChunks(response.chunks); - expect(parsed).toEqual([{ type: "start" }]); + expect(parsed).toEqual([{ type: "start", messageId: "test-msg-id" }]); }); }); From 5a2172fb9d224dd03d8c1a2c7ca5b8cdc599d3b6 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 10:05:58 -0800 Subject: [PATCH 6/7] Scope chat stream subscription to its own queued prompt --- .gitignore | 3 + .../coding-agent/src/core/gateway-runtime.ts | 66 ++++++++++++++----- .../coding-agent/src/core/vercel-ai-stream.ts | 38 ++++++----- .../test/vercel-ai-stream.test.ts | 33 +++++++--- 4 files changed, 98 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index a06c1377..7b88789e 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,6 @@ pi-*.html out.html packages/coding-agent/binaries/ todo.md + +# Riptide artifacts (cloud-synced) +.humanlayer/tasks/ diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index 84cd28bc..f3c03e54 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -4,7 +4,12 @@ import { URL } from "node:url"; import type { ImageContent } from "@mariozechner/pi-ai"; import type { AgentSession, AgentSessionEvent } from "./agent-session.js"; import { SessionManager } from "./session-manager.js"; -import { createVercelStreamListener, errorVercelStream, extractUserText, finishVercelStream } from "./vercel-ai-stream.js"; +import { + createVercelStreamListener, + errorVercelStream, + extractUserText, + finishVercelStream, 
+} from "./vercel-ai-stream.js"; export interface GatewayConfig { bind: string; @@ -59,6 +64,8 @@ export interface GatewayRuntimeOptions { interface GatewayQueuedMessage { request: GatewayMessageRequest; resolve: (result: GatewayMessageResult) => void; + onStart?: () => void; + onFinish?: () => void; } type GatewayEvent = @@ -186,18 +193,26 @@ export class GatewayRuntime { } async enqueueMessage(request: GatewayMessageRequest): Promise { - const managedSession = await this.ensureSession(request.sessionKey); + return this.enqueueManagedMessage({ request }); + } + + private async enqueueManagedMessage(queuedMessage: { + request: GatewayMessageRequest; + onStart?: () => void; + onFinish?: () => void; + }): Promise { + const managedSession = await this.ensureSession(queuedMessage.request.sessionKey); if (managedSession.queue.length >= this.config.session.maxQueuePerSession) { return { ok: false, response: "", error: `Queue full (${this.config.session.maxQueuePerSession} pending).`, - sessionKey: request.sessionKey, + sessionKey: queuedMessage.request.sessionKey, }; } return new Promise((resolve) => { - managedSession.queue.push({ request, resolve }); + managedSession.queue.push({ ...queuedMessage, resolve }); this.emitState(managedSession); void this.processNext(managedSession); }); @@ -303,6 +318,7 @@ export class GatewayRuntime { this.emitState(managedSession); try { + queued.onStart?.(); await managedSession.session.prompt(queued.request.text, { images: queued.request.images, source: queued.request.source ?? 
"extension", @@ -327,6 +343,7 @@ export class GatewayRuntime { sessionKey: managedSession.sessionKey, }); } finally { + queued.onFinish?.(); managedSession.processing = false; managedSession.lastActiveAt = Date.now(); this.emitState(managedSession); @@ -593,11 +610,7 @@ export class GatewayRuntime { }); } - private async handleChat( - sessionKey: string, - request: IncomingMessage, - response: ServerResponse, - ): Promise { + private async handleChat(sessionKey: string, request: IncomingMessage, response: ServerResponse): Promise { const body = await this.readJsonBody(request); const text = extractUserText(body); if (!text) { @@ -614,27 +627,44 @@ export class GatewayRuntime { }); response.write("\n"); - // Subscribe to session events for Vercel AI SDK translation const managedSession = await this.ensureSession(sessionKey); const listener = createVercelStreamListener(response); - const unsubscribe = managedSession.session.subscribe(listener); + let unsubscribe: (() => void) | undefined; + let streamingActive = false; + + const stopStreaming = () => { + if (!streamingActive) return; + streamingActive = false; + unsubscribe?.(); + unsubscribe = undefined; + }; // Clean up on client disconnect let clientDisconnected = false; request.on("close", () => { clientDisconnected = true; - unsubscribe(); + stopStreaming(); }); // Drive the session through the existing queue infrastructure try { - const result = await this.enqueueMessage({ - sessionKey, - text, - source: "extension", + const result = await this.enqueueManagedMessage({ + request: { + sessionKey, + text, + source: "extension", + }, + onStart: () => { + if (clientDisconnected || streamingActive) return; + unsubscribe = managedSession.session.subscribe(listener); + streamingActive = true; + }, + onFinish: () => { + stopStreaming(); + }, }); if (!clientDisconnected) { - unsubscribe(); + stopStreaming(); if (result.ok) { finishVercelStream(response, "stop"); } else { @@ -648,7 +678,7 @@ export class GatewayRuntime { 
} } catch (error) { if (!clientDisconnected) { - unsubscribe(); + stopStreaming(); const message = error instanceof Error ? error.message : String(error); errorVercelStream(response, message); } diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts index 3f6dfdea..2326cbb6 100644 --- a/packages/coding-agent/src/core/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -58,20 +58,32 @@ export function createVercelStreamListener( response: ServerResponse, messageId?: string, ): (event: AgentSessionEvent) => void { - let started = false; + // Gate: only forward events within a single prompt's agent_start -> agent_end lifecycle. + // handleChat now subscribes this listener immediately before the queued prompt starts, + // so these guards only need to bound the stream to that prompt's event span. + let active = false; const msgId = messageId ?? randomUUID(); return (event: AgentSessionEvent) => { if (response.writableEnded) return; - switch (event.type) { - case "agent_start": - if (!started) { - writeChunk(response, { type: "start", messageId: msgId }); - started = true; - } - return; + // Activate on our agent_start, deactivate on agent_end + if (event.type === "agent_start") { + if (!active) { + active = true; + writeChunk(response, { type: "start", messageId: msgId }); + } + return; + } + if (event.type === "agent_end") { + active = false; + return; + } + // Drop events that don't belong to our message + if (!active) return; + + switch (event.type) { case "turn_start": writeChunk(response, { type: "start-step" }); return; @@ -169,10 +181,7 @@ export function createVercelStreamListener( /** * Write the terminal finish sequence and end the response. 
*/ -export function finishVercelStream( - response: ServerResponse, - finishReason: string = "stop", -): void { +export function finishVercelStream(response: ServerResponse, finishReason: string = "stop"): void { if (response.writableEnded) return; writeChunk(response, { type: "finish", finishReason }); writeChunk(response, "[DONE]"); @@ -182,10 +191,7 @@ export function finishVercelStream( /** * Write an error chunk and end the response. */ -export function errorVercelStream( - response: ServerResponse, - errorText: string, -): void { +export function errorVercelStream(response: ServerResponse, errorText: string): void { if (response.writableEnded) return; writeChunk(response, { type: "error", errorText }); writeChunk(response, "[DONE]"); diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index c90ca646..1c3ea750 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -1,13 +1,11 @@ -import { describe, it, expect } from "vitest"; +import { describe, expect, it } from "vitest"; import type { AgentSessionEvent } from "../src/core/agent-session.js"; -import { extractUserText, createVercelStreamListener, finishVercelStream } from "../src/core/vercel-ai-stream.js"; +import { createVercelStreamListener, extractUserText } from "../src/core/vercel-ai-stream.js"; describe("extractUserText", () => { it("extracts text from useChat v5+ format with parts", () => { const body = { - messages: [ - { role: "user", parts: [{ type: "text", text: "hello world" }] }, - ], + messages: [{ role: "user", parts: [{ type: "text", text: "hello world" }] }], }; expect(extractUserText(body)).toBe("hello world"); }); @@ -70,7 +68,9 @@ describe("createVercelStreamListener", () => { this.writableEnded = true; }, chunks, - get ended() { return ended; }, + get ended() { + return ended; + }, } as any; } @@ -79,8 +79,11 @@ describe("createVercelStreamListener", () 
=> { .filter((c) => c.startsWith("data: ")) .map((c) => { const payload = c.replace(/^data: /, "").replace(/\n\n$/, ""); - try { return JSON.parse(payload); } - catch { return payload; } + try { + return JSON.parse(payload); + } catch { + return payload; + } }); } @@ -129,4 +132,18 @@ describe("createVercelStreamListener", () => { const parsed = parseChunks(response.chunks); expect(parsed).toEqual([{ type: "start", messageId: "test-msg-id" }]); }); + + it("ignores events outside the active prompt lifecycle", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + listener({ type: "agent_end", messages: [] } as AgentSessionEvent); + listener({ type: "turn_start", turnIndex: 1, timestamp: Date.now() } as AgentSessionEvent); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([{ type: "start", messageId: "test-msg-id" }, { type: "start-step" }]); + }); }); From 2cb87538c443189db3028286e014a6a9f335826e Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Fri, 6 Mar 2026 10:13:13 -0800 Subject: [PATCH 7/7] Move ensureSession into handleChat try block --- packages/coding-agent/src/core/gateway-runtime.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index f3c03e54..cc5149a0 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -627,7 +627,6 @@ export class GatewayRuntime { }); response.write("\n"); - const managedSession = await this.ensureSession(sessionKey); const listener = createVercelStreamListener(response); let unsubscribe: (() => void) | undefined; let streamingActive = false; @@ -648,6 
+647,7 @@ export class GatewayRuntime { // Drive the session through the existing queue infrastructure try { + const managedSession = await this.ensureSession(sessionKey); const result = await this.enqueueManagedMessage({ request: { sessionKey,