feat: persist chat threads and recover gateway sessions

Add a Convex-backed durable chat layer for companion threads, recover gateway sessions from persisted Pi session files after eviction, and start splitting gateway runtime internals into focused modules.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Harivansh Rathi 2026-03-07 16:32:53 -08:00
parent 8ecd55a522
commit 753cb935f1
5 changed files with 265 additions and 212 deletions

View file

@ -0,0 +1,29 @@
import type { AgentSession } from "./agent-session.js";
export function extractMessageText(message: { content: unknown }): string {
if (!Array.isArray(message.content)) {
return "";
}
return message.content
.filter((part): part is { type: "text"; text: string } => {
return (
typeof part === "object" &&
part !== null &&
"type" in part &&
"text" in part &&
part.type === "text"
);
})
.map((part) => part.text)
.join("");
}
export function getLastAssistantText(session: AgentSession): string {
for (let index = session.messages.length - 1; index >= 0; index--) {
const message = session.messages[index];
if (message.role === "assistant") {
return extractMessageText(message);
}
}
return "";
}

View file

@ -0,0 +1,76 @@
import type { AgentSession } from "./agent-session.js";
import type {
GatewayMessageRequest,
GatewayMessageResult,
GatewaySessionSnapshot,
} from "./gateway-runtime-types.js";
export interface GatewayQueuedMessage {
request: GatewayMessageRequest;
resolve: (result: GatewayMessageResult) => void;
onStart?: () => void;
onFinish?: () => void;
}
export type GatewayEvent =
| { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot }
| {
type: "session_state";
sessionKey: string;
snapshot: GatewaySessionSnapshot;
}
| { type: "turn_start"; sessionKey: string }
| { type: "turn_end"; sessionKey: string }
| { type: "message_start"; sessionKey: string; role?: string }
| { type: "token"; sessionKey: string; delta: string; contentIndex: number }
| {
type: "thinking";
sessionKey: string;
delta: string;
contentIndex: number;
}
| {
type: "tool_start";
sessionKey: string;
toolCallId: string;
toolName: string;
args: unknown;
}
| {
type: "tool_update";
sessionKey: string;
toolCallId: string;
toolName: string;
partialResult: unknown;
}
| {
type: "tool_complete";
sessionKey: string;
toolCallId: string;
toolName: string;
result: unknown;
isError: boolean;
}
| { type: "message_complete"; sessionKey: string; text: string }
| { type: "error"; sessionKey: string; error: string }
| { type: "aborted"; sessionKey: string };
export interface ManagedGatewaySession {
sessionKey: string;
session: AgentSession;
queue: GatewayQueuedMessage[];
processing: boolean;
createdAt: number;
lastActiveAt: number;
listeners: Set<(event: GatewayEvent) => void>;
unsubscribe: () => void;
}
export class HttpError extends Error {
constructor(
public readonly statusCode: number,
message: string,
) {
super(message);
}
}

View file

@ -0,0 +1,90 @@
import type { AgentSession } from "./agent-session.js";
import type { ImageContent } from "@mariozechner/pi-ai";
export interface GatewayConfig {
bind: string;
port: number;
bearerToken?: string;
session: {
idleMinutes: number;
maxQueuePerSession: number;
};
webhook: {
enabled: boolean;
basePath: string;
secret?: string;
};
}
export type GatewaySessionFactory = (
sessionKey: string,
) => Promise<AgentSession>;
export interface GatewayMessageRequest {
sessionKey: string;
text: string;
source?: "interactive" | "rpc" | "extension";
images?: ImageContent[];
metadata?: Record<string, unknown>;
}
export interface GatewayMessageResult {
ok: boolean;
response: string;
error?: string;
sessionKey: string;
}
export interface GatewaySessionSnapshot {
sessionKey: string;
sessionId: string;
messageCount: number;
queueDepth: number;
processing: boolean;
lastActiveAt: number;
createdAt: number;
name?: string;
lastMessagePreview?: string;
updatedAt: number;
}
export interface ModelInfo {
provider: string;
modelId: string;
displayName: string;
capabilities?: string[];
}
export interface HistoryMessage {
id: string;
role: "user" | "assistant" | "toolResult";
parts: HistoryPart[];
timestamp: number;
}
export type HistoryPart =
| { type: "text"; text: string }
| { type: "reasoning"; text: string }
| {
type: "tool-invocation";
toolCallId: string;
toolName: string;
args: unknown;
state: string;
result?: unknown;
};
export interface ChannelStatus {
id: string;
name: string;
connected: boolean;
error?: string;
}
export interface GatewayRuntimeOptions {
config: GatewayConfig;
primarySessionKey: string;
primarySession: AgentSession;
createSession: GatewaySessionFactory;
log?: (message: string) => void;
}

View file

@ -4,12 +4,34 @@ import {
type Server,
type ServerResponse,
} from "node:http";
import { rm } from "node:fs/promises";
import { join } from "node:path";
import { URL } from "node:url";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { AgentSession, AgentSessionEvent } from "./agent-session.js";
import { SessionManager } from "./session-manager.js";
import {
extractMessageText,
getLastAssistantText,
} from "./gateway-runtime-helpers.js";
import {
type GatewayEvent,
type GatewayQueuedMessage,
HttpError,
type ManagedGatewaySession,
} from "./gateway-runtime-internal-types.js";
import { sanitizeSessionKey } from "./gateway-session-manager.js";
import type {
ChannelStatus,
GatewayConfig,
GatewayMessageRequest,
GatewayMessageResult,
GatewayRuntimeOptions,
GatewaySessionFactory,
GatewaySessionSnapshot,
HistoryMessage,
HistoryPart,
ModelInfo,
} from "./gateway-runtime-types.js";
import type { Settings } from "./settings-manager.js";
import {
createVercelStreamListener,
@ -17,164 +39,22 @@ import {
extractUserText,
finishVercelStream,
} from "./vercel-ai-stream.js";
export interface GatewayConfig {
bind: string;
port: number;
bearerToken?: string;
session: {
idleMinutes: number;
maxQueuePerSession: number;
};
webhook: {
enabled: boolean;
basePath: string;
secret?: string;
};
}
export type GatewaySessionFactory = (
sessionKey: string,
) => Promise<AgentSession>;
export interface GatewayMessageRequest {
sessionKey: string;
text: string;
source?: "interactive" | "rpc" | "extension";
images?: ImageContent[];
metadata?: Record<string, unknown>;
}
export interface GatewayMessageResult {
ok: boolean;
response: string;
error?: string;
sessionKey: string;
}
export interface GatewaySessionSnapshot {
sessionKey: string;
sessionId: string;
messageCount: number;
queueDepth: number;
processing: boolean;
lastActiveAt: number;
createdAt: number;
name?: string;
lastMessagePreview?: string;
updatedAt: number;
}
export interface ModelInfo {
provider: string;
modelId: string;
displayName: string;
capabilities?: string[];
}
export interface HistoryMessage {
id: string;
role: "user" | "assistant" | "toolResult";
parts: HistoryPart[];
timestamp: number;
}
export type HistoryPart =
| { type: "text"; text: string }
| { type: "reasoning"; text: string }
| {
type: "tool-invocation";
toolCallId: string;
toolName: string;
args: unknown;
state: string;
result?: unknown;
};
export interface ChannelStatus {
id: string;
name: string;
connected: boolean;
error?: string;
}
export interface GatewayRuntimeOptions {
config: GatewayConfig;
primarySessionKey: string;
primarySession: AgentSession;
createSession: GatewaySessionFactory;
log?: (message: string) => void;
}
interface GatewayQueuedMessage {
request: GatewayMessageRequest;
resolve: (result: GatewayMessageResult) => void;
onStart?: () => void;
onFinish?: () => void;
}
type GatewayEvent =
| { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot }
| {
type: "session_state";
sessionKey: string;
snapshot: GatewaySessionSnapshot;
}
| { type: "turn_start"; sessionKey: string }
| { type: "turn_end"; sessionKey: string }
| { type: "message_start"; sessionKey: string; role?: string }
| { type: "token"; sessionKey: string; delta: string; contentIndex: number }
| {
type: "thinking";
sessionKey: string;
delta: string;
contentIndex: number;
}
| {
type: "tool_start";
sessionKey: string;
toolCallId: string;
toolName: string;
args: unknown;
}
| {
type: "tool_update";
sessionKey: string;
toolCallId: string;
toolName: string;
partialResult: unknown;
}
| {
type: "tool_complete";
sessionKey: string;
toolCallId: string;
toolName: string;
result: unknown;
isError: boolean;
}
| { type: "message_complete"; sessionKey: string; text: string }
| { type: "error"; sessionKey: string; error: string }
| { type: "aborted"; sessionKey: string };
interface ManagedGatewaySession {
sessionKey: string;
session: AgentSession;
queue: GatewayQueuedMessage[];
processing: boolean;
createdAt: number;
lastActiveAt: number;
listeners: Set<(event: GatewayEvent) => void>;
unsubscribe: () => void;
}
class HttpError extends Error {
constructor(
public readonly statusCode: number,
message: string,
) {
super(message);
}
}
export {
createGatewaySessionManager,
sanitizeSessionKey,
} from "./gateway-session-manager.js";
export type {
ChannelStatus,
GatewayConfig,
GatewayMessageRequest,
GatewayMessageResult,
GatewayRuntimeOptions,
GatewaySessionFactory,
GatewaySessionSnapshot,
HistoryMessage,
HistoryPart,
ModelInfo,
} from "./gateway-runtime-types.js";
let activeGatewayRuntime: GatewayRuntime | null = null;
@ -759,7 +639,7 @@ export class GatewayRuntime {
const action = sessionMatch[2];
if (!action && method === "GET") {
const session = this.getManagedSessionOrThrow(sessionKey);
const session = await this.ensureSession(sessionKey);
this.writeJson(response, 200, { session: this.createSnapshot(session) });
return;
}
@ -818,7 +698,7 @@ export class GatewayRuntime {
if (action === "history" && method === "GET") {
const limitParam = url.searchParams.get("limit");
const messages = this.handleGetHistory(
const messages = await this.handleGetHistory(
sessionKey,
limitParam ? parseInt(limitParam, 10) : undefined,
);
@ -1067,7 +947,7 @@ export class GatewayRuntime {
provider: string,
modelId: string,
): Promise<{ ok: true; model: { provider: string; modelId: string } }> {
const managed = this.getManagedSessionOrThrow(sessionKey);
const managed = await this.ensureSession(sessionKey);
const found = managed.session.modelRegistry.find(provider, modelId);
if (!found) {
throw new HttpError(404, `Model not found: ${provider}/${modelId}`);
@ -1076,14 +956,14 @@ export class GatewayRuntime {
return { ok: true, model: { provider, modelId } };
}
private handleGetHistory(
private async handleGetHistory(
sessionKey: string,
limit?: number,
): HistoryMessage[] {
): Promise<HistoryMessage[]> {
if (limit !== undefined && (!Number.isFinite(limit) || limit < 1)) {
throw new HttpError(400, "History limit must be a positive integer");
}
const managed = this.getManagedSessionOrThrow(sessionKey);
const managed = await this.ensureSession(sessionKey);
const rawMessages = managed.session.messages;
const messages: HistoryMessage[] = [];
for (const msg of rawMessages) {
@ -1108,7 +988,7 @@ export class GatewayRuntime {
sessionKey: string,
patch: { name?: string },
): Promise<void> {
const managed = this.getManagedSessionOrThrow(sessionKey);
const managed = await this.ensureSession(sessionKey);
if (patch.name !== undefined) {
// Labels in pi-mono are per-entry; we label the current leaf entry
const leafId = managed.session.sessionManager.getLeafId();
@ -1126,7 +1006,7 @@ export class GatewayRuntime {
if (sessionKey === this.primarySessionKey) {
throw new HttpError(400, "Cannot delete primary session");
}
const managed = this.getManagedSessionOrThrow(sessionKey);
const managed = await this.ensureSession(sessionKey);
if (managed.processing) {
await managed.session.abort();
}
@ -1134,6 +1014,10 @@ export class GatewayRuntime {
managed.unsubscribe();
managed.session.dispose();
this.sessions.delete(sessionKey);
await rm(this.getGatewaySessionDir(sessionKey), {
recursive: true,
force: true,
}).catch(() => undefined);
}
private getPublicConfig(): Record<string, unknown> {
@ -1179,7 +1063,7 @@ export class GatewayRuntime {
}
private async handleReloadSession(sessionKey: string): Promise<void> {
const managed = this.getManagedSessionOrThrow(sessionKey);
const managed = await this.ensureSession(sessionKey);
// Reloading config by calling settingsManager.reload() on the session
managed.session.settingsManager.reload();
}
@ -1269,46 +1153,3 @@ export class GatewayRuntime {
return join(this.sessionDirRoot, sanitizeSessionKey(sessionKey));
}
}
function extractMessageText(message: { content: unknown }): string {
if (!Array.isArray(message.content)) {
return "";
}
return message.content
.filter((part): part is { type: "text"; text: string } => {
return (
typeof part === "object" &&
part !== null &&
"type" in part &&
"text" in part &&
part.type === "text"
);
})
.map((part) => part.text)
.join("");
}
function getLastAssistantText(session: AgentSession): string {
for (let index = session.messages.length - 1; index >= 0; index--) {
const message = session.messages[index];
if (message.role === "assistant") {
return extractMessageText(message);
}
}
return "";
}
export function sanitizeSessionKey(sessionKey: string): string {
return sessionKey.replace(/[^a-zA-Z0-9._-]/g, "_");
}
export function createGatewaySessionManager(
cwd: string,
sessionKey: string,
sessionDirRoot: string,
): SessionManager {
return SessionManager.create(
cwd,
join(sessionDirRoot, sanitizeSessionKey(sessionKey)),
);
}

View file

@ -0,0 +1,17 @@
import { join } from "node:path";
import { SessionManager } from "./session-manager.js";
export function sanitizeSessionKey(sessionKey: string): string {
return sessionKey.replace(/[^a-zA-Z0-9._-]/g, "_");
}
export function createGatewaySessionManager(
cwd: string,
sessionKey: string,
sessionDirRoot: string,
): SessionManager {
return SessionManager.continueRecent(
cwd,
join(sessionDirRoot, sanitizeSessionKey(sessionKey)),
);
}