diff --git a/docs/openapi.json b/docs/openapi.json index d600fda..383fc31 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -954,6 +954,8 @@ "tags": [ "v1" ], + "summary": "List all managed processes.", + "description": "Returns a list of all processes (running and exited) currently tracked\nby the runtime, sorted by process ID.", "operationId": "get_v1_processes", "responses": { "200": { @@ -982,6 +984,8 @@ "tags": [ "v1" ], + "summary": "Create a long-lived managed process.", + "description": "Spawns a new process with the given command and arguments. Supports both\npipe-based and PTY (tty) modes. Returns the process descriptor on success.", "operationId": "post_v1_processes", "requestBody": { "content": { @@ -1042,6 +1046,8 @@ "tags": [ "v1" ], + "summary": "Get process runtime configuration.", + "description": "Returns the current runtime configuration for the process management API,\nincluding limits for concurrency, timeouts, and buffer sizes.", "operationId": "get_v1_processes_config", "responses": { "200": { @@ -1070,6 +1076,8 @@ "tags": [ "v1" ], + "summary": "Update process runtime configuration.", + "description": "Replaces the runtime configuration for the process management API.\nValidates that all values are non-zero and clamps default timeout to max.", "operationId": "post_v1_processes_config", "requestBody": { "content": { @@ -1120,6 +1128,8 @@ "tags": [ "v1" ], + "summary": "Run a one-shot command.", + "description": "Executes a command to completion and returns its stdout, stderr, exit code,\nand duration. Supports configurable timeout and output size limits.", "operationId": "post_v1_processes_run", "requestBody": { "content": { @@ -1170,6 +1180,8 @@ "tags": [ "v1" ], + "summary": "Get a single process by ID.", + "description": "Returns the current state of a managed process including its status,\nPID, exit code, and creation/exit timestamps.", "operationId": "get_v1_process", "parameters": [ { @@ -1219,6 +1231,8 @@ "tags": [ "v1" ], + "summary": "Delete a process record.", + "description": "Removes a stopped process from the runtime. Returns 409 if the process\nis still running; stop or kill it first.", "operationId": "delete_v1_process", "parameters": [ { @@ -1273,6 +1287,8 @@ "tags": [ "v1" ], + "summary": "Write input to a process.", + "description": "Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).\nData can be encoded as base64, utf8, or text. Returns 413 if the decoded\npayload exceeds the configured `maxInputBytesPerRequest` limit.", "operationId": "post_v1_process_input", "parameters": [ { @@ -1354,6 +1370,8 @@ "tags": [ "v1" ], + "summary": "Send SIGKILL to a process.", + "description": "Sends SIGKILL to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.", "operationId": "post_v1_process_kill", "parameters": [ { @@ -1417,6 +1435,8 @@ "tags": [ "v1" ], + "summary": "Fetch process logs.", + "description": "Returns buffered log entries for a process. Supports filtering by stream\ntype, tail count, and sequence-based resumption. When `follow=true`,\nreturns an SSE stream that replays buffered entries then streams live output.", "operationId": "get_v1_process_logs", "parameters": [ { @@ -1515,6 +1535,8 @@ "tags": [ "v1" ], + "summary": "Send SIGTERM to a process.", + "description": "Sends SIGTERM to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.", "operationId": "post_v1_process_stop", "parameters": [ { @@ -1573,92 +1595,13 @@ } } }, - "/v1/processes/{id}/terminal/resize": { - "post": { - "tags": [ - "v1" - ], - "operationId": "post_v1_process_terminal_resize", - "parameters": [ - { - "name": "id", - "in": "path", - "description": "Process ID", - "required": true, - "schema": { - "type": "string" - } - } - ], - "requestBody": { - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProcessTerminalResizeRequest" - } - } - }, - "required": true - }, - "responses": { - "200": { - "description": "Resize accepted", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProcessTerminalResizeResponse" - } - } - } - }, - "400": { - "description": "Invalid request", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProblemDetails" - } - } - } - }, - "404": { - "description": "Unknown process", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProblemDetails" - } - } - } - }, - "409": { - "description": "Not a terminal process", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProblemDetails" - } - } - } - }, - "501": { - "description": "Process API unsupported on this platform", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ProblemDetails" - } - } - } - } - } - } - }, "/v1/processes/{id}/terminal/ws": { "get": { "tags": [ "v1" ], + "summary": "Open an interactive WebSocket terminal session.", + "description": "Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts\n`access_token` query param for browser-based auth (WebSocket API cannot\nsend custom headers). Uses the `channel.k8s.io` binary subprotocol:\nchannel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,\nand channel 255 close.", "operationId": "get_v1_process_terminal_ws", "parameters": [ { @@ -2013,6 +1956,7 @@ "permission_denied", "not_acceptable", "unsupported_media_type", + "not_found", "session_not_found", "session_already_exists", "mode_not_supported", @@ -2730,44 +2674,6 @@ "exited" ] }, - "ProcessTerminalResizeRequest": { - "type": "object", - "required": [ - "cols", - "rows" - ], - "properties": { - "cols": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "rows": { - "type": "integer", - "format": "int32", - "minimum": 0 - } - } - }, - "ProcessTerminalResizeResponse": { - "type": "object", - "required": [ - "cols", - "rows" - ], - "properties": { - "cols": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "rows": { - "type": "integer", - "format": "int32", - "minimum": 0 - } - } - }, "ServerStatus": { "type": "string", "enum": [ diff --git a/frontend/packages/inspector/src/components/processes/GhosttyTerminal.tsx b/frontend/packages/inspector/src/components/processes/GhosttyTerminal.tsx index 17ddeba..858b7c1 100644 --- a/frontend/packages/inspector/src/components/processes/GhosttyTerminal.tsx +++ b/frontend/packages/inspector/src/components/processes/GhosttyTerminal.tsx @@ -1,7 +1,7 @@ import { AlertCircle, Loader2, PlugZap, SquareTerminal } from "lucide-react"; import { FitAddon, Terminal, init } from "ghostty-web"; import { useEffect, useRef, useState } from "react"; -import type { ProcessTerminalServerFrame, SandboxAgent } from "sandbox-agent"; +import type { SandboxAgent } from "sandbox-agent"; type ConnectionState = "connecting" | "ready" | "closed" | "error"; @@ -29,21 +29,6 @@ const terminalTheme = { brightWhite: "#fafafa", }; -const toUint8Array = async (data: Blob | ArrayBuffer): Promise => { - if (data instanceof ArrayBuffer) { - return new Uint8Array(data); - } - return new Uint8Array(await data.arrayBuffer()); -}; - -const isServerFrame = (value: unknown): value is ProcessTerminalServerFrame => { - if (!value || typeof value !== "object") { - return false; - } - const type = (value as { type?: unknown }).type; - return type === "ready" || type === "exit" || type === "error"; -}; - const GhosttyTerminal = ({ client, processId, @@ -62,24 +47,16 @@ const GhosttyTerminal = ({ let cancelled = false; let terminal: Terminal | null = null; let fitAddon: FitAddon | null = null; - let socket: WebSocket | null = null; + let session: ReturnType | null = null; let resizeRaf = 0; let removeDataListener: { dispose(): void } | null = null; let removeResizeListener: { dispose(): void } | null = null; - const sendFrame = (payload: unknown) => { - if (!socket || socket.readyState !== WebSocket.OPEN) { - return; - } - socket.send(JSON.stringify(payload)); - }; - const syncSize = () => { - if (!terminal) { + if (!terminal || !session) { return; } - sendFrame({ - type: "resize", + session.resize({ cols: terminal.cols, rows: terminal.rows, }); @@ -110,7 +87,7 @@ const GhosttyTerminal = ({ terminal.focus(); removeDataListener = terminal.onData((data) => { - sendFrame({ type: "input", data }); + session?.sendInput(data); }); removeResizeListener = terminal.onResize(() => { @@ -120,38 +97,29 @@ const GhosttyTerminal = ({ resizeRaf = window.requestAnimationFrame(syncSize); }); - const nextSocket = client.connectProcessTerminalWebSocket(processId); - socket = nextSocket; - nextSocket.binaryType = "arraybuffer"; + const nextSession = client.connectProcessTerminal(processId); + session = nextSession; - const tryParseControlFrame = (raw: string | ArrayBuffer | Blob): ProcessTerminalServerFrame | null => { - let text: string | undefined; - if (typeof raw === "string") { - text = raw; - } else if (raw instanceof ArrayBuffer) { - // Server may send JSON control frames as binary; try to decode small messages as JSON. - if (raw.byteLength < 256) { - try { - text = new TextDecoder().decode(raw); - } catch { - // not decodable, treat as terminal data - } - } + nextSession.onReady((frame) => { + if (cancelled) { + return; } - if (!text) return null; - try { - const parsed = JSON.parse(text); - return isServerFrame(parsed) ? parsed : null; - } catch { - return null; - } - }; - - const handleControlFrame = (frame: ProcessTerminalServerFrame): void => { if (frame.type === "ready") { setConnectionState("ready"); setStatusMessage("Connected"); syncSize(); + } + }); + + nextSession.onData((bytes) => { + if (cancelled || !terminal) { + return; + } + terminal.write(bytes); + }); + + nextSession.onExit((frame) => { + if (cancelled) { return; } if (frame.type === "exit") { @@ -161,47 +129,24 @@ const GhosttyTerminal = ({ frame.exitCode == null ? "Process exited." : `Process exited with code ${frame.exitCode}.` ); onExit?.(); - return; } - if (frame.type === "error") { - setConnectionState("error"); - setStatusMessage(frame.message); - } - }; - - nextSocket.addEventListener("message", (event) => { - if (cancelled || !terminal) { - return; - } - - const controlFrame = tryParseControlFrame(event.data); - if (controlFrame) { - handleControlFrame(controlFrame); - return; - } - - void toUint8Array(event.data).then((bytes) => { - if (!cancelled && terminal) { - terminal.write(bytes); - } - }); }); - nextSocket.addEventListener("close", () => { + nextSession.onError((error) => { + if (cancelled) { + return; + } + setConnectionState("error"); + setStatusMessage(error instanceof Error ? error.message : error.message); + }); + + nextSession.onClose(() => { if (cancelled) { return; } setConnectionState((current) => (current === "error" ? current : "closed")); setStatusMessage((current) => (current === "Connected" ? "Terminal disconnected." : current)); }); - - nextSocket.addEventListener("error", () => { - if (cancelled) { - return; - } - setConnectionState("error"); - setStatusMessage("WebSocket connection failed."); - }); } catch (error) { if (cancelled) { return; @@ -220,15 +165,7 @@ const GhosttyTerminal = ({ } removeDataListener?.dispose(); removeResizeListener?.dispose(); - if (socket?.readyState === WebSocket.OPEN) { - socket.send(JSON.stringify({ type: "close" })); - socket.close(); - } else if (socket?.readyState === WebSocket.CONNECTING) { - const pendingSocket = socket; - pendingSocket.addEventListener("open", () => { - pendingSocket.close(); - }, { once: true }); - } + session?.close(); terminal?.dispose(); }; }, [client, onExit, processId]); diff --git a/sdks/typescript/src/client.ts b/sdks/typescript/src/client.ts index 35d1691..2a3551d 100644 --- a/sdks/typescript/src/client.ts +++ b/sdks/typescript/src/client.ts @@ -51,13 +51,17 @@ import { type ProcessRunRequest, type ProcessRunResponse, type ProcessSignalQuery, - type ProcessTerminalResizeRequest, - type ProcessTerminalResizeResponse, type SessionEvent, type SessionPersistDriver, type SessionRecord, type SkillsConfig, type SkillsConfigQuery, + TerminalChannel, + type TerminalErrorStatus, + type TerminalExitStatus, + type TerminalReadyStatus, + type TerminalResizePayload, + type TerminalStatusMessage, } from "./types.ts"; const API_PREFIX = "/v1"; @@ -134,6 +138,8 @@ export interface ProcessTerminalConnectOptions extends ProcessTerminalWebSocketU WebSocket?: typeof WebSocket; } +export type ProcessTerminalSessionOptions = ProcessTerminalConnectOptions; + export class SandboxAgentError extends Error { readonly status: number; readonly problem?: ProblemDetails; @@ -472,6 +478,188 @@ export class LiveAcpConnection { } } +export class ProcessTerminalSession { + readonly socket: WebSocket; + readonly closed: Promise; + + private readonly readyListeners = new Set<(status: TerminalReadyStatus) => void>(); + private readonly dataListeners = new Set<(data: Uint8Array) => void>(); + private readonly exitListeners = new Set<(status: TerminalExitStatus) => void>(); + private readonly errorListeners = new Set<(error: TerminalErrorStatus | Error) => void>(); + private readonly closeListeners = new Set<() => void>(); + private readonly textEncoder = new TextEncoder(); + + private closeSignalSent = false; + private closedResolve!: () => void; + + constructor(socket: WebSocket) { + this.socket = socket; + this.socket.binaryType = "arraybuffer"; + this.closed = new Promise((resolve) => { + this.closedResolve = resolve; + }); + + this.socket.addEventListener("message", (event) => { + void this.handleMessage(event.data); + }); + this.socket.addEventListener("error", () => { + this.emitError(new Error("Terminal websocket connection failed.")); + }); + this.socket.addEventListener("close", () => { + this.closedResolve(); + for (const listener of this.closeListeners) { + listener(); + } + }); + } + + onReady(listener: (status: TerminalReadyStatus) => void): () => void { + this.readyListeners.add(listener); + return () => { + this.readyListeners.delete(listener); + }; + } + + onData(listener: (data: Uint8Array) => void): () => void { + this.dataListeners.add(listener); + return () => { + this.dataListeners.delete(listener); + }; + } + + onExit(listener: (status: TerminalExitStatus) => void): () => void { + this.exitListeners.add(listener); + return () => { + this.exitListeners.delete(listener); + }; + } + + onError(listener: (error: TerminalErrorStatus | Error) => void): () => void { + this.errorListeners.add(listener); + return () => { + this.errorListeners.delete(listener); + }; + } + + onClose(listener: () => void): () => void { + this.closeListeners.add(listener); + return () => { + this.closeListeners.delete(listener); + }; + } + + sendInput(data: string | ArrayBuffer | ArrayBufferView): void { + this.sendChannel(TerminalChannel.stdin, encodeTerminalBytes(data)); + } + + resize(payload: TerminalResizePayload): void { + this.sendChannel( + TerminalChannel.resize, + this.textEncoder.encode(JSON.stringify(payload)), + ); + } + + close(): void { + if (this.socket.readyState === WebSocket.CONNECTING) { + this.socket.addEventListener( + "open", + () => { + this.close(); + }, + { once: true }, + ); + return; + } + + if (this.socket.readyState === WebSocket.OPEN) { + if (!this.closeSignalSent) { + this.closeSignalSent = true; + this.sendChannel(TerminalChannel.close, new Uint8Array()); + } + this.socket.close(); + return; + } + + if (this.socket.readyState !== WebSocket.CLOSED) { + this.socket.close(); + } + } + + private async handleMessage(data: unknown): Promise { + try { + const bytes = await decodeTerminalBytes(data); + if (bytes.length === 0) { + this.emitError(new Error("Received terminal frame without a channel byte.")); + return; + } + + const channel = bytes[0]; + const payload = bytes.subarray(1); + + if (channel === TerminalChannel.stdout || channel === TerminalChannel.stderr) { + for (const listener of this.dataListeners) { + listener(payload); + } + return; + } + + if (channel === TerminalChannel.status) { + const text = new TextDecoder().decode(payload); + const parsed = JSON.parse(text) as unknown; + if (!isTerminalStatusMessage(parsed)) { + this.emitError(new Error("Received invalid terminal status payload.")); + return; + } + + if (parsed.type === "ready") { + for (const listener of this.readyListeners) { + listener(parsed); + } + return; + } + + if (parsed.type === "exit") { + for (const listener of this.exitListeners) { + listener(parsed); + } + return; + } + + this.emitError(parsed); + return; + } + + if (channel === TerminalChannel.close) { + if (this.socket.readyState === WebSocket.OPEN) { + this.socket.close(); + } + return; + } + + this.emitError(new Error(`Received unsupported terminal channel ${channel}.`)); + } catch (error) { + this.emitError(error instanceof Error ? error : new Error(String(error))); + } + } + + private sendChannel(channel: number, payload: Uint8Array): void { + if (this.socket.readyState !== WebSocket.OPEN) { + return; + } + + const frame = new Uint8Array(payload.length + 1); + frame[0] = channel; + frame.set(payload, 1); + this.socket.send(frame); + } + + private emitError(error: TerminalErrorStatus | Error): void { + for (const listener of this.errorListeners) { + listener(error); + } + } +} + export class SandboxAgent { private readonly baseUrl: string; private readonly token?: string; @@ -893,19 +1081,6 @@ export class SandboxAgent { }); } - async resizeProcessTerminal( - id: string, - request: ProcessTerminalResizeRequest, - ): Promise { - return this.requestJson( - "POST", - `${API_PREFIX}/processes/${encodeURIComponent(id)}/terminal/resize`, - { - body: request, - }, - ); - } - buildProcessTerminalWebSocketUrl( id: string, options: ProcessTerminalWebSocketUrlOptions = {}, @@ -930,10 +1105,17 @@ export class SandboxAgent { this.buildProcessTerminalWebSocketUrl(id, { accessToken: options.accessToken, }), - options.protocols, + options.protocols ?? "channel.k8s.io", ); } + connectProcessTerminal( + id: string, + options: ProcessTerminalSessionOptions = {}, + ): ProcessTerminalSession { + return new ProcessTerminalSession(this.connectProcessTerminalWebSocket(id, options)); + } + private async getLiveConnection(agent: string): Promise { const existing = this.liveConnections.get(agent); if (existing) { @@ -1204,6 +1386,62 @@ type RequestOptions = { signal?: AbortSignal; }; +function isTerminalStatusMessage(value: unknown): value is TerminalStatusMessage { + if (!isRecord(value) || typeof value.type !== "string") { + return false; + } + + if (value.type === "ready") { + return typeof value.processId === "string"; + } + + if (value.type === "exit") { + return ( + value.exitCode === undefined || + value.exitCode === null || + typeof value.exitCode === "number" + ); + } + + if (value.type === "error") { + return typeof value.message === "string"; + } + + return false; +} + +function encodeTerminalBytes(data: string | ArrayBuffer | ArrayBufferView): Uint8Array { + if (typeof data === "string") { + return new TextEncoder().encode(data); + } + + if (data instanceof ArrayBuffer) { + return new Uint8Array(data); + } + + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice(); +} + +async function decodeTerminalBytes(data: unknown): Promise { + if (data instanceof ArrayBuffer) { + return new Uint8Array(data); + } + + if (ArrayBuffer.isView(data)) { + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice(); + } + + if (typeof Blob !== "undefined" && data instanceof Blob) { + return new Uint8Array(await data.arrayBuffer()); + } + + if (typeof data === "string") { + throw new Error("Received text terminal frame; expected channel.k8s.io binary data."); + } + + throw new Error(`Unsupported terminal frame payload: ${String(data)}`); +} + /** * Auto-select and call `authenticate` based on the agent's advertised auth methods. * Prefers env-var-based methods that the server process already has configured. diff --git a/sdks/typescript/src/generated/openapi.ts b/sdks/typescript/src/generated/openapi.ts index a89d796..191f0d3 100644 --- a/sdks/typescript/src/generated/openapi.ts +++ b/sdks/typescript/src/generated/openapi.ts @@ -58,36 +58,98 @@ export interface paths { get: operations["get_v1_health"]; }; "/v1/processes": { + /** + * List all managed processes. + * @description Returns a list of all processes (running and exited) currently tracked + * by the runtime, sorted by process ID. + */ get: operations["get_v1_processes"]; + /** + * Create a long-lived managed process. + * @description Spawns a new process with the given command and arguments. Supports both + * pipe-based and PTY (tty) modes. Returns the process descriptor on success. + */ post: operations["post_v1_processes"]; }; "/v1/processes/config": { + /** + * Get process runtime configuration. + * @description Returns the current runtime configuration for the process management API, + * including limits for concurrency, timeouts, and buffer sizes. + */ get: operations["get_v1_processes_config"]; + /** + * Update process runtime configuration. + * @description Replaces the runtime configuration for the process management API. + * Validates that all values are non-zero and clamps default timeout to max. + */ post: operations["post_v1_processes_config"]; }; "/v1/processes/run": { + /** + * Run a one-shot command. + * @description Executes a command to completion and returns its stdout, stderr, exit code, + * and duration. Supports configurable timeout and output size limits. + */ post: operations["post_v1_processes_run"]; }; "/v1/processes/{id}": { + /** + * Get a single process by ID. + * @description Returns the current state of a managed process including its status, + * PID, exit code, and creation/exit timestamps. + */ get: operations["get_v1_process"]; + /** + * Delete a process record. + * @description Removes a stopped process from the runtime. Returns 409 if the process + * is still running; stop or kill it first. + */ delete: operations["delete_v1_process"]; }; "/v1/processes/{id}/input": { + /** + * Write input to a process. + * @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode). + * Data can be encoded as base64, utf8, or text. Returns 413 if the decoded + * payload exceeds the configured `maxInputBytesPerRequest` limit. + */ post: operations["post_v1_process_input"]; }; "/v1/processes/{id}/kill": { + /** + * Send SIGKILL to a process. + * @description Sends SIGKILL to the process and optionally waits up to `waitMs` + * milliseconds for the process to exit before returning. + */ post: operations["post_v1_process_kill"]; }; "/v1/processes/{id}/logs": { + /** + * Fetch process logs. + * @description Returns buffered log entries for a process. Supports filtering by stream + * type, tail count, and sequence-based resumption. When `follow=true`, + * returns an SSE stream that replays buffered entries then streams live output. + */ get: operations["get_v1_process_logs"]; }; "/v1/processes/{id}/stop": { + /** + * Send SIGTERM to a process. + * @description Sends SIGTERM to the process and optionally waits up to `waitMs` + * milliseconds for the process to exit before returning. + */ post: operations["post_v1_process_stop"]; }; - "/v1/processes/{id}/terminal/resize": { - post: operations["post_v1_process_terminal_resize"]; - }; "/v1/processes/{id}/terminal/ws": { + /** + * Open an interactive WebSocket terminal session. + * @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts + * `access_token` query param for browser-based auth (WebSocket API cannot + * send custom headers). Uses the `channel.k8s.io` binary subprotocol: + * channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize, + * and channel 255 close. + */ get: operations["get_v1_process_terminal_ws"]; }; } @@ -166,7 +228,7 @@ export interface components { agents: components["schemas"]["AgentInfo"][]; }; /** @enum {string} */ - ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout"; + ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "not_found" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout"; FsActionResponse: { path: string; }; @@ -361,18 +423,6 @@ export interface components { }; /** @enum {string} */ ProcessState: "running" | "exited"; - ProcessTerminalResizeRequest: { - /** Format: int32 */ - cols: number; - /** Format: int32 */ - rows: number; - }; - ProcessTerminalResizeResponse: { - /** Format: int32 */ - cols: number; - /** Format: int32 */ - rows: number; - }; /** @enum {string} */ ServerStatus: "running" | "stopped"; ServerStatusInfo: { @@ -891,6 +941,11 @@ export interface operations { }; }; }; + /** + * List all managed processes. + * @description Returns a list of all processes (running and exited) currently tracked + * by the runtime, sorted by process ID. + */ get_v1_processes: { responses: { /** @description List processes */ @@ -907,6 +962,11 @@ export interface operations { }; }; }; + /** + * Create a long-lived managed process. + * @description Spawns a new process with the given command and arguments. Supports both + * pipe-based and PTY (tty) modes. Returns the process descriptor on success. + */ post_v1_processes: { requestBody: { content: { @@ -940,6 +1000,11 @@ export interface operations { }; }; }; + /** + * Get process runtime configuration. + * @description Returns the current runtime configuration for the process management API, + * including limits for concurrency, timeouts, and buffer sizes. + */ get_v1_processes_config: { responses: { /** @description Current runtime process config */ @@ -956,6 +1021,11 @@ export interface operations { }; }; }; + /** + * Update process runtime configuration. + * @description Replaces the runtime configuration for the process management API. + * Validates that all values are non-zero and clamps default timeout to max. + */ post_v1_processes_config: { requestBody: { content: { @@ -983,6 +1053,11 @@ export interface operations { }; }; }; + /** + * Run a one-shot command. + * @description Executes a command to completion and returns its stdout, stderr, exit code, + * and duration. Supports configurable timeout and output size limits. + */ post_v1_processes_run: { requestBody: { content: { @@ -1010,6 +1085,11 @@ export interface operations { }; }; }; + /** + * Get a single process by ID. + * @description Returns the current state of a managed process including its status, + * PID, exit code, and creation/exit timestamps. + */ get_v1_process: { parameters: { path: { @@ -1038,6 +1118,11 @@ export interface operations { }; }; }; + /** + * Delete a process record. + * @description Removes a stopped process from the runtime. Returns 409 if the process + * is still running; stop or kill it first. + */ delete_v1_process: { parameters: { path: { @@ -1070,6 +1155,12 @@ export interface operations { }; }; }; + /** + * Write input to a process. + * @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode). + * Data can be encoded as base64, utf8, or text. Returns 413 if the decoded + * payload exceeds the configured `maxInputBytesPerRequest` limit. + */ post_v1_process_input: { parameters: { path: { @@ -1115,6 +1206,11 @@ export interface operations { }; }; }; + /** + * Send SIGKILL to a process. + * @description Sends SIGKILL to the process and optionally waits up to `waitMs` + * milliseconds for the process to exit before returning. + */ post_v1_process_kill: { parameters: { query?: { @@ -1147,6 +1243,12 @@ export interface operations { }; }; }; + /** + * Fetch process logs. + * @description Returns buffered log entries for a process. Supports filtering by stream + * type, tail count, and sequence-based resumption. When `follow=true`, + * returns an SSE stream that replays buffered entries then streams live output. + */ get_v1_process_logs: { parameters: { query?: { @@ -1185,6 +1287,11 @@ export interface operations { }; }; }; + /** + * Send SIGTERM to a process. + * @description Sends SIGTERM to the process and optionally waits up to `waitMs` + * milliseconds for the process to exit before returning. + */ post_v1_process_stop: { parameters: { query?: { @@ -1217,51 +1324,14 @@ export interface operations { }; }; }; - post_v1_process_terminal_resize: { - parameters: { - path: { - /** @description Process ID */ - id: string; - }; - }; - requestBody: { - content: { - "application/json": components["schemas"]["ProcessTerminalResizeRequest"]; - }; - }; - responses: { - /** @description Resize accepted */ - 200: { - content: { - "application/json": components["schemas"]["ProcessTerminalResizeResponse"]; - }; - }; - /** @description Invalid request */ - 400: { - content: { - "application/json": components["schemas"]["ProblemDetails"]; - }; - }; - /** @description Unknown process */ - 404: { - content: { - "application/json": components["schemas"]["ProblemDetails"]; - }; - }; - /** @description Not a terminal process */ - 409: { - content: { - "application/json": components["schemas"]["ProblemDetails"]; - }; - }; - /** @description Process API unsupported on this platform */ - 501: { - content: { - "application/json": components["schemas"]["ProblemDetails"]; - }; - }; - }; - }; + /** + * Open an interactive WebSocket terminal session. + * @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts + * `access_token` query param for browser-based auth (WebSocket API cannot + * send custom headers). Uses the `channel.k8s.io` binary subprotocol: + * channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize, + * and channel 255 close. + */ get_v1_process_terminal_ws: { parameters: { query?: { diff --git a/sdks/typescript/src/index.ts b/sdks/typescript/src/index.ts index 82b5791..e9e6f7e 100644 --- a/sdks/typescript/src/index.ts +++ b/sdks/typescript/src/index.ts @@ -1,5 +1,6 @@ export { LiveAcpConnection, + ProcessTerminalSession, SandboxAgent, SandboxAgentError, Session, @@ -15,6 +16,7 @@ export type { ProcessLogListener, ProcessLogSubscription, ProcessTerminalConnectOptions, + ProcessTerminalSessionOptions, ProcessTerminalWebSocketUrlOptions, SandboxAgentConnectOptions, SandboxAgentStartOptions, @@ -28,6 +30,7 @@ export type { InspectorUrlOptions } from "./inspector.ts"; export { InMemorySessionPersistDriver, + TerminalChannel, } from "./types.ts"; export type { @@ -72,18 +75,16 @@ export type { ProcessRunResponse, ProcessSignalQuery, ProcessState, - ProcessTerminalClientFrame, - ProcessTerminalErrorFrame, - ProcessTerminalExitFrame, - ProcessTerminalReadyFrame, - ProcessTerminalResizeRequest, - ProcessTerminalResizeResponse, - ProcessTerminalServerFrame, SessionEvent, SessionPersistDriver, SessionRecord, SkillsConfig, SkillsConfigQuery, + TerminalErrorStatus, + TerminalExitStatus, + TerminalReadyStatus, + TerminalResizePayload, + TerminalStatusMessage, } from "./types.ts"; export type { diff --git a/sdks/typescript/src/types.ts b/sdks/typescript/src/types.ts index aa7a73a..3debf7e 100644 --- a/sdks/typescript/src/types.ts +++ b/sdks/typescript/src/types.ts @@ -46,43 +46,40 @@ export type ProcessRunRequest = JsonRequestBody; export type ProcessSignalQuery = QueryParams; export type ProcessState = components["schemas"]["ProcessState"]; -export type ProcessTerminalResizeRequest = JsonRequestBody; -export type ProcessTerminalResizeResponse = JsonResponse; -export type ProcessTerminalClientFrame = - | { - type: "input"; - data: string; - encoding?: string; - } - | { - type: "resize"; - cols: number; - rows: number; - } - | { - type: "close"; - }; +export const TerminalChannel = { + stdin: 0, + stdout: 1, + stderr: 2, + status: 3, + resize: 4, + close: 255, +} as const; -export interface ProcessTerminalReadyFrame { +export interface TerminalReadyStatus { type: "ready"; processId: string; } -export interface ProcessTerminalExitFrame { +export interface TerminalExitStatus { type: "exit"; exitCode?: number | null; } -export interface ProcessTerminalErrorFrame { +export interface TerminalErrorStatus { type: "error"; message: string; } -export type ProcessTerminalServerFrame = - | ProcessTerminalReadyFrame - | ProcessTerminalExitFrame - | ProcessTerminalErrorFrame; +export type TerminalStatusMessage = + | TerminalReadyStatus + | TerminalExitStatus + | TerminalErrorStatus; + +export interface TerminalResizePayload { + cols: number; + rows: number; +} export interface SessionRecord { id: string; diff --git a/sdks/typescript/tests/integration.test.ts b/sdks/typescript/tests/integration.test.ts index f210bdb..2223b8f 100644 --- a/sdks/typescript/tests/integration.test.ts +++ b/sdks/typescript/tests/integration.test.ts @@ -136,22 +136,6 @@ function writeTarChecksum(buffer: Buffer, checksum: number): void { buffer[155] = 0x20; } -function decodeSocketPayload(data: unknown): string { - if (typeof data === "string") { - return data; - } - if (data instanceof ArrayBuffer) { - return Buffer.from(data).toString("utf8"); - } - if (ArrayBuffer.isView(data)) { - return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8"); - } - if (typeof Blob !== "undefined" && data instanceof Blob) { - throw new Error("Blob socket payloads are not supported in this test"); - } - throw new Error(`Unsupported socket payload type: ${typeof data}`); -} - function decodeProcessLogData(data: string, encoding: string): string { if (encoding === "base64") { return Buffer.from(data, "base64").toString("utf8"); @@ -582,47 +566,53 @@ describe("Integration: TypeScript SDK flat session API", () => { }); ttyProcessId = ttyProcess.id; - const resized = await sdk.resizeProcessTerminal(ttyProcess.id, { - cols: 120, - rows: 40, - }); - expect(resized.cols).toBe(120); - expect(resized.rows).toBe(40); - const wsUrl = sdk.buildProcessTerminalWebSocketUrl(ttyProcess.id); expect(wsUrl.startsWith("ws://") || wsUrl.startsWith("wss://")).toBe(true); - const ws = sdk.connectProcessTerminalWebSocket(ttyProcess.id, { + const session = sdk.connectProcessTerminal(ttyProcess.id, { WebSocket: WebSocket as unknown as typeof globalThis.WebSocket, }); - ws.binaryType = "arraybuffer"; + const readyFrames: string[] = []; + const ttyOutput: string[] = []; + const exitFrames: Array = []; + const terminalErrors: string[] = []; + let closeCount = 0; - const socketTextFrames: string[] = []; - const socketBinaryFrames: string[] = []; - ws.addEventListener("message", (event) => { - if (typeof event.data === "string") { - socketTextFrames.push(event.data); - return; - } - socketBinaryFrames.push(decodeSocketPayload(event.data)); + session.onReady((status) => { + readyFrames.push(status.processId); + }); + session.onData((bytes) => { + ttyOutput.push(Buffer.from(bytes).toString("utf8")); + }); + session.onExit((status) => { + exitFrames.push(status.exitCode); + }); + session.onError((error) => { + terminalErrors.push(error instanceof Error ? error.message : error.message); + }); + session.onClose(() => { + closeCount += 1; }); - await waitFor(() => { - const ready = socketTextFrames.find((frame) => frame.includes('"type":"ready"')); - return ready; + await waitFor(() => readyFrames[0]); + + session.resize({ + cols: 120, + rows: 40, }); - - ws.send(JSON.stringify({ - type: "input", - data: "hello tty\n", - })); + session.sendInput("hello tty\n"); await waitFor(() => { - const joined = socketBinaryFrames.join(""); + const joined = ttyOutput.join(""); return joined.includes("hello tty") ? joined : undefined; }); - ws.close(); + session.close(); + await session.closed; + expect(closeCount).toBeGreaterThan(0); + expect(exitFrames).toHaveLength(0); + expect(terminalErrors).toEqual([]); + await waitForAsync(async () => { const processInfo = await sdk.getProcess(ttyProcess.id); return processInfo.status === "running" ? processInfo : undefined; diff --git a/server/packages/sandbox-agent/src/process_runtime.rs b/server/packages/sandbox-agent/src/process_runtime.rs index 37f40fa..24dde91 100644 --- a/server/packages/sandbox-agent/src/process_runtime.rs +++ b/server/packages/sandbox-agent/src/process_runtime.rs @@ -8,7 +8,7 @@ use base64::Engine; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, Command}; -use tokio::sync::{broadcast, Mutex, RwLock}; +use tokio::sync::{broadcast, Mutex, RwLock, Semaphore}; use sandbox_agent_error::SandboxError; @@ -119,6 +119,7 @@ pub struct ProcessRuntime { struct ProcessRuntimeInner { next_id: AtomicU64, processes: RwLock>>, + run_once_semaphore: Semaphore, } #[derive(Debug)] @@ -182,6 +183,9 @@ impl ProcessRuntime { inner: Arc::new(ProcessRuntimeInner { next_id: AtomicU64::new(1), processes: RwLock::new(HashMap::new()), + run_once_semaphore: Semaphore::new( + ProcessRuntimeConfig::default().max_concurrent_processes, + ), }), } } @@ -324,6 +328,14 @@ impl ProcessRuntime { }); } + let _permit = + self.inner + .run_once_semaphore + .try_acquire() + .map_err(|_| SandboxError::Conflict { + message: "too many concurrent run_once operations".to_string(), + })?; + let config = self.get_config().await; let mut timeout_ms = spec.timeout_ms.unwrap_or(config.default_run_timeout_ms); if timeout_ms == 0 { @@ -331,7 +343,10 @@ impl ProcessRuntime { } timeout_ms = timeout_ms.min(config.max_run_timeout_ms); - let max_output_bytes = spec.max_output_bytes.unwrap_or(config.max_output_bytes); + let max_output_bytes = spec + .max_output_bytes + .unwrap_or(config.max_output_bytes) + .min(config.max_output_bytes); let mut cmd = Command::new(&spec.command); cmd.args(&spec.args) diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index d7b35b4..11211ff 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -50,6 +50,12 @@ pub use self::types::*; const APPLICATION_JSON: &str = "application/json"; const TEXT_EVENT_STREAM: &str = "text/event-stream"; +const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io"; +const CH_STDIN: u8 = 0; +const CH_STDOUT: u8 = 1; +const CH_STATUS: u8 = 3; +const CH_RESIZE: u8 = 4; +const CH_CLOSE: u8 = 255; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum BrandingMode { @@ -196,10 +202,6 @@ pub fn build_router_with_state(shared: Arc) -> (Router, Arc) .route("/processes/:id/kill", post(post_v1_process_kill)) .route("/processes/:id/logs", get(get_v1_process_logs)) .route("/processes/:id/input", post(post_v1_process_input)) - .route( - "/processes/:id/terminal/resize", - post(post_v1_process_terminal_resize), - ) .route( "/processes/:id/terminal/ws", get(get_v1_process_terminal_ws), @@ -344,7 +346,6 @@ pub async fn shutdown_servers(state: &Arc) { delete_v1_process, get_v1_process_logs, post_v1_process_input, - post_v1_process_terminal_resize, get_v1_process_terminal_ws, get_v1_config_mcp, put_v1_config_mcp, @@ -394,8 +395,6 @@ pub async fn shutdown_servers(state: &Arc) { ProcessInputRequest, ProcessInputResponse, ProcessSignalQuery, - ProcessTerminalResizeRequest, - ProcessTerminalResizeResponse, AcpPostQuery, AcpServerInfo, AcpServerListResponse, @@ -1602,51 +1601,13 @@ async fn post_v1_process_input( Ok(Json(ProcessInputResponse { bytes_written })) } -/// Resize a process terminal. -/// -/// Sets the PTY window size (columns and rows) for a tty-mode process and -/// sends SIGWINCH so the child process can adapt. -#[utoipa::path( - post, - path = "/v1/processes/{id}/terminal/resize", - tag = "v1", - params( - ("id" = String, Path, description = "Process ID") - ), - request_body = ProcessTerminalResizeRequest, - responses( - (status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse), - (status = 400, description = "Invalid request", body = ProblemDetails), - (status = 404, description = "Unknown process", body = ProblemDetails), - (status = 409, description = "Not a terminal process", body = ProblemDetails), - (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) - ) -)] -async fn post_v1_process_terminal_resize( - State(state): State>, - Path(id): Path, - Json(body): Json, -) -> Result, ApiError> { - if !process_api_supported() { - return Err(process_api_not_supported().into()); - } - - state - .process_runtime() - .resize_terminal(&id, body.cols, body.rows) - .await?; - Ok(Json(ProcessTerminalResizeResponse { - cols: body.cols, - rows: body.rows, - })) -} - /// Open an interactive WebSocket terminal session. /// /// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts /// `access_token` query param for browser-based auth (WebSocket API cannot -/// send custom headers). Streams raw PTY output as binary frames and accepts -/// JSON control frames for input, resize, and close. +/// send custom headers). Uses the `channel.k8s.io` binary subprotocol: +/// channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize, +/// and channel 255 close. #[utoipa::path( get, path = "/v1/processes/{id}/terminal/ws", @@ -1682,23 +1643,16 @@ async fn get_v1_process_terminal_ws( } Ok(ws + .protocols([CHANNEL_K8S_IO_PROTOCOL]) .on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id)) .into_response()) } #[derive(Debug, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -enum TerminalClientFrame { - Input { - data: String, - #[serde(default)] - encoding: Option, - }, - Resize { - cols: u16, - rows: u16, - }, - Close, +#[serde(rename_all = "camelCase")] +struct TerminalResizePayload { + cols: u16, + rows: u16, } async fn process_terminal_ws_session( @@ -1706,7 +1660,7 @@ async fn process_terminal_ws_session( runtime: Arc, id: String, ) { - let _ = send_ws_json( + let _ = send_status_json( &mut socket, json!({ "type": "ready", @@ -1718,7 +1672,8 @@ async fn process_terminal_ws_session( let mut log_rx = match runtime.subscribe_logs(&id).await { Ok(rx) => rx, Err(err) => { - let _ = send_ws_error(&mut socket, &err.to_string()).await; + let _ = send_status_error(&mut socket, &err.to_string()).await; + let _ = send_close_signal(&mut socket).await; let _ = socket.close().await; return; } @@ -1729,43 +1684,57 @@ async fn process_terminal_ws_session( tokio::select! { ws_in = socket.recv() => { match ws_in { - Some(Ok(Message::Binary(_))) => { - let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await; - } - Some(Ok(Message::Text(text))) => { - let parsed = serde_json::from_str::(&text); - match parsed { - Ok(TerminalClientFrame::Input { data, encoding }) => { - let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) { - Ok(input) => input, - Err(err) => { - let _ = send_ws_error(&mut socket, &err.to_string()).await; - continue; - } - }; + Some(Ok(Message::Binary(bytes))) => { + let Some((&channel, payload)) = bytes.split_first() else { + let _ = send_status_error(&mut socket, "invalid terminal frame: missing channel byte").await; + continue; + }; + + match channel { + CH_STDIN => { + let input = payload.to_vec(); let max_input = runtime.max_input_bytes().await; if input.len() > max_input { - let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await; + let _ = send_status_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await; continue; } if let Err(err) = runtime.write_input(&id, &input).await { - let _ = send_ws_error(&mut socket, &err.to_string()).await; + let _ = send_status_error(&mut socket, &err.to_string()).await; } } - Ok(TerminalClientFrame::Resize { cols, rows }) => { - if let Err(err) = runtime.resize_terminal(&id, cols, rows).await { - let _ = send_ws_error(&mut socket, &err.to_string()).await; + CH_RESIZE => { + let resize = match serde_json::from_slice::(payload) { + Ok(resize) => resize, + Err(err) => { + let _ = send_status_error(&mut socket, &format!("invalid resize payload: {err}")).await; + continue; + } + }; + + if let Err(err) = runtime + .resize_terminal(&id, resize.cols, resize.rows) + .await + { + let _ = send_status_error(&mut socket, &err.to_string()).await; } } - Ok(TerminalClientFrame::Close) => { + CH_CLOSE => { + let _ = send_close_signal(&mut socket).await; let _ = socket.close().await; break; } - Err(err) => { - let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await; + _ => { + let _ = send_status_error(&mut socket, &format!("unsupported terminal channel: {channel}")).await; } } } + Some(Ok(Message::Text(_))) => { + let _ = send_status_error( + &mut socket, + "text frames are not supported; use channel.k8s.io binary frames", + ) + .await; + } Some(Ok(Message::Ping(payload))) => { let _ = socket.send(Message::Pong(payload)).await; } @@ -1785,7 +1754,7 @@ async fn process_terminal_ws_session( use base64::Engine; BASE64_ENGINE.decode(&line.data).unwrap_or_default() }; - if socket.send(Message::Binary(bytes)).await.is_err() { + if send_channel_frame(&mut socket, CH_STDOUT, bytes).await.is_err() { break; } } @@ -1796,7 +1765,7 @@ async fn process_terminal_ws_session( _ = exit_poll.tick() => { if let Ok(snapshot) = runtime.snapshot(&id).await { if snapshot.status == ProcessStatus::Exited { - let _ = send_ws_json( + let _ = send_status_json( &mut socket, json!({ "type": "exit", @@ -1804,6 +1773,7 @@ async fn process_terminal_ws_session( }), ) .await; + let _ = send_close_signal(&mut socket).await; let _ = socket.close().await; break; } @@ -1813,17 +1783,30 @@ async fn process_terminal_ws_session( } } -async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> { +async fn send_channel_frame( + socket: &mut WebSocket, + channel: u8, + payload: impl Into>, +) -> Result<(), ()> { + let mut frame = vec![channel]; + frame.extend(payload.into()); socket - .send(Message::Text( - serde_json::to_string(&payload).map_err(|_| ())?, - )) + .send(Message::Binary(frame.into())) .await .map_err(|_| ()) } -async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> { - send_ws_json( +async fn send_status_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> { + send_channel_frame( + socket, + CH_STATUS, + serde_json::to_vec(&payload).map_err(|_| ())?, + ) + .await +} + +async fn send_status_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> { + send_status_json( socket, json!({ "type": "error", @@ -1833,6 +1816,10 @@ async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> .await } +async fn send_close_signal(socket: &mut WebSocket) -> Result<(), ()> { + send_channel_frame(socket, CH_CLOSE, Vec::::new()).await +} + #[utoipa::path( get, path = "/v1/config/mcp", diff --git a/server/packages/sandbox-agent/src/router/types.rs b/server/packages/sandbox-agent/src/router/types.rs index 6d40e2a..07b389d 100644 --- a/server/packages/sandbox-agent/src/router/types.rs +++ b/server/packages/sandbox-agent/src/router/types.rs @@ -512,20 +512,6 @@ pub struct ProcessSignalQuery { pub wait_ms: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ProcessTerminalResizeRequest { - pub cols: u16, - pub rows: u16, -} - -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct ProcessTerminalResizeResponse { - pub cols: u16, - pub rows: u16, -} - #[derive(Debug, Clone, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct ProcessWsQuery { diff --git a/server/packages/sandbox-agent/tests/v1_api/processes.rs b/server/packages/sandbox-agent/tests/v1_api/processes.rs index aaf072d..3b619bf 100644 --- a/server/packages/sandbox-agent/tests/v1_api/processes.rs +++ b/server/packages/sandbox-agent/tests/v1_api/processes.rs @@ -3,8 +3,17 @@ use base64::engine::general_purpose::STANDARD as BASE64; use base64::Engine; use futures::{SinkExt, StreamExt}; use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tokio_tungstenite::tungstenite::http::HeaderValue; use tokio_tungstenite::tungstenite::Message; +const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io"; +const CH_STDIN: u8 = 0; +const CH_STDOUT: u8 = 1; +const CH_STATUS: u8 = 3; +const CH_RESIZE: u8 = 4; +const CH_CLOSE: u8 = 255; + async fn wait_for_exited(test_app: &TestApp, process_id: &str) { for _ in 0..30 { let (status, _, body) = send_request( @@ -48,6 +57,19 @@ async fn recv_ws_message( .expect("websocket frame") } +fn make_channel_frame(channel: u8, payload: impl AsRef<[u8]>) -> Vec { + let payload = payload.as_ref(); + let mut frame = Vec::with_capacity(payload.len() + 1); + frame.push(channel); + frame.extend_from_slice(payload); + frame +} + +fn parse_channel_frame(bytes: &[u8]) -> (u8, &[u8]) { + let (&channel, payload) = bytes.split_first().expect("channel frame"); + (channel, payload) +} + #[tokio::test] async fn v1_processes_config_round_trip() { let test_app = TestApp::new(AuthConfig::disabled()); @@ -519,59 +541,84 @@ async fn v1_process_terminal_ws_e2e_is_deterministic() { .expect("create process response"); assert_eq!(create_response.status(), reqwest::StatusCode::OK); let create_body: Value = create_response.json().await.expect("create process json"); - let process_id = create_body["id"] - .as_str() - .expect("process id") - .to_string(); + let process_id = create_body["id"].as_str().expect("process id").to_string(); let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws")); - let (mut ws, _) = connect_async(&ws_url) - .await - .expect("connect websocket"); + let mut ws_request = ws_url.into_client_request().expect("ws request"); + ws_request.headers_mut().insert( + "Sec-WebSocket-Protocol", + HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL), + ); + let (mut ws, response) = connect_async(ws_request).await.expect("connect websocket"); + assert_eq!( + response + .headers() + .get("Sec-WebSocket-Protocol") + .and_then(|value| value.to_str().ok()), + Some(CHANNEL_K8S_IO_PROTOCOL) + ); let ready = recv_ws_message(&mut ws).await; - let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame")) - .expect("ready json"); + let ready_bytes = ready.into_data(); + let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes); + assert_eq!(ready_channel, CH_STATUS); + let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json"); assert_eq!(ready_payload["type"], "ready"); assert_eq!(ready_payload["processId"], process_id); - ws.send(Message::Text( - json!({ - "type": "input", - "data": "hello from ws\n" - }) - .to_string(), + ws.send(Message::Binary( + make_channel_frame(CH_STDIN, b"hello from ws\n").into(), )) .await .expect("send input frame"); - let mut saw_binary_output = false; + ws.send(Message::Binary( + make_channel_frame(CH_RESIZE, br#"{"cols":120,"rows":40}"#).into(), + )) + .await + .expect("send resize frame"); + + let mut saw_stdout = false; let mut saw_exit = false; + let mut saw_close = false; for _ in 0..10 { let frame = recv_ws_message(&mut ws).await; match frame { Message::Binary(bytes) => { - let text = String::from_utf8_lossy(&bytes); - if text.contains("got:hello from ws") { - saw_binary_output = true; + let (channel, payload) = parse_channel_frame(&bytes); + match channel { + CH_STDOUT => { + let text = String::from_utf8_lossy(payload); + if text.contains("got:hello from ws") { + saw_stdout = true; + } + } + CH_STATUS => { + let payload: Value = + serde_json::from_slice(payload).expect("ws status json"); + if payload["type"] == "exit" { + saw_exit = true; + } else { + assert_ne!(payload["type"], "error"); + } + } + CH_CLOSE => { + assert!(payload.is_empty(), "close channel payload must be empty"); + saw_close = true; + break; + } + other => panic!("unexpected websocket channel: {other}"), } } - Message::Text(text) => { - let payload: Value = serde_json::from_str(&text).expect("ws json"); - if payload["type"] == "exit" { - saw_exit = true; - break; - } - assert_ne!(payload["type"], "error"); - } Message::Close(_) => break, Message::Ping(_) | Message::Pong(_) => {} _ => {} } } - assert!(saw_binary_output, "expected pty binary output over websocket"); - assert!(saw_exit, "expected exit control frame over websocket"); + assert!(saw_stdout, "expected pty stdout over websocket"); + assert!(saw_exit, "expected exit status frame over websocket"); + assert!(saw_close, "expected close channel frame over websocket"); let _ = ws.close(None).await; @@ -605,10 +652,7 @@ async fn v1_process_terminal_ws_auth_e2e() { .expect("create process response"); assert_eq!(create_response.status(), reqwest::StatusCode::OK); let create_body: Value = create_response.json().await.expect("create process json"); - let process_id = create_body["id"] - .as_str() - .expect("process id") - .to_string(); + let process_id = create_body["id"].as_str().expect("process id").to_string(); let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws")); let unauth_err = connect_async(&unauth_ws_url) @@ -624,25 +668,42 @@ async fn v1_process_terminal_ws_auth_e2e() { let auth_ws_url = live_server.ws_url(&format!( "/v1/processes/{process_id}/terminal/ws?access_token={token}" )); - let (mut ws, _) = connect_async(&auth_ws_url) + let mut ws_request = auth_ws_url.into_client_request().expect("ws request"); + ws_request.headers_mut().insert( + "Sec-WebSocket-Protocol", + HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL), + ); + let (mut ws, response) = connect_async(ws_request) .await .expect("authenticated websocket handshake"); + assert_eq!( + response + .headers() + .get("Sec-WebSocket-Protocol") + .and_then(|value| value.to_str().ok()), + Some(CHANNEL_K8S_IO_PROTOCOL) + ); let ready = recv_ws_message(&mut ws).await; - let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame")) - .expect("ready json"); + let ready_bytes = ready.into_data(); + let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes); + assert_eq!(ready_channel, CH_STATUS); + let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json"); assert_eq!(ready_payload["type"], "ready"); assert_eq!(ready_payload["processId"], process_id); let _ = ws - .send(Message::Text(json!({ "type": "close" }).to_string())) + .send(Message::Binary(make_channel_frame(CH_CLOSE, []).into())) .await; + let close = recv_ws_message(&mut ws).await; + let close_bytes = close.into_data(); + let (close_channel, close_payload) = parse_channel_frame(&close_bytes); + assert_eq!(close_channel, CH_CLOSE); + assert!(close_payload.is_empty()); let _ = ws.close(None).await; let kill_response = http - .post(live_server.http_url(&format!( - "/v1/processes/{process_id}/kill?waitMs=1000" - ))) + .post(live_server.http_url(&format!("/v1/processes/{process_id}/kill?waitMs=1000"))) .bearer_auth(token) .send() .await