diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts
index c8b15990..71d919c4
--- a/packages/coding-agent/src/core/gateway-runtime.ts
+++ b/packages/coding-agent/src/core/gateway-runtime.ts
@@ -4,6 +4,7 @@ import { URL } from "node:url";
 import type { ImageContent } from "@mariozechner/pi-ai";
 import type { AgentSession, AgentSessionEvent } from "./agent-session.js";
 import { SessionManager } from "./session-manager.js";
+import { createVercelStreamListener, errorVercelStream, extractUserText, finishVercelStream } from "./vercel-ai-stream.js";
 
 export interface GatewayConfig {
 	bind: string;
@@ -491,7 +492,7 @@ export class GatewayRuntime {
 			return;
 		}
-		const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset))?$/);
+		const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat))?$/);
 		if (!sessionMatch) {
 			this.writeJson(response, 404, { error: "Not found" });
 			return;
 		}
@@ -511,6 +512,11 @@ export class GatewayRuntime {
 			return;
 		}
 
+		if (action === "chat" && method === "POST") {
+			await this.handleChat(sessionKey, request, response);
+			return;
+		}
+
 		if (action === "messages" && method === "POST") {
 			const body = await this.readJsonBody(request);
 			const text = typeof body.text === "string" ? body.text : "";
@@ -587,6 +593,63 @@ export class GatewayRuntime {
 		});
 	}
 
+	private async handleChat(
+		sessionKey: string,
+		request: IncomingMessage,
+		response: ServerResponse,
+	): Promise<void> {
+		const body = await this.readJsonBody(request);
+		const text = extractUserText(body);
+		if (!text) {
+			this.writeJson(response, 400, { error: "Missing user message text" });
+			return;
+		}
+
+		// Set up SSE response headers
+		response.writeHead(200, {
+			"Content-Type": "text/event-stream",
+			"Cache-Control": "no-cache, no-transform",
+			Connection: "keep-alive",
+			"x-vercel-ai-ui-message-stream": "v1",
+		});
+		response.write("\n");
+
+		// Subscribe to session events for Vercel AI SDK translation
+		const managedSession = await this.ensureSession(sessionKey);
+		const listener = createVercelStreamListener(response);
+		const unsubscribe = managedSession.session.subscribe(listener);
+
+		// Clean up on client disconnect
+		let clientDisconnected = false;
+		request.on("close", () => {
+			clientDisconnected = true;
+			unsubscribe();
+		});
+
+		// Drive the session through the existing queue infrastructure
+		try {
+			const result = await this.enqueueMessage({
+				sessionKey,
+				text,
+				source: "extension",
+			});
+			if (!clientDisconnected) {
+				unsubscribe();
+				if (result.ok) {
+					finishVercelStream(response, "stop");
+				} else {
+					errorVercelStream(response, result.error ?? "Unknown error");
+				}
+			}
+		} catch (error) {
+			if (!clientDisconnected) {
+				unsubscribe();
+				const message = error instanceof Error ? error.message : String(error);
+				errorVercelStream(response, message);
+			}
+		}
+	}
+
 	private requireAuth(request: IncomingMessage, response: ServerResponse): void {
 		if (!this.config.bearerToken) {
 			return;
diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts
new file mode 100644
index 00000000..f087b253
--- /dev/null
+++ b/packages/coding-agent/src/core/vercel-ai-stream.ts
@@ -0,0 +1,133 @@
+import type { ServerResponse } from "node:http";
+import type { AgentSessionEvent } from "./agent-session.js";
+
+/**
+ * Write a single Vercel AI SDK v5+ SSE chunk to the response.
+ * Format: `data: <json>\n\n`
+ * For the terminal [DONE] sentinel: `data: [DONE]\n\n`
+ */
+function writeChunk(response: ServerResponse, chunk: object | string): void {
+	if (response.writableEnded) return;
+	const payload = typeof chunk === "string" ? chunk : JSON.stringify(chunk);
+	response.write(`data: ${payload}\n\n`);
+}
+
+/**
+ * Extract the user's text from the request body.
+ * Supports both useChat format ({ messages: UIMessage[] }) and simple gateway format ({ text: string }).
+ */
+export function extractUserText(body: Record<string, unknown>): string | null {
+	// Simple gateway format
+	if (typeof body.text === "string" && body.text.trim()) {
+		return body.text;
+	}
+	// Convenience format
+	if (typeof body.prompt === "string" && body.prompt.trim()) {
+		return body.prompt;
+	}
+	// Vercel AI SDK useChat format - extract last user message
+	if (Array.isArray(body.messages)) {
+		for (let i = body.messages.length - 1; i >= 0; i--) {
+			const msg = body.messages[i] as Record<string, unknown>;
+			if (msg.role !== "user") continue;
+			// v5+ format with parts array
+			if (Array.isArray(msg.parts)) {
+				for (const part of msg.parts as Array<Record<string, unknown>>) {
+					if (part.type === "text" && typeof part.text === "string") {
+						return part.text;
+					}
+				}
+			}
+			// v4 format with content string
+			if (typeof msg.content === "string" && msg.content.trim()) {
+				return msg.content;
+			}
+		}
+	}
+	return null;
+}
+
+/**
+ * Create an AgentSessionEvent listener that translates events to Vercel AI SDK v5+ SSE
+ * chunks and writes them to the HTTP response.
+ *
+ * Returns the listener function. The caller is responsible for subscribing/unsubscribing.
+ */
+export function createVercelStreamListener(
+	response: ServerResponse,
+): (event: AgentSessionEvent) => void {
+	let started = false;
+
+	return (event: AgentSessionEvent) => {
+		if (response.writableEnded) return;
+
+		switch (event.type) {
+			case "agent_start":
+				if (!started) {
+					writeChunk(response, { type: "start" });
+					started = true;
+				}
+				return;
+
+			case "turn_start":
+				writeChunk(response, { type: "start-step" });
+				return;
+
+			case "message_update": {
+				const inner = event.assistantMessageEvent;
+				switch (inner.type) {
+					case "text_start":
+						writeChunk(response, {
+							type: "text-start",
+							id: `text_${inner.contentIndex}`,
+						});
+						return;
+					case "text_delta":
+						writeChunk(response, {
+							type: "text-delta",
+							id: `text_${inner.contentIndex}`,
+							delta: inner.delta,
+						});
+						return;
+					case "text_end":
+						writeChunk(response, {
+							type: "text-end",
+							id: `text_${inner.contentIndex}`,
+						});
+						return;
+				}
+				return;
+			}
+
+			case "turn_end":
+				writeChunk(response, { type: "finish-step" });
+				return;
+		}
+	};
+}
+
+/**
+ * Write the terminal finish sequence and end the response.
+ */
+export function finishVercelStream(
+	response: ServerResponse,
+	finishReason: string = "stop",
+): void {
+	if (response.writableEnded) return;
+	writeChunk(response, { type: "finish", finishReason });
+	writeChunk(response, "[DONE]");
+	response.end();
+}
+
+/**
+ * Write an error chunk and end the response.
+ */
+export function errorVercelStream(
+	response: ServerResponse,
+	errorText: string,
+): void {
+	if (response.writableEnded) return;
+	writeChunk(response, { type: "error", errorText });
+	writeChunk(response, "[DONE]");
+	response.end();
+}