diff --git a/packages/coding-agent/src/core/gateway-runtime-helpers.ts b/packages/coding-agent/src/core/gateway-runtime-helpers.ts new file mode 100644 index 0000000..9d72f3d --- /dev/null +++ b/packages/coding-agent/src/core/gateway-runtime-helpers.ts @@ -0,0 +1,29 @@ +import type { AgentSession } from "./agent-session.js"; + +export function extractMessageText(message: { content: unknown }): string { + if (!Array.isArray(message.content)) { + return ""; + } + return message.content + .filter((part): part is { type: "text"; text: string } => { + return ( + typeof part === "object" && + part !== null && + "type" in part && + "text" in part && + part.type === "text" + ); + }) + .map((part) => part.text) + .join(""); +} + +export function getLastAssistantText(session: AgentSession): string { + for (let index = session.messages.length - 1; index >= 0; index--) { + const message = session.messages[index]; + if (message.role === "assistant") { + return extractMessageText(message); + } + } + return ""; +} diff --git a/packages/coding-agent/src/core/gateway-runtime-internal-types.ts b/packages/coding-agent/src/core/gateway-runtime-internal-types.ts new file mode 100644 index 0000000..0c08955 --- /dev/null +++ b/packages/coding-agent/src/core/gateway-runtime-internal-types.ts @@ -0,0 +1,76 @@ +import type { AgentSession } from "./agent-session.js"; +import type { + GatewayMessageRequest, + GatewayMessageResult, + GatewaySessionSnapshot, +} from "./gateway-runtime-types.js"; + +export interface GatewayQueuedMessage { + request: GatewayMessageRequest; + resolve: (result: GatewayMessageResult) => void; + onStart?: () => void; + onFinish?: () => void; +} + +export type GatewayEvent = + | { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot } + | { + type: "session_state"; + sessionKey: string; + snapshot: GatewaySessionSnapshot; + } + | { type: "turn_start"; sessionKey: string } + | { type: "turn_end"; sessionKey: string } + | { type: "message_start"; sessionKey: string; role?: string } + | { type: "token"; sessionKey: string; delta: string; contentIndex: number } + | { + type: "thinking"; + sessionKey: string; + delta: string; + contentIndex: number; + } + | { + type: "tool_start"; + sessionKey: string; + toolCallId: string; + toolName: string; + args: unknown; + } + | { + type: "tool_update"; + sessionKey: string; + toolCallId: string; + toolName: string; + partialResult: unknown; + } + | { + type: "tool_complete"; + sessionKey: string; + toolCallId: string; + toolName: string; + result: unknown; + isError: boolean; + } + | { type: "message_complete"; sessionKey: string; text: string } + | { type: "error"; sessionKey: string; error: string } + | { type: "aborted"; sessionKey: string }; + +export interface ManagedGatewaySession { + sessionKey: string; + session: AgentSession; + queue: GatewayQueuedMessage[]; + processing: boolean; + createdAt: number; + lastActiveAt: number; + listeners: Set<(event: GatewayEvent) => void>; + unsubscribe: () => void; +} + +export class HttpError extends Error { + constructor( + public readonly statusCode: number, + message: string, + ) { + super(message); + } +} diff --git a/packages/coding-agent/src/core/gateway-runtime-types.ts b/packages/coding-agent/src/core/gateway-runtime-types.ts new file mode 100644 index 0000000..26ee173 --- /dev/null +++ b/packages/coding-agent/src/core/gateway-runtime-types.ts @@ -0,0 +1,90 @@ +import type { AgentSession } from "./agent-session.js"; +import type { ImageContent } from "@mariozechner/pi-ai"; + +export interface GatewayConfig { + bind: string; + port: number; + bearerToken?: string; + session: { + idleMinutes: number; + maxQueuePerSession: number; + }; + webhook: { + enabled: boolean; + basePath: string; + secret?: string; + }; +} + +export type GatewaySessionFactory = ( + sessionKey: string, +) => Promise; + +export interface GatewayMessageRequest { + sessionKey: string; + text: string; + source?: "interactive" | "rpc" | "extension"; + images?: ImageContent[]; + metadata?: Record; +} + +export interface GatewayMessageResult { + ok: boolean; + response: string; + error?: string; + sessionKey: string; +} + +export interface GatewaySessionSnapshot { + sessionKey: string; + sessionId: string; + messageCount: number; + queueDepth: number; + processing: boolean; + lastActiveAt: number; + createdAt: number; + name?: string; + lastMessagePreview?: string; + updatedAt: number; +} + +export interface ModelInfo { + provider: string; + modelId: string; + displayName: string; + capabilities?: string[]; +} + +export interface HistoryMessage { + id: string; + role: "user" | "assistant" | "toolResult"; + parts: HistoryPart[]; + timestamp: number; +} + +export type HistoryPart = + | { type: "text"; text: string } + | { type: "reasoning"; text: string } + | { + type: "tool-invocation"; + toolCallId: string; + toolName: string; + args: unknown; + state: string; + result?: unknown; + }; + +export interface ChannelStatus { + id: string; + name: string; + connected: boolean; + error?: string; +} + +export interface GatewayRuntimeOptions { + config: GatewayConfig; + primarySessionKey: string; + primarySession: AgentSession; + createSession: GatewaySessionFactory; + log?: (message: string) => void; +} diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index 8cd6579..3b2c53d 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -4,12 +4,34 @@ import { type Server, type ServerResponse, } from "node:http"; +import { rm } from "node:fs/promises"; import { join } from "node:path"; import { URL } from "node:url"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import type { ImageContent } from "@mariozechner/pi-ai"; import type { AgentSession, AgentSessionEvent } from "./agent-session.js"; -import { SessionManager } from "./session-manager.js"; +import { + extractMessageText, + getLastAssistantText, +} from "./gateway-runtime-helpers.js"; +import { + type GatewayEvent, + type GatewayQueuedMessage, + HttpError, + type ManagedGatewaySession, +} from "./gateway-runtime-internal-types.js"; +import { sanitizeSessionKey } from "./gateway-session-manager.js"; +import type { + ChannelStatus, + GatewayConfig, + GatewayMessageRequest, + GatewayMessageResult, + GatewayRuntimeOptions, + GatewaySessionFactory, + GatewaySessionSnapshot, + HistoryMessage, + HistoryPart, + ModelInfo, +} from "./gateway-runtime-types.js"; import type { Settings } from "./settings-manager.js"; import { createVercelStreamListener, @@ -17,164 +39,22 @@ import { extractUserText, finishVercelStream, } from "./vercel-ai-stream.js"; - -export interface GatewayConfig { - bind: string; - port: number; - bearerToken?: string; - session: { - idleMinutes: number; - maxQueuePerSession: number; - }; - webhook: { - enabled: boolean; - basePath: string; - secret?: string; - }; -} - -export type GatewaySessionFactory = ( - sessionKey: string, -) => Promise; - -export interface GatewayMessageRequest { - sessionKey: string; - text: string; - source?: "interactive" | "rpc" | "extension"; - images?: ImageContent[]; - metadata?: Record; -} - -export interface GatewayMessageResult { - ok: boolean; - response: string; - error?: string; - sessionKey: string; -} - -export interface GatewaySessionSnapshot { - sessionKey: string; - sessionId: string; - messageCount: number; - queueDepth: number; - processing: boolean; - lastActiveAt: number; - createdAt: number; - name?: string; - lastMessagePreview?: string; - updatedAt: number; -} - -export interface ModelInfo { - provider: string; - modelId: string; - displayName: string; - capabilities?: string[]; -} - -export interface HistoryMessage { - id: string; - role: "user" | "assistant" | "toolResult"; - parts: HistoryPart[]; - timestamp: number; -} - -export type HistoryPart = - | { type: "text"; text: string } - | { type: "reasoning"; text: string } - | { - type: "tool-invocation"; - toolCallId: string; - toolName: string; - args: unknown; - state: string; - result?: unknown; - }; - -export interface ChannelStatus { - id: string; - name: string; - connected: boolean; - error?: string; -} - -export interface GatewayRuntimeOptions { - config: GatewayConfig; - primarySessionKey: string; - primarySession: AgentSession; - createSession: GatewaySessionFactory; - log?: (message: string) => void; -} - -interface GatewayQueuedMessage { - request: GatewayMessageRequest; - resolve: (result: GatewayMessageResult) => void; - onStart?: () => void; - onFinish?: () => void; -} - -type GatewayEvent = - | { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot } - | { - type: "session_state"; - sessionKey: string; - snapshot: GatewaySessionSnapshot; - } - | { type: "turn_start"; sessionKey: string } - | { type: "turn_end"; sessionKey: string } - | { type: "message_start"; sessionKey: string; role?: string } - | { type: "token"; sessionKey: string; delta: string; contentIndex: number } - | { - type: "thinking"; - sessionKey: string; - delta: string; - contentIndex: number; - } - | { - type: "tool_start"; - sessionKey: string; - toolCallId: string; - toolName: string; - args: unknown; - } - | { - type: "tool_update"; - sessionKey: string; - toolCallId: string; - toolName: string; - partialResult: unknown; - } - | { - type: "tool_complete"; - sessionKey: string; - toolCallId: string; - toolName: string; - result: unknown; - isError: boolean; - } - | { type: "message_complete"; sessionKey: string; text: string } - | { type: "error"; sessionKey: string; error: string } - | { type: "aborted"; sessionKey: string }; - -interface ManagedGatewaySession { - sessionKey: string; - session: AgentSession; - queue: GatewayQueuedMessage[]; - processing: boolean; - createdAt: number; - lastActiveAt: number; - listeners: Set<(event: GatewayEvent) => void>; - unsubscribe: () => void; -} - -class HttpError extends Error { - constructor( - public readonly statusCode: number, - message: string, - ) { - super(message); - } -} +export { + createGatewaySessionManager, + sanitizeSessionKey, +} from "./gateway-session-manager.js"; +export type { + ChannelStatus, + GatewayConfig, + GatewayMessageRequest, + GatewayMessageResult, + GatewayRuntimeOptions, + GatewaySessionFactory, + GatewaySessionSnapshot, + HistoryMessage, + HistoryPart, + ModelInfo, +} from "./gateway-runtime-types.js"; let activeGatewayRuntime: GatewayRuntime | null = null; @@ -759,7 +639,7 @@ export class GatewayRuntime { const action = sessionMatch[2]; if (!action && method === "GET") { - const session = this.getManagedSessionOrThrow(sessionKey); + const session = await this.ensureSession(sessionKey); this.writeJson(response, 200, { session: this.createSnapshot(session) }); return; } @@ -818,7 +698,7 @@ export class GatewayRuntime { if (action === "history" && method === "GET") { const limitParam = url.searchParams.get("limit"); - const messages = this.handleGetHistory( + const messages = await this.handleGetHistory( sessionKey, limitParam ? parseInt(limitParam, 10) : undefined, ); @@ -1067,7 +947,7 @@ export class GatewayRuntime { provider: string, modelId: string, ): Promise<{ ok: true; model: { provider: string; modelId: string } }> { - const managed = this.getManagedSessionOrThrow(sessionKey); + const managed = await this.ensureSession(sessionKey); const found = managed.session.modelRegistry.find(provider, modelId); if (!found) { throw new HttpError(404, `Model not found: ${provider}/${modelId}`); @@ -1076,14 +956,14 @@ export class GatewayRuntime { return { ok: true, model: { provider, modelId } }; } - private handleGetHistory( + private async handleGetHistory( sessionKey: string, limit?: number, - ): HistoryMessage[] { + ): Promise { if (limit !== undefined && (!Number.isFinite(limit) || limit < 1)) { throw new HttpError(400, "History limit must be a positive integer"); } - const managed = this.getManagedSessionOrThrow(sessionKey); + const managed = await this.ensureSession(sessionKey); const rawMessages = managed.session.messages; const messages: HistoryMessage[] = []; for (const msg of rawMessages) { @@ -1108,7 +988,7 @@ export class GatewayRuntime { sessionKey: string, patch: { name?: string }, ): Promise { - const managed = this.getManagedSessionOrThrow(sessionKey); + const managed = await this.ensureSession(sessionKey); if (patch.name !== undefined) { // Labels in pi-mono are per-entry; we label the current leaf entry const leafId = managed.session.sessionManager.getLeafId(); @@ -1126,7 +1006,7 @@ export class GatewayRuntime { if (sessionKey === this.primarySessionKey) { throw new HttpError(400, "Cannot delete primary session"); } - const managed = this.getManagedSessionOrThrow(sessionKey); + const managed = await this.ensureSession(sessionKey); if (managed.processing) { await managed.session.abort(); } @@ -1134,6 +1014,10 @@ export class GatewayRuntime { managed.unsubscribe(); managed.session.dispose(); this.sessions.delete(sessionKey); + await rm(this.getGatewaySessionDir(sessionKey), { + recursive: true, + force: true, + }).catch(() => undefined); } private getPublicConfig(): Record { @@ -1179,7 +1063,7 @@ export class GatewayRuntime { } private async handleReloadSession(sessionKey: string): Promise { - const managed = this.getManagedSessionOrThrow(sessionKey); + const managed = await this.ensureSession(sessionKey); // Reloading config by calling settingsManager.reload() on the session managed.session.settingsManager.reload(); } @@ -1269,46 +1153,3 @@ export class GatewayRuntime { return join(this.sessionDirRoot, sanitizeSessionKey(sessionKey)); } } - -function extractMessageText(message: { content: unknown }): string { - if (!Array.isArray(message.content)) { - return ""; - } - return message.content - .filter((part): part is { type: "text"; text: string } => { - return ( - typeof part === "object" && - part !== null && - "type" in part && - "text" in part && - part.type === "text" - ); - }) - .map((part) => part.text) - .join(""); -} - -function getLastAssistantText(session: AgentSession): string { - for (let index = session.messages.length - 1; index >= 0; index--) { - const message = session.messages[index]; - if (message.role === "assistant") { - return extractMessageText(message); - } - } - return ""; -} - -export function sanitizeSessionKey(sessionKey: string): string { - return sessionKey.replace(/[^a-zA-Z0-9._-]/g, "_"); -} - -export function createGatewaySessionManager( - cwd: string, - sessionKey: string, - sessionDirRoot: string, -): SessionManager { - return SessionManager.create( - cwd, - join(sessionDirRoot, sanitizeSessionKey(sessionKey)), - ); -} diff --git a/packages/coding-agent/src/core/gateway-session-manager.ts b/packages/coding-agent/src/core/gateway-session-manager.ts new file mode 100644 index 0000000..65d755c --- /dev/null +++ b/packages/coding-agent/src/core/gateway-session-manager.ts @@ -0,0 +1,17 @@ +import { join } from "node:path"; +import { SessionManager } from "./session-manager.js"; + +export function sanitizeSessionKey(sessionKey: string): string { + return sessionKey.replace(/[^a-zA-Z0-9._-]/g, "_"); +} + +export function createGatewaySessionManager( + cwd: string, + sessionKey: string, + sessionDirRoot: string, +): SessionManager { + return SessionManager.continueRecent( + cwd, + join(sessionDirRoot, sanitizeSessionKey(sessionKey)), + ); +}