Add Vercel AI SDK v5+ chat endpoint with text streaming

New POST /sessions/:key/chat endpoint that speaks the Vercel AI SDK
UI message SSE protocol (x-vercel-ai-ui-message-stream: v1). Accepts
both useChat format ({ messages: UIMessage[] }) and simple gateway
format ({ text: string }). Streams text-start, text-delta, text-end
events through the existing session infrastructure.
This commit is contained in:
Harivansh Rathi 2026-03-06 01:17:51 -08:00
parent 52211fa3d2
commit f83648c5c5
2 changed files with 197 additions and 1 deletions

View file

@ -4,6 +4,7 @@ import { URL } from "node:url";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { AgentSession, AgentSessionEvent } from "./agent-session.js";
import { SessionManager } from "./session-manager.js";
import { createVercelStreamListener, errorVercelStream, extractUserText, finishVercelStream } from "./vercel-ai-stream.js";
export interface GatewayConfig {
bind: string;
@ -491,7 +492,7 @@ export class GatewayRuntime {
return;
}
const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset))?$/);
const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat))?$/);
if (!sessionMatch) {
this.writeJson(response, 404, { error: "Not found" });
return;
@ -511,6 +512,11 @@ export class GatewayRuntime {
return;
}
if (action === "chat" && method === "POST") {
await this.handleChat(sessionKey, request, response);
return;
}
if (action === "messages" && method === "POST") {
const body = await this.readJsonBody(request);
const text = typeof body.text === "string" ? body.text : "";
@ -587,6 +593,63 @@ export class GatewayRuntime {
});
}
private async handleChat(
sessionKey: string,
request: IncomingMessage,
response: ServerResponse,
): Promise<void> {
const body = await this.readJsonBody(request);
const text = extractUserText(body);
if (!text) {
this.writeJson(response, 400, { error: "Missing user message text" });
return;
}
// Set up SSE response headers
response.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"x-vercel-ai-ui-message-stream": "v1",
});
response.write("\n");
// Subscribe to session events for Vercel AI SDK translation
const managedSession = await this.ensureSession(sessionKey);
const listener = createVercelStreamListener(response);
const unsubscribe = managedSession.session.subscribe(listener);
// Clean up on client disconnect
let clientDisconnected = false;
request.on("close", () => {
clientDisconnected = true;
unsubscribe();
});
// Drive the session through the existing queue infrastructure
try {
const result = await this.enqueueMessage({
sessionKey,
text,
source: "extension",
});
if (!clientDisconnected) {
unsubscribe();
if (result.ok) {
finishVercelStream(response, "stop");
} else {
errorVercelStream(response, result.error ?? "Unknown error");
}
}
} catch (error) {
if (!clientDisconnected) {
unsubscribe();
const message = error instanceof Error ? error.message : String(error);
errorVercelStream(response, message);
}
}
}
private requireAuth(request: IncomingMessage, response: ServerResponse): void {
if (!this.config.bearerToken) {
return;

View file

@ -0,0 +1,133 @@
import type { ServerResponse } from "node:http";
import type { AgentSessionEvent } from "./agent-session.js";
/**
* Write a single Vercel AI SDK v5+ SSE chunk to the response.
* Format: `data: <JSON>\n\n`
* For the terminal [DONE] sentinel: `data: [DONE]\n\n`
*/
function writeChunk(response: ServerResponse, chunk: object | string): void {
if (response.writableEnded) return;
const payload = typeof chunk === "string" ? chunk : JSON.stringify(chunk);
response.write(`data: ${payload}\n\n`);
}
/**
* Extract the user's text from the request body.
* Supports both useChat format ({ messages: UIMessage[] }) and simple gateway format ({ text: string }).
*/
export function extractUserText(body: Record<string, unknown>): string | null {
// Simple gateway format
if (typeof body.text === "string" && body.text.trim()) {
return body.text;
}
// Convenience format
if (typeof body.prompt === "string" && body.prompt.trim()) {
return body.prompt;
}
// Vercel AI SDK useChat format - extract last user message
if (Array.isArray(body.messages)) {
for (let i = body.messages.length - 1; i >= 0; i--) {
const msg = body.messages[i] as Record<string, unknown>;
if (msg.role !== "user") continue;
// v5+ format with parts array
if (Array.isArray(msg.parts)) {
for (const part of msg.parts as Array<Record<string, unknown>>) {
if (part.type === "text" && typeof part.text === "string") {
return part.text;
}
}
}
// v4 format with content string
if (typeof msg.content === "string" && msg.content.trim()) {
return msg.content;
}
}
}
return null;
}
/**
* Create an AgentSessionEvent listener that translates events to Vercel AI SDK v5+ SSE
* chunks and writes them to the HTTP response.
*
* Returns the listener function. The caller is responsible for subscribing/unsubscribing.
*/
export function createVercelStreamListener(
response: ServerResponse,
): (event: AgentSessionEvent) => void {
let started = false;
return (event: AgentSessionEvent) => {
if (response.writableEnded) return;
switch (event.type) {
case "agent_start":
if (!started) {
writeChunk(response, { type: "start" });
started = true;
}
return;
case "turn_start":
writeChunk(response, { type: "start-step" });
return;
case "message_update": {
const inner = event.assistantMessageEvent;
switch (inner.type) {
case "text_start":
writeChunk(response, {
type: "text-start",
id: `text_${inner.contentIndex}`,
});
return;
case "text_delta":
writeChunk(response, {
type: "text-delta",
id: `text_${inner.contentIndex}`,
delta: inner.delta,
});
return;
case "text_end":
writeChunk(response, {
type: "text-end",
id: `text_${inner.contentIndex}`,
});
return;
}
return;
}
case "turn_end":
writeChunk(response, { type: "finish-step" });
return;
}
};
}
/**
* Write the terminal finish sequence and end the response.
*/
export function finishVercelStream(
response: ServerResponse,
finishReason: string = "stop",
): void {
if (response.writableEnded) return;
writeChunk(response, { type: "finish", finishReason });
writeChunk(response, "[DONE]");
response.end();
}
/**
* Write an error chunk and end the response.
*/
export function errorVercelStream(
response: ServerResponse,
errorText: string,
): void {
if (response.writableEnded) return;
writeChunk(response, { type: "error", errorText });
writeChunk(response, "[DONE]");
response.end();
}