refactor: centralize gateway module and address review fixes

Move the gateway runtime files into a dedicated core/gateway module and fix follow-up issues in session deletion, history import batching, message IDs, and legacy thread parsing.

Fixes #253

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Harivansh Rathi 2026-03-07 16:53:36 -08:00
parent 753cb935f1
commit c0bba5c38f
10 changed files with 47 additions and 29 deletions

View file

@@ -0,0 +1,29 @@
import type { AgentSession } from "../agent-session.js";
/**
 * Concatenate all text parts of a structured message into a single string.
 *
 * @param message - Message whose `content` may be an array of typed parts.
 * @returns The joined text of every `{ type: "text" }` part, or "" when
 *   `content` is not an array (e.g. a plain string or undefined).
 */
export function extractMessageText(message: { content: unknown }): string {
  if (!Array.isArray(message.content)) {
    return "";
  }
  return message.content
    .filter((part): part is { type: "text"; text: string } => {
      return (
        typeof part === "object" &&
        part !== null &&
        "type" in part &&
        "text" in part &&
        part.type === "text" &&
        // Back up the predicate's claim that `text` is a string; without this
        // check a malformed part like { type: "text", text: 42 } would slip
        // through the narrowed type and be stringified by join().
        typeof (part as { text: unknown }).text === "string"
      );
    })
    .map((part) => part.text)
    .join("");
}
/**
 * Return the text of the most recent assistant message in the session,
 * or "" when no assistant message exists.
 */
export function getLastAssistantText(session: AgentSession): string {
  const newest = [...session.messages]
    .reverse()
    .find((message) => message.role === "assistant");
  return newest === undefined ? "" : extractMessageText(newest);
}

View file

@@ -0,0 +1,19 @@
// Barrel module for core/gateway: the runtime implementation lives in
// runtime.js; this file defines the module's public surface so callers
// import from the gateway directory rather than individual files.
export {
createGatewaySessionManager,
GatewayRuntime,
getActiveGatewayRuntime,
sanitizeSessionKey,
setActiveGatewayRuntime,
} from "./runtime.js";
// Type-only re-exports (erased at compile time; safe under isolatedModules).
export type {
ChannelStatus,
GatewayConfig,
GatewayMessageRequest,
GatewayMessageResult,
GatewayRuntimeOptions,
GatewaySessionFactory,
GatewaySessionSnapshot,
HistoryMessage,
HistoryPart,
ModelInfo,
} from "./runtime.js";

View file

@@ -0,0 +1,76 @@
import type { AgentSession } from "../agent-session.js";
import type {
GatewayMessageRequest,
GatewayMessageResult,
GatewaySessionSnapshot,
} from "./types.js";
/**
 * A message waiting in a session's queue, paired with the resolver and
 * optional lifecycle callbacks supplied by the enqueuer.
 */
export interface GatewayQueuedMessage {
  request: GatewayMessageRequest;
  // Settles the enqueuer's promise once the turn completes (or fails).
  resolve: (result: GatewayMessageResult) => void;
  // NOTE(review): presumably fired when the message begins/finishes
  // processing — confirm exact timing against the runtime implementation.
  onStart?: () => void;
  onFinish?: () => void;
}
/**
 * Discriminated union of events delivered to gateway listeners while a
 * session processes messages. Every variant carries the originating
 * `sessionKey` so multiplexed listeners can route events.
 */
export type GatewayEvent =
  | { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot }
  | {
      type: "session_state";
      sessionKey: string;
      snapshot: GatewaySessionSnapshot;
    }
  | { type: "turn_start"; sessionKey: string }
  | { type: "turn_end"; sessionKey: string }
  | { type: "message_start"; sessionKey: string; role?: string }
  // Incremental text; `contentIndex` addresses the content part being streamed.
  | { type: "token"; sessionKey: string; delta: string; contentIndex: number }
  | {
      type: "thinking";
      sessionKey: string;
      delta: string;
      contentIndex: number;
    }
  | {
      type: "tool_start";
      sessionKey: string;
      toolCallId: string;
      toolName: string;
      args: unknown;
    }
  | {
      type: "tool_update";
      sessionKey: string;
      toolCallId: string;
      toolName: string;
      partialResult: unknown;
    }
  | {
      type: "tool_complete";
      sessionKey: string;
      toolCallId: string;
      toolName: string;
      result: unknown;
      isError: boolean;
    }
  | { type: "message_complete"; sessionKey: string; text: string }
  | { type: "error"; sessionKey: string; error: string }
  | { type: "aborted"; sessionKey: string };
/**
 * Internal bookkeeping record for one live session owned by the gateway.
 */
export interface ManagedGatewaySession {
  sessionKey: string;
  session: AgentSession;
  // FIFO of prompts not yet processed; `processing` is true while one is in flight.
  queue: GatewayQueuedMessage[];
  processing: boolean;
  createdAt: number;
  // NOTE(review): presumably epoch-millis refreshed on activity and used for
  // idle reclamation — confirm against the runtime's reaper logic.
  lastActiveAt: number;
  // Subscribers receiving GatewayEvent notifications for this session.
  listeners: Set<(event: GatewayEvent) => void>;
  // Detaches whatever listener the runtime attached to the underlying
  // AgentSession — presumably called on session teardown; verify.
  unsubscribe: () => void;
}
/**
 * Error carrying an HTTP status code, intended to be translated into an
 * HTTP response by the server layer.
 *
 * @param statusCode - HTTP status to send back to the client.
 * @param message - Human-readable error detail (becomes `Error.message`).
 */
export class HttpError extends Error {
  constructor(
    public readonly statusCode: number,
    message: string,
  ) {
    super(message);
    // Without this, logs and stack traces show the generic "Error" name.
    this.name = "HttpError";
  }
}

File diff suppressed because it is too large Load diff

View file

@@ -0,0 +1,17 @@
import { join } from "node:path";
import { SessionManager } from "../session-manager.js";
/**
 * Replace every character outside [A-Za-z0-9._-] with "_" so the key can be
 * used safely as a filesystem directory name.
 */
export function sanitizeSessionKey(sessionKey: string): string {
  const unsafeChars = /[^a-zA-Z0-9._-]/g;
  return sessionKey.replace(unsafeChars, "_");
}
/**
 * Build a SessionManager rooted at a per-session directory derived from the
 * sanitized session key, continuing the most recent session under that root.
 */
export function createGatewaySessionManager(
  cwd: string,
  sessionKey: string,
  sessionDirRoot: string,
): SessionManager {
  const sessionDir = join(sessionDirRoot, sanitizeSessionKey(sessionKey));
  return SessionManager.continueRecent(cwd, sessionDir);
}

View file

@@ -0,0 +1,90 @@
import type { ImageContent } from "@mariozechner/pi-ai";
import type { AgentSession } from "../agent-session.js";
/**
 * Static configuration for the gateway server.
 */
export interface GatewayConfig {
  // Interface address to bind (e.g. "127.0.0.1").
  bind: string;
  port: number;
  // NOTE(review): when set, requests presumably must present this bearer
  // token — confirm against the server's auth check.
  bearerToken?: string;
  session: {
    // NOTE(review): idle timeout in minutes, presumably before a session is
    // reclaimed — verify against the runtime's reaper.
    idleMinutes: number;
    // Upper bound on queued messages per session.
    maxQueuePerSession: number;
  };
  webhook: {
    enabled: boolean;
    // URL prefix under which webhook routes are mounted.
    basePath: string;
    secret?: string;
  };
}
/** Creates (or restores) the AgentSession backing a given session key. */
export type GatewaySessionFactory = (
  sessionKey: string,
) => Promise<AgentSession>;
/**
 * A single inbound prompt targeting one gateway session.
 */
export interface GatewayMessageRequest {
  sessionKey: string;
  text: string;
  // Origin of the request.
  source?: "interactive" | "rpc" | "extension";
  images?: ImageContent[];
  // Free-form, caller-defined payload carried alongside the message.
  metadata?: Record<string, unknown>;
}
/**
 * Outcome of processing a GatewayMessageRequest.
 */
export interface GatewayMessageResult {
  // True on success; `error` is populated when false.
  ok: boolean;
  // Assistant response text — NOTE(review): value on failure not evident
  // here; confirm against the runtime.
  response: string;
  error?: string;
  sessionKey: string;
}
/**
 * Point-in-time view of a managed session, suitable for listing and
 * serialization over the wire.
 */
export interface GatewaySessionSnapshot {
  sessionKey: string;
  sessionId: string;
  messageCount: number;
  // Number of requests still waiting in the session's queue.
  queueDepth: number;
  processing: boolean;
  // NOTE(review): timestamps presumably epoch milliseconds — verify producers.
  lastActiveAt: number;
  createdAt: number;
  name?: string;
  lastMessagePreview?: string;
  updatedAt: number;
}
/** Describes one selectable model exposed by the gateway. */
export interface ModelInfo {
  provider: string;
  modelId: string;
  displayName: string;
  capabilities?: string[];
}
/**
 * One entry in the serialized chat history returned to clients.
 */
export interface HistoryMessage {
  id: string;
  role: "user" | "assistant" | "toolResult";
  parts: HistoryPart[];
  // NOTE(review): epoch timestamp — units (ms vs s) not evident here;
  // confirm against the producer.
  timestamp: number;
}
/**
 * Discriminated union of the content parts a HistoryMessage may carry.
 */
export type HistoryPart =
  | { type: "text"; text: string }
  | { type: "reasoning"; text: string }
  | {
      type: "tool-invocation";
      toolCallId: string;
      toolName: string;
      args: unknown;
      // Lifecycle state of the invocation; allowed values defined by the producer.
      state: string;
      result?: unknown;
    };
/**
 * Connection status of one channel — NOTE(review): presumably an external
 * messaging integration; confirm against the runtime's channel registry.
 */
export interface ChannelStatus {
  id: string;
  name: string;
  connected: boolean;
  error?: string;
}
/**
 * Options for constructing the gateway runtime.
 */
export interface GatewayRuntimeOptions {
  config: GatewayConfig;
  // Key identifying the pre-created session below.
  primarySessionKey: string;
  primarySession: AgentSession;
  // Factory used to create sessions for keys not seen before.
  createSession: GatewaySessionFactory;
  // Optional sink for runtime log lines; silent when omitted.
  log?: (message: string) => void;
}

View file

@@ -0,0 +1,205 @@
import { randomUUID } from "node:crypto";
import type { ServerResponse } from "node:http";
import type { AgentSessionEvent } from "../agent-session.js";
/**
 * Write one Vercel AI SDK v5+ SSE chunk to the response.
 * Objects are JSON-serialized; strings (e.g. the terminal "[DONE]" sentinel)
 * pass through verbatim. Wire format: `data: <payload>\n\n`.
 * Silently does nothing once the response has ended.
 */
function writeChunk(response: ServerResponse, chunk: object | string): void {
  if (response.writableEnded) {
    return;
  }
  const payload = typeof chunk === "object" ? JSON.stringify(chunk) : chunk;
  response.write(`data: ${payload}\n\n`);
}
/**
 * Pull the user's prompt text out of a request body.
 *
 * Accepts three shapes, checked in priority order:
 *   1. simple gateway format: `{ text: string }`
 *   2. convenience format:    `{ prompt: string }`
 *   3. Vercel AI SDK useChat: `{ messages: UIMessage[] }` — scanned from the
 *      newest message backwards for a user message, reading the v5+ `parts`
 *      array first and falling back to the v4 `content` string.
 *
 * @returns The extracted text, or null when nothing usable is present.
 */
export function extractUserText(body: Record<string, unknown>): string | null {
  const { text, prompt, messages } = body;
  if (typeof text === "string" && text.trim()) {
    return text;
  }
  if (typeof prompt === "string" && prompt.trim()) {
    return prompt;
  }
  if (!Array.isArray(messages)) {
    return null;
  }
  for (let index = messages.length - 1; index >= 0; index--) {
    const msg = messages[index] as Record<string, unknown>;
    if (msg.role !== "user") continue;
    if (Array.isArray(msg.parts)) {
      const textPart = (msg.parts as Array<Record<string, unknown>>).find(
        (part) => part.type === "text" && typeof part.text === "string",
      );
      if (textPart) {
        return textPart.text as string;
      }
    }
    if (typeof msg.content === "string" && msg.content.trim()) {
      return msg.content;
    }
  }
  return null;
}
/**
 * Create an AgentSessionEvent listener that translates events to Vercel AI SDK v5+ SSE
 * chunks and writes them to the HTTP response.
 *
 * Returns the listener function. The caller is responsible for subscribing/unsubscribing.
 */
export function createVercelStreamListener(
  response: ServerResponse,
  messageId?: string,
): (event: AgentSessionEvent) => void {
  // Gate: only forward events within a single prompt's agent_start -> agent_end lifecycle.
  // handleChat now subscribes this listener immediately before the queued prompt starts,
  // so these guards only need to bound the stream to that prompt's event span.
  let active = false;
  // Stable message id for the whole stream; generated when the caller doesn't supply one.
  const msgId = messageId ?? randomUUID();
  return (event: AgentSessionEvent) => {
    // Never write into a response that has already been closed.
    if (response.writableEnded) return;
    // Activate on our agent_start, deactivate on agent_end
    if (event.type === "agent_start") {
      if (!active) {
        active = true;
        // The `start` chunk opens the message client-side; sent once per prompt.
        writeChunk(response, { type: "start", messageId: msgId });
      }
      return;
    }
    if (event.type === "agent_end") {
      active = false;
      return;
    }
    // Drop events that don't belong to our message
    if (!active) return;
    switch (event.type) {
      case "turn_start":
        writeChunk(response, { type: "start-step" });
        return;
      case "message_update": {
        // Assistant-message streaming events arrive nested inside message_update.
        const inner = event.assistantMessageEvent;
        switch (inner.type) {
          case "text_start":
            // SSE part ids derive from contentIndex so start/delta/end pair up client-side.
            writeChunk(response, {
              type: "text-start",
              id: `text_${inner.contentIndex}`,
            });
            return;
          case "text_delta":
            writeChunk(response, {
              type: "text-delta",
              id: `text_${inner.contentIndex}`,
              delta: inner.delta,
            });
            return;
          case "text_end":
            writeChunk(response, {
              type: "text-end",
              id: `text_${inner.contentIndex}`,
            });
            return;
          case "toolcall_start": {
            // The partial assistant message holds the tool-call part at contentIndex;
            // skip silently if the index doesn't point at a toolCall part.
            const content = inner.partial.content[inner.contentIndex];
            if (content?.type === "toolCall") {
              writeChunk(response, {
                type: "tool-input-start",
                toolCallId: content.id,
                toolName: content.name,
              });
            }
            return;
          }
          case "toolcall_delta": {
            const content = inner.partial.content[inner.contentIndex];
            if (content?.type === "toolCall") {
              writeChunk(response, {
                type: "tool-input-delta",
                toolCallId: content.id,
                inputTextDelta: inner.delta,
              });
            }
            return;
          }
          case "toolcall_end":
            // Arguments are complete here; surface them as the tool's available input.
            writeChunk(response, {
              type: "tool-input-available",
              toolCallId: inner.toolCall.id,
              toolName: inner.toolCall.name,
              input: inner.toolCall.arguments,
            });
            return;
          case "thinking_start":
            writeChunk(response, {
              type: "reasoning-start",
              id: `reasoning_${inner.contentIndex}`,
            });
            return;
          case "thinking_delta":
            writeChunk(response, {
              type: "reasoning-delta",
              id: `reasoning_${inner.contentIndex}`,
              delta: inner.delta,
            });
            return;
          case "thinking_end":
            writeChunk(response, {
              type: "reasoning-end",
              id: `reasoning_${inner.contentIndex}`,
            });
            return;
        }
        return;
      }
      case "turn_end":
        writeChunk(response, { type: "finish-step" });
        return;
      case "tool_execution_end":
        // Tool output arrives as a top-level session event, not a message_update.
        writeChunk(response, {
          type: "tool-output-available",
          toolCallId: event.toolCallId,
          output: event.result,
        });
        return;
    }
  };
}
/**
 * Terminate a Vercel AI SDK SSE stream normally: emit the `finish` chunk
 * followed by the `[DONE]` sentinel, then close the response.
 * No-op when the response has already ended.
 */
export function finishVercelStream(
  response: ServerResponse,
  finishReason: string = "stop",
): void {
  if (response.writableEnded) return;
  const tail: Array<object | string> = [{ type: "finish", finishReason }, "[DONE]"];
  for (const chunk of tail) {
    writeChunk(response, chunk);
  }
  response.end();
}
/**
 * Abort a Vercel AI SDK SSE stream with an error: emit an `error` chunk
 * followed by the `[DONE]` sentinel, then close the response.
 * No-op when the response has already ended.
 */
export function errorVercelStream(
  response: ServerResponse,
  errorText: string,
): void {
  if (response.writableEnded) return;
  const tail: Array<object | string> = [{ type: "error", errorText }, "[DONE]"];
  for (const chunk of tail) {
    writeChunk(response, chunk);
  }
  response.end();
}