feat: refine process API — WebSocket binary protocol, SDK terminal session, updated tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-06 12:12:24 -08:00
parent 6c91323ca6
commit 636eefb553
11 changed files with 700 additions and 512 deletions

View file

@ -954,6 +954,8 @@
"tags": [
"v1"
],
"summary": "List all managed processes.",
"description": "Returns a list of all processes (running and exited) currently tracked\nby the runtime, sorted by process ID.",
"operationId": "get_v1_processes",
"responses": {
"200": {
@ -982,6 +984,8 @@
"tags": [
"v1"
],
"summary": "Create a long-lived managed process.",
"description": "Spawns a new process with the given command and arguments. Supports both\npipe-based and PTY (tty) modes. Returns the process descriptor on success.",
"operationId": "post_v1_processes",
"requestBody": {
"content": {
@ -1042,6 +1046,8 @@
"tags": [
"v1"
],
"summary": "Get process runtime configuration.",
"description": "Returns the current runtime configuration for the process management API,\nincluding limits for concurrency, timeouts, and buffer sizes.",
"operationId": "get_v1_processes_config",
"responses": {
"200": {
@ -1070,6 +1076,8 @@
"tags": [
"v1"
],
"summary": "Update process runtime configuration.",
"description": "Replaces the runtime configuration for the process management API.\nValidates that all values are non-zero and clamps default timeout to max.",
"operationId": "post_v1_processes_config",
"requestBody": {
"content": {
@ -1120,6 +1128,8 @@
"tags": [
"v1"
],
"summary": "Run a one-shot command.",
"description": "Executes a command to completion and returns its stdout, stderr, exit code,\nand duration. Supports configurable timeout and output size limits.",
"operationId": "post_v1_processes_run",
"requestBody": {
"content": {
@ -1170,6 +1180,8 @@
"tags": [
"v1"
],
"summary": "Get a single process by ID.",
"description": "Returns the current state of a managed process including its status,\nPID, exit code, and creation/exit timestamps.",
"operationId": "get_v1_process",
"parameters": [
{
@ -1219,6 +1231,8 @@
"tags": [
"v1"
],
"summary": "Delete a process record.",
"description": "Removes a stopped process from the runtime. Returns 409 if the process\nis still running; stop or kill it first.",
"operationId": "delete_v1_process",
"parameters": [
{
@ -1273,6 +1287,8 @@
"tags": [
"v1"
],
"summary": "Write input to a process.",
"description": "Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).\nData can be encoded as base64, utf8, or text. Returns 413 if the decoded\npayload exceeds the configured `maxInputBytesPerRequest` limit.",
"operationId": "post_v1_process_input",
"parameters": [
{
@ -1354,6 +1370,8 @@
"tags": [
"v1"
],
"summary": "Send SIGKILL to a process.",
"description": "Sends SIGKILL to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.",
"operationId": "post_v1_process_kill",
"parameters": [
{
@ -1417,6 +1435,8 @@
"tags": [
"v1"
],
"summary": "Fetch process logs.",
"description": "Returns buffered log entries for a process. Supports filtering by stream\ntype, tail count, and sequence-based resumption. When `follow=true`,\nreturns an SSE stream that replays buffered entries then streams live output.",
"operationId": "get_v1_process_logs",
"parameters": [
{
@ -1515,6 +1535,8 @@
"tags": [
"v1"
],
"summary": "Send SIGTERM to a process.",
"description": "Sends SIGTERM to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.",
"operationId": "post_v1_process_stop",
"parameters": [
{
@ -1573,92 +1595,13 @@
}
}
},
"/v1/processes/{id}/terminal/resize": {
"post": {
"tags": [
"v1"
],
"operationId": "post_v1_process_terminal_resize",
"parameters": [
{
"name": "id",
"in": "path",
"description": "Process ID",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProcessTerminalResizeRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Resize accepted",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProcessTerminalResizeResponse"
}
}
}
},
"400": {
"description": "Invalid request",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
},
"404": {
"description": "Unknown process",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
},
"409": {
"description": "Not a terminal process",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
},
"501": {
"description": "Process API unsupported on this platform",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
}
}
}
},
"/v1/processes/{id}/terminal/ws": {
"get": {
"tags": [
"v1"
],
"summary": "Open an interactive WebSocket terminal session.",
"description": "Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts\n`access_token` query param for browser-based auth (WebSocket API cannot\nsend custom headers). Uses the `channel.k8s.io` binary subprotocol:\nchannel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,\nand channel 255 close.",
"operationId": "get_v1_process_terminal_ws",
"parameters": [
{
@ -2013,6 +1956,7 @@
"permission_denied",
"not_acceptable",
"unsupported_media_type",
"not_found",
"session_not_found",
"session_already_exists",
"mode_not_supported",
@ -2730,44 +2674,6 @@
"exited"
]
},
"ProcessTerminalResizeRequest": {
"type": "object",
"required": [
"cols",
"rows"
],
"properties": {
"cols": {
"type": "integer",
"format": "int32",
"minimum": 0
},
"rows": {
"type": "integer",
"format": "int32",
"minimum": 0
}
}
},
"ProcessTerminalResizeResponse": {
"type": "object",
"required": [
"cols",
"rows"
],
"properties": {
"cols": {
"type": "integer",
"format": "int32",
"minimum": 0
},
"rows": {
"type": "integer",
"format": "int32",
"minimum": 0
}
}
},
"ServerStatus": {
"type": "string",
"enum": [

View file

@ -1,7 +1,7 @@
import { AlertCircle, Loader2, PlugZap, SquareTerminal } from "lucide-react";
import { FitAddon, Terminal, init } from "ghostty-web";
import { useEffect, useRef, useState } from "react";
import type { ProcessTerminalServerFrame, SandboxAgent } from "sandbox-agent";
import type { SandboxAgent } from "sandbox-agent";
type ConnectionState = "connecting" | "ready" | "closed" | "error";
@ -29,21 +29,6 @@ const terminalTheme = {
brightWhite: "#fafafa",
};
const toUint8Array = async (data: Blob | ArrayBuffer): Promise<Uint8Array> => {
if (data instanceof ArrayBuffer) {
return new Uint8Array(data);
}
return new Uint8Array(await data.arrayBuffer());
};
const isServerFrame = (value: unknown): value is ProcessTerminalServerFrame => {
if (!value || typeof value !== "object") {
return false;
}
const type = (value as { type?: unknown }).type;
return type === "ready" || type === "exit" || type === "error";
};
const GhosttyTerminal = ({
client,
processId,
@ -62,24 +47,16 @@ const GhosttyTerminal = ({
let cancelled = false;
let terminal: Terminal | null = null;
let fitAddon: FitAddon | null = null;
let socket: WebSocket | null = null;
let session: ReturnType<SandboxAgent["connectProcessTerminal"]> | null = null;
let resizeRaf = 0;
let removeDataListener: { dispose(): void } | null = null;
let removeResizeListener: { dispose(): void } | null = null;
const sendFrame = (payload: unknown) => {
if (!socket || socket.readyState !== WebSocket.OPEN) {
return;
}
socket.send(JSON.stringify(payload));
};
const syncSize = () => {
if (!terminal) {
if (!terminal || !session) {
return;
}
sendFrame({
type: "resize",
session.resize({
cols: terminal.cols,
rows: terminal.rows,
});
@ -110,7 +87,7 @@ const GhosttyTerminal = ({
terminal.focus();
removeDataListener = terminal.onData((data) => {
sendFrame({ type: "input", data });
session?.sendInput(data);
});
removeResizeListener = terminal.onResize(() => {
@ -120,38 +97,29 @@ const GhosttyTerminal = ({
resizeRaf = window.requestAnimationFrame(syncSize);
});
const nextSocket = client.connectProcessTerminalWebSocket(processId);
socket = nextSocket;
nextSocket.binaryType = "arraybuffer";
const nextSession = client.connectProcessTerminal(processId);
session = nextSession;
const tryParseControlFrame = (raw: string | ArrayBuffer | Blob): ProcessTerminalServerFrame | null => {
let text: string | undefined;
if (typeof raw === "string") {
text = raw;
} else if (raw instanceof ArrayBuffer) {
// Server may send JSON control frames as binary; try to decode small messages as JSON.
if (raw.byteLength < 256) {
try {
text = new TextDecoder().decode(raw);
} catch {
// not decodable, treat as terminal data
}
}
nextSession.onReady((frame) => {
if (cancelled) {
return;
}
if (!text) return null;
try {
const parsed = JSON.parse(text);
return isServerFrame(parsed) ? parsed : null;
} catch {
return null;
}
};
const handleControlFrame = (frame: ProcessTerminalServerFrame): void => {
if (frame.type === "ready") {
setConnectionState("ready");
setStatusMessage("Connected");
syncSize();
}
});
nextSession.onData((bytes) => {
if (cancelled || !terminal) {
return;
}
terminal.write(bytes);
});
nextSession.onExit((frame) => {
if (cancelled) {
return;
}
if (frame.type === "exit") {
@ -161,47 +129,24 @@ const GhosttyTerminal = ({
frame.exitCode == null ? "Process exited." : `Process exited with code ${frame.exitCode}.`
);
onExit?.();
return;
}
if (frame.type === "error") {
setConnectionState("error");
setStatusMessage(frame.message);
}
};
nextSocket.addEventListener("message", (event) => {
if (cancelled || !terminal) {
return;
}
const controlFrame = tryParseControlFrame(event.data);
if (controlFrame) {
handleControlFrame(controlFrame);
return;
}
void toUint8Array(event.data).then((bytes) => {
if (!cancelled && terminal) {
terminal.write(bytes);
}
});
});
nextSocket.addEventListener("close", () => {
nextSession.onError((error) => {
if (cancelled) {
return;
}
setConnectionState("error");
setStatusMessage(error instanceof Error ? error.message : error.message);
});
nextSession.onClose(() => {
if (cancelled) {
return;
}
setConnectionState((current) => (current === "error" ? current : "closed"));
setStatusMessage((current) => (current === "Connected" ? "Terminal disconnected." : current));
});
nextSocket.addEventListener("error", () => {
if (cancelled) {
return;
}
setConnectionState("error");
setStatusMessage("WebSocket connection failed.");
});
} catch (error) {
if (cancelled) {
return;
@ -220,15 +165,7 @@ const GhosttyTerminal = ({
}
removeDataListener?.dispose();
removeResizeListener?.dispose();
if (socket?.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ type: "close" }));
socket.close();
} else if (socket?.readyState === WebSocket.CONNECTING) {
const pendingSocket = socket;
pendingSocket.addEventListener("open", () => {
pendingSocket.close();
}, { once: true });
}
session?.close();
terminal?.dispose();
};
}, [client, onExit, processId]);

View file

@ -51,13 +51,17 @@ import {
type ProcessRunRequest,
type ProcessRunResponse,
type ProcessSignalQuery,
type ProcessTerminalResizeRequest,
type ProcessTerminalResizeResponse,
type SessionEvent,
type SessionPersistDriver,
type SessionRecord,
type SkillsConfig,
type SkillsConfigQuery,
TerminalChannel,
type TerminalErrorStatus,
type TerminalExitStatus,
type TerminalReadyStatus,
type TerminalResizePayload,
type TerminalStatusMessage,
} from "./types.ts";
const API_PREFIX = "/v1";
@ -134,6 +138,8 @@ export interface ProcessTerminalConnectOptions extends ProcessTerminalWebSocketU
WebSocket?: typeof WebSocket;
}
export type ProcessTerminalSessionOptions = ProcessTerminalConnectOptions;
export class SandboxAgentError extends Error {
readonly status: number;
readonly problem?: ProblemDetails;
@ -472,6 +478,188 @@ export class LiveAcpConnection {
}
}
export class ProcessTerminalSession {
readonly socket: WebSocket;
readonly closed: Promise<void>;
private readonly readyListeners = new Set<(status: TerminalReadyStatus) => void>();
private readonly dataListeners = new Set<(data: Uint8Array) => void>();
private readonly exitListeners = new Set<(status: TerminalExitStatus) => void>();
private readonly errorListeners = new Set<(error: TerminalErrorStatus | Error) => void>();
private readonly closeListeners = new Set<() => void>();
private readonly textEncoder = new TextEncoder();
private closeSignalSent = false;
private closedResolve!: () => void;
constructor(socket: WebSocket) {
this.socket = socket;
this.socket.binaryType = "arraybuffer";
this.closed = new Promise<void>((resolve) => {
this.closedResolve = resolve;
});
this.socket.addEventListener("message", (event) => {
void this.handleMessage(event.data);
});
this.socket.addEventListener("error", () => {
this.emitError(new Error("Terminal websocket connection failed."));
});
this.socket.addEventListener("close", () => {
this.closedResolve();
for (const listener of this.closeListeners) {
listener();
}
});
}
onReady(listener: (status: TerminalReadyStatus) => void): () => void {
this.readyListeners.add(listener);
return () => {
this.readyListeners.delete(listener);
};
}
onData(listener: (data: Uint8Array) => void): () => void {
this.dataListeners.add(listener);
return () => {
this.dataListeners.delete(listener);
};
}
onExit(listener: (status: TerminalExitStatus) => void): () => void {
this.exitListeners.add(listener);
return () => {
this.exitListeners.delete(listener);
};
}
onError(listener: (error: TerminalErrorStatus | Error) => void): () => void {
this.errorListeners.add(listener);
return () => {
this.errorListeners.delete(listener);
};
}
onClose(listener: () => void): () => void {
this.closeListeners.add(listener);
return () => {
this.closeListeners.delete(listener);
};
}
sendInput(data: string | ArrayBuffer | ArrayBufferView): void {
this.sendChannel(TerminalChannel.stdin, encodeTerminalBytes(data));
}
resize(payload: TerminalResizePayload): void {
this.sendChannel(
TerminalChannel.resize,
this.textEncoder.encode(JSON.stringify(payload)),
);
}
close(): void {
if (this.socket.readyState === WebSocket.CONNECTING) {
this.socket.addEventListener(
"open",
() => {
this.close();
},
{ once: true },
);
return;
}
if (this.socket.readyState === WebSocket.OPEN) {
if (!this.closeSignalSent) {
this.closeSignalSent = true;
this.sendChannel(TerminalChannel.close, new Uint8Array());
}
this.socket.close();
return;
}
if (this.socket.readyState !== WebSocket.CLOSED) {
this.socket.close();
}
}
private async handleMessage(data: unknown): Promise<void> {
try {
const bytes = await decodeTerminalBytes(data);
if (bytes.length === 0) {
this.emitError(new Error("Received terminal frame without a channel byte."));
return;
}
const channel = bytes[0];
const payload = bytes.subarray(1);
if (channel === TerminalChannel.stdout || channel === TerminalChannel.stderr) {
for (const listener of this.dataListeners) {
listener(payload);
}
return;
}
if (channel === TerminalChannel.status) {
const text = new TextDecoder().decode(payload);
const parsed = JSON.parse(text) as unknown;
if (!isTerminalStatusMessage(parsed)) {
this.emitError(new Error("Received invalid terminal status payload."));
return;
}
if (parsed.type === "ready") {
for (const listener of this.readyListeners) {
listener(parsed);
}
return;
}
if (parsed.type === "exit") {
for (const listener of this.exitListeners) {
listener(parsed);
}
return;
}
this.emitError(parsed);
return;
}
if (channel === TerminalChannel.close) {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.close();
}
return;
}
this.emitError(new Error(`Received unsupported terminal channel ${channel}.`));
} catch (error) {
this.emitError(error instanceof Error ? error : new Error(String(error)));
}
}
private sendChannel(channel: number, payload: Uint8Array): void {
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}
const frame = new Uint8Array(payload.length + 1);
frame[0] = channel;
frame.set(payload, 1);
this.socket.send(frame);
}
private emitError(error: TerminalErrorStatus | Error): void {
for (const listener of this.errorListeners) {
listener(error);
}
}
}
export class SandboxAgent {
private readonly baseUrl: string;
private readonly token?: string;
@ -893,19 +1081,6 @@ export class SandboxAgent {
});
}
async resizeProcessTerminal(
id: string,
request: ProcessTerminalResizeRequest,
): Promise<ProcessTerminalResizeResponse> {
return this.requestJson(
"POST",
`${API_PREFIX}/processes/${encodeURIComponent(id)}/terminal/resize`,
{
body: request,
},
);
}
buildProcessTerminalWebSocketUrl(
id: string,
options: ProcessTerminalWebSocketUrlOptions = {},
@ -930,10 +1105,17 @@ export class SandboxAgent {
this.buildProcessTerminalWebSocketUrl(id, {
accessToken: options.accessToken,
}),
options.protocols,
options.protocols ?? "channel.k8s.io",
);
}
connectProcessTerminal(
id: string,
options: ProcessTerminalSessionOptions = {},
): ProcessTerminalSession {
return new ProcessTerminalSession(this.connectProcessTerminalWebSocket(id, options));
}
private async getLiveConnection(agent: string): Promise<LiveAcpConnection> {
const existing = this.liveConnections.get(agent);
if (existing) {
@ -1204,6 +1386,62 @@ type RequestOptions = {
signal?: AbortSignal;
};
function isTerminalStatusMessage(value: unknown): value is TerminalStatusMessage {
if (!isRecord(value) || typeof value.type !== "string") {
return false;
}
if (value.type === "ready") {
return typeof value.processId === "string";
}
if (value.type === "exit") {
return (
value.exitCode === undefined ||
value.exitCode === null ||
typeof value.exitCode === "number"
);
}
if (value.type === "error") {
return typeof value.message === "string";
}
return false;
}
function encodeTerminalBytes(data: string | ArrayBuffer | ArrayBufferView): Uint8Array {
if (typeof data === "string") {
return new TextEncoder().encode(data);
}
if (data instanceof ArrayBuffer) {
return new Uint8Array(data);
}
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice();
}
async function decodeTerminalBytes(data: unknown): Promise<Uint8Array> {
if (data instanceof ArrayBuffer) {
return new Uint8Array(data);
}
if (ArrayBuffer.isView(data)) {
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice();
}
if (typeof Blob !== "undefined" && data instanceof Blob) {
return new Uint8Array(await data.arrayBuffer());
}
if (typeof data === "string") {
throw new Error("Received text terminal frame; expected channel.k8s.io binary data.");
}
throw new Error(`Unsupported terminal frame payload: ${String(data)}`);
}
/**
* Auto-select and call `authenticate` based on the agent's advertised auth methods.
* Prefers env-var-based methods that the server process already has configured.

View file

@ -58,36 +58,98 @@ export interface paths {
get: operations["get_v1_health"];
};
"/v1/processes": {
/**
* List all managed processes.
* @description Returns a list of all processes (running and exited) currently tracked
* by the runtime, sorted by process ID.
*/
get: operations["get_v1_processes"];
/**
* Create a long-lived managed process.
* @description Spawns a new process with the given command and arguments. Supports both
* pipe-based and PTY (tty) modes. Returns the process descriptor on success.
*/
post: operations["post_v1_processes"];
};
"/v1/processes/config": {
/**
* Get process runtime configuration.
* @description Returns the current runtime configuration for the process management API,
* including limits for concurrency, timeouts, and buffer sizes.
*/
get: operations["get_v1_processes_config"];
/**
* Update process runtime configuration.
* @description Replaces the runtime configuration for the process management API.
* Validates that all values are non-zero and clamps default timeout to max.
*/
post: operations["post_v1_processes_config"];
};
"/v1/processes/run": {
/**
* Run a one-shot command.
* @description Executes a command to completion and returns its stdout, stderr, exit code,
* and duration. Supports configurable timeout and output size limits.
*/
post: operations["post_v1_processes_run"];
};
"/v1/processes/{id}": {
/**
* Get a single process by ID.
* @description Returns the current state of a managed process including its status,
* PID, exit code, and creation/exit timestamps.
*/
get: operations["get_v1_process"];
/**
* Delete a process record.
* @description Removes a stopped process from the runtime. Returns 409 if the process
* is still running; stop or kill it first.
*/
delete: operations["delete_v1_process"];
};
"/v1/processes/{id}/input": {
/**
* Write input to a process.
* @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
* Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
* payload exceeds the configured `maxInputBytesPerRequest` limit.
*/
post: operations["post_v1_process_input"];
};
"/v1/processes/{id}/kill": {
/**
* Send SIGKILL to a process.
* @description Sends SIGKILL to the process and optionally waits up to `waitMs`
* milliseconds for the process to exit before returning.
*/
post: operations["post_v1_process_kill"];
};
"/v1/processes/{id}/logs": {
/**
* Fetch process logs.
* @description Returns buffered log entries for a process. Supports filtering by stream
* type, tail count, and sequence-based resumption. When `follow=true`,
* returns an SSE stream that replays buffered entries then streams live output.
*/
get: operations["get_v1_process_logs"];
};
"/v1/processes/{id}/stop": {
/**
* Send SIGTERM to a process.
* @description Sends SIGTERM to the process and optionally waits up to `waitMs`
* milliseconds for the process to exit before returning.
*/
post: operations["post_v1_process_stop"];
};
"/v1/processes/{id}/terminal/resize": {
post: operations["post_v1_process_terminal_resize"];
};
"/v1/processes/{id}/terminal/ws": {
/**
* Open an interactive WebSocket terminal session.
* @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
* `access_token` query param for browser-based auth (WebSocket API cannot
* send custom headers). Uses the `channel.k8s.io` binary subprotocol:
* channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
* and channel 255 close.
*/
get: operations["get_v1_process_terminal_ws"];
};
}
@ -166,7 +228,7 @@ export interface components {
agents: components["schemas"]["AgentInfo"][];
};
/** @enum {string} */
ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout";
ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "not_found" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout";
FsActionResponse: {
path: string;
};
@ -361,18 +423,6 @@ export interface components {
};
/** @enum {string} */
ProcessState: "running" | "exited";
ProcessTerminalResizeRequest: {
/** Format: int32 */
cols: number;
/** Format: int32 */
rows: number;
};
ProcessTerminalResizeResponse: {
/** Format: int32 */
cols: number;
/** Format: int32 */
rows: number;
};
/** @enum {string} */
ServerStatus: "running" | "stopped";
ServerStatusInfo: {
@ -891,6 +941,11 @@ export interface operations {
};
};
};
/**
* List all managed processes.
* @description Returns a list of all processes (running and exited) currently tracked
* by the runtime, sorted by process ID.
*/
get_v1_processes: {
responses: {
/** @description List processes */
@ -907,6 +962,11 @@ export interface operations {
};
};
};
/**
* Create a long-lived managed process.
* @description Spawns a new process with the given command and arguments. Supports both
* pipe-based and PTY (tty) modes. Returns the process descriptor on success.
*/
post_v1_processes: {
requestBody: {
content: {
@ -940,6 +1000,11 @@ export interface operations {
};
};
};
/**
* Get process runtime configuration.
* @description Returns the current runtime configuration for the process management API,
* including limits for concurrency, timeouts, and buffer sizes.
*/
get_v1_processes_config: {
responses: {
/** @description Current runtime process config */
@ -956,6 +1021,11 @@ export interface operations {
};
};
};
/**
* Update process runtime configuration.
* @description Replaces the runtime configuration for the process management API.
* Validates that all values are non-zero and clamps default timeout to max.
*/
post_v1_processes_config: {
requestBody: {
content: {
@ -983,6 +1053,11 @@ export interface operations {
};
};
};
/**
* Run a one-shot command.
* @description Executes a command to completion and returns its stdout, stderr, exit code,
* and duration. Supports configurable timeout and output size limits.
*/
post_v1_processes_run: {
requestBody: {
content: {
@ -1010,6 +1085,11 @@ export interface operations {
};
};
};
/**
* Get a single process by ID.
* @description Returns the current state of a managed process including its status,
* PID, exit code, and creation/exit timestamps.
*/
get_v1_process: {
parameters: {
path: {
@ -1038,6 +1118,11 @@ export interface operations {
};
};
};
/**
* Delete a process record.
* @description Removes a stopped process from the runtime. Returns 409 if the process
* is still running; stop or kill it first.
*/
delete_v1_process: {
parameters: {
path: {
@ -1070,6 +1155,12 @@ export interface operations {
};
};
};
/**
* Write input to a process.
* @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
* Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
* payload exceeds the configured `maxInputBytesPerRequest` limit.
*/
post_v1_process_input: {
parameters: {
path: {
@ -1115,6 +1206,11 @@ export interface operations {
};
};
};
/**
* Send SIGKILL to a process.
* @description Sends SIGKILL to the process and optionally waits up to `waitMs`
* milliseconds for the process to exit before returning.
*/
post_v1_process_kill: {
parameters: {
query?: {
@ -1147,6 +1243,12 @@ export interface operations {
};
};
};
/**
* Fetch process logs.
* @description Returns buffered log entries for a process. Supports filtering by stream
* type, tail count, and sequence-based resumption. When `follow=true`,
* returns an SSE stream that replays buffered entries then streams live output.
*/
get_v1_process_logs: {
parameters: {
query?: {
@ -1185,6 +1287,11 @@ export interface operations {
};
};
};
/**
* Send SIGTERM to a process.
* @description Sends SIGTERM to the process and optionally waits up to `waitMs`
* milliseconds for the process to exit before returning.
*/
post_v1_process_stop: {
parameters: {
query?: {
@ -1217,51 +1324,14 @@ export interface operations {
};
};
};
post_v1_process_terminal_resize: {
parameters: {
path: {
/** @description Process ID */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["ProcessTerminalResizeRequest"];
};
};
responses: {
/** @description Resize accepted */
200: {
content: {
"application/json": components["schemas"]["ProcessTerminalResizeResponse"];
};
};
/** @description Invalid request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Not a terminal process */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
/**
* Open an interactive WebSocket terminal session.
* @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
* `access_token` query param for browser-based auth (WebSocket API cannot
* send custom headers). Uses the `channel.k8s.io` binary subprotocol:
* channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
* and channel 255 close.
*/
get_v1_process_terminal_ws: {
parameters: {
query?: {

View file

@ -1,5 +1,6 @@
export {
LiveAcpConnection,
ProcessTerminalSession,
SandboxAgent,
SandboxAgentError,
Session,
@ -15,6 +16,7 @@ export type {
ProcessLogListener,
ProcessLogSubscription,
ProcessTerminalConnectOptions,
ProcessTerminalSessionOptions,
ProcessTerminalWebSocketUrlOptions,
SandboxAgentConnectOptions,
SandboxAgentStartOptions,
@ -28,6 +30,7 @@ export type { InspectorUrlOptions } from "./inspector.ts";
export {
InMemorySessionPersistDriver,
TerminalChannel,
} from "./types.ts";
export type {
@ -72,18 +75,16 @@ export type {
ProcessRunResponse,
ProcessSignalQuery,
ProcessState,
ProcessTerminalClientFrame,
ProcessTerminalErrorFrame,
ProcessTerminalExitFrame,
ProcessTerminalReadyFrame,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
ProcessTerminalServerFrame,
SessionEvent,
SessionPersistDriver,
SessionRecord,
SkillsConfig,
SkillsConfigQuery,
TerminalErrorStatus,
TerminalExitStatus,
TerminalReadyStatus,
TerminalResizePayload,
TerminalStatusMessage,
} from "./types.ts";
export type {

View file

@ -46,43 +46,40 @@ export type ProcessRunRequest = JsonRequestBody<operations["post_v1_processes_ru
export type ProcessRunResponse = JsonResponse<operations["post_v1_processes_run"], 200>;
export type ProcessSignalQuery = QueryParams<operations["post_v1_process_stop"]>;
export type ProcessState = components["schemas"]["ProcessState"];
export type ProcessTerminalResizeRequest = JsonRequestBody<operations["post_v1_process_terminal_resize"]>;
export type ProcessTerminalResizeResponse = JsonResponse<operations["post_v1_process_terminal_resize"], 200>;
export type ProcessTerminalClientFrame =
| {
type: "input";
data: string;
encoding?: string;
}
| {
type: "resize";
cols: number;
rows: number;
}
| {
type: "close";
};
export const TerminalChannel = {
stdin: 0,
stdout: 1,
stderr: 2,
status: 3,
resize: 4,
close: 255,
} as const;
export interface ProcessTerminalReadyFrame {
export interface TerminalReadyStatus {
type: "ready";
processId: string;
}
export interface ProcessTerminalExitFrame {
export interface TerminalExitStatus {
type: "exit";
exitCode?: number | null;
}
export interface ProcessTerminalErrorFrame {
export interface TerminalErrorStatus {
type: "error";
message: string;
}
export type ProcessTerminalServerFrame =
| ProcessTerminalReadyFrame
| ProcessTerminalExitFrame
| ProcessTerminalErrorFrame;
export type TerminalStatusMessage =
| TerminalReadyStatus
| TerminalExitStatus
| TerminalErrorStatus;
export interface TerminalResizePayload {
cols: number;
rows: number;
}
export interface SessionRecord {
id: string;

View file

@ -136,22 +136,6 @@ function writeTarChecksum(buffer: Buffer, checksum: number): void {
buffer[155] = 0x20;
}
function decodeSocketPayload(data: unknown): string {
if (typeof data === "string") {
return data;
}
if (data instanceof ArrayBuffer) {
return Buffer.from(data).toString("utf8");
}
if (ArrayBuffer.isView(data)) {
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8");
}
if (typeof Blob !== "undefined" && data instanceof Blob) {
throw new Error("Blob socket payloads are not supported in this test");
}
throw new Error(`Unsupported socket payload type: ${typeof data}`);
}
function decodeProcessLogData(data: string, encoding: string): string {
if (encoding === "base64") {
return Buffer.from(data, "base64").toString("utf8");
@ -582,47 +566,53 @@ describe("Integration: TypeScript SDK flat session API", () => {
});
ttyProcessId = ttyProcess.id;
const resized = await sdk.resizeProcessTerminal(ttyProcess.id, {
cols: 120,
rows: 40,
});
expect(resized.cols).toBe(120);
expect(resized.rows).toBe(40);
const wsUrl = sdk.buildProcessTerminalWebSocketUrl(ttyProcess.id);
expect(wsUrl.startsWith("ws://") || wsUrl.startsWith("wss://")).toBe(true);
const ws = sdk.connectProcessTerminalWebSocket(ttyProcess.id, {
const session = sdk.connectProcessTerminal(ttyProcess.id, {
WebSocket: WebSocket as unknown as typeof globalThis.WebSocket,
});
ws.binaryType = "arraybuffer";
const readyFrames: string[] = [];
const ttyOutput: string[] = [];
const exitFrames: Array<number | null | undefined> = [];
const terminalErrors: string[] = [];
let closeCount = 0;
const socketTextFrames: string[] = [];
const socketBinaryFrames: string[] = [];
ws.addEventListener("message", (event) => {
if (typeof event.data === "string") {
socketTextFrames.push(event.data);
return;
}
socketBinaryFrames.push(decodeSocketPayload(event.data));
session.onReady((status) => {
readyFrames.push(status.processId);
});
session.onData((bytes) => {
ttyOutput.push(Buffer.from(bytes).toString("utf8"));
});
session.onExit((status) => {
exitFrames.push(status.exitCode);
});
session.onError((error) => {
terminalErrors.push(error instanceof Error ? error.message : error.message);
});
session.onClose(() => {
closeCount += 1;
});
await waitFor(() => {
const ready = socketTextFrames.find((frame) => frame.includes('"type":"ready"'));
return ready;
await waitFor(() => readyFrames[0]);
session.resize({
cols: 120,
rows: 40,
});
ws.send(JSON.stringify({
type: "input",
data: "hello tty\n",
}));
session.sendInput("hello tty\n");
await waitFor(() => {
const joined = socketBinaryFrames.join("");
const joined = ttyOutput.join("");
return joined.includes("hello tty") ? joined : undefined;
});
ws.close();
session.close();
await session.closed;
expect(closeCount).toBeGreaterThan(0);
expect(exitFrames).toHaveLength(0);
expect(terminalErrors).toEqual([]);
await waitForAsync(async () => {
const processInfo = await sdk.getProcess(ttyProcess.id);
return processInfo.status === "running" ? processInfo : undefined;

View file

@ -8,7 +8,7 @@ use base64::Engine;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
use sandbox_agent_error::SandboxError;
@ -119,6 +119,7 @@ pub struct ProcessRuntime {
struct ProcessRuntimeInner {
next_id: AtomicU64,
processes: RwLock<HashMap<String, Arc<ManagedProcess>>>,
run_once_semaphore: Semaphore,
}
#[derive(Debug)]
@ -182,6 +183,9 @@ impl ProcessRuntime {
inner: Arc::new(ProcessRuntimeInner {
next_id: AtomicU64::new(1),
processes: RwLock::new(HashMap::new()),
run_once_semaphore: Semaphore::new(
ProcessRuntimeConfig::default().max_concurrent_processes,
),
}),
}
}
@ -324,6 +328,14 @@ impl ProcessRuntime {
});
}
let _permit =
self.inner
.run_once_semaphore
.try_acquire()
.map_err(|_| SandboxError::Conflict {
message: "too many concurrent run_once operations".to_string(),
})?;
let config = self.get_config().await;
let mut timeout_ms = spec.timeout_ms.unwrap_or(config.default_run_timeout_ms);
if timeout_ms == 0 {
@ -331,7 +343,10 @@ impl ProcessRuntime {
}
timeout_ms = timeout_ms.min(config.max_run_timeout_ms);
let max_output_bytes = spec.max_output_bytes.unwrap_or(config.max_output_bytes);
let max_output_bytes = spec
.max_output_bytes
.unwrap_or(config.max_output_bytes)
.min(config.max_output_bytes);
let mut cmd = Command::new(&spec.command);
cmd.args(&spec.args)

View file

@ -50,6 +50,12 @@ pub use self::types::*;
const APPLICATION_JSON: &str = "application/json";
const TEXT_EVENT_STREAM: &str = "text/event-stream";
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
const CH_STDIN: u8 = 0;
const CH_STDOUT: u8 = 1;
const CH_STATUS: u8 = 3;
const CH_RESIZE: u8 = 4;
const CH_CLOSE: u8 = 255;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BrandingMode {
@ -196,10 +202,6 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/processes/:id/kill", post(post_v1_process_kill))
.route("/processes/:id/logs", get(get_v1_process_logs))
.route("/processes/:id/input", post(post_v1_process_input))
.route(
"/processes/:id/terminal/resize",
post(post_v1_process_terminal_resize),
)
.route(
"/processes/:id/terminal/ws",
get(get_v1_process_terminal_ws),
@ -344,7 +346,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
delete_v1_process,
get_v1_process_logs,
post_v1_process_input,
post_v1_process_terminal_resize,
get_v1_process_terminal_ws,
get_v1_config_mcp,
put_v1_config_mcp,
@ -394,8 +395,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
ProcessInputRequest,
ProcessInputResponse,
ProcessSignalQuery,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
AcpPostQuery,
AcpServerInfo,
AcpServerListResponse,
@ -1602,51 +1601,13 @@ async fn post_v1_process_input(
Ok(Json(ProcessInputResponse { bytes_written }))
}
/// Resize a process terminal.
///
/// Sets the PTY window size (columns and rows) for a tty-mode process and
/// sends SIGWINCH so the child process can adapt.
#[utoipa::path(
post,
path = "/v1/processes/{id}/terminal/resize",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessTerminalResizeRequest,
responses(
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_terminal_resize(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessTerminalResizeRequest>,
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state
.process_runtime()
.resize_terminal(&id, body.cols, body.rows)
.await?;
Ok(Json(ProcessTerminalResizeResponse {
cols: body.cols,
rows: body.rows,
}))
}
/// Open an interactive WebSocket terminal session.
///
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
/// `access_token` query param for browser-based auth (WebSocket API cannot
/// send custom headers). Streams raw PTY output as binary frames and accepts
/// JSON control frames for input, resize, and close.
/// send custom headers). Uses the `channel.k8s.io` binary subprotocol:
/// channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
/// and channel 255 close.
#[utoipa::path(
get,
path = "/v1/processes/{id}/terminal/ws",
@ -1682,23 +1643,16 @@ async fn get_v1_process_terminal_ws(
}
Ok(ws
.protocols([CHANNEL_K8S_IO_PROTOCOL])
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
.into_response())
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum TerminalClientFrame {
Input {
data: String,
#[serde(default)]
encoding: Option<String>,
},
Resize {
cols: u16,
rows: u16,
},
Close,
#[serde(rename_all = "camelCase")]
struct TerminalResizePayload {
cols: u16,
rows: u16,
}
async fn process_terminal_ws_session(
@ -1706,7 +1660,7 @@ async fn process_terminal_ws_session(
runtime: Arc<ProcessRuntime>,
id: String,
) {
let _ = send_ws_json(
let _ = send_status_json(
&mut socket,
json!({
"type": "ready",
@ -1718,7 +1672,8 @@ async fn process_terminal_ws_session(
let mut log_rx = match runtime.subscribe_logs(&id).await {
Ok(rx) => rx,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
let _ = send_status_error(&mut socket, &err.to_string()).await;
let _ = send_close_signal(&mut socket).await;
let _ = socket.close().await;
return;
}
@ -1729,43 +1684,57 @@ async fn process_terminal_ws_session(
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
Some(Ok(Message::Binary(_))) => {
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
}
Some(Ok(Message::Text(text))) => {
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
match parsed {
Ok(TerminalClientFrame::Input { data, encoding }) => {
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
Ok(input) => input,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
continue;
}
};
Some(Ok(Message::Binary(bytes))) => {
let Some((&channel, payload)) = bytes.split_first() else {
let _ = send_status_error(&mut socket, "invalid terminal frame: missing channel byte").await;
continue;
};
match channel {
CH_STDIN => {
let input = payload.to_vec();
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
let _ = send_status_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
continue;
}
if let Err(err) = runtime.write_input(&id, &input).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
let _ = send_status_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Resize { cols, rows }) => {
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
CH_RESIZE => {
let resize = match serde_json::from_slice::<TerminalResizePayload>(payload) {
Ok(resize) => resize,
Err(err) => {
let _ = send_status_error(&mut socket, &format!("invalid resize payload: {err}")).await;
continue;
}
};
if let Err(err) = runtime
.resize_terminal(&id, resize.cols, resize.rows)
.await
{
let _ = send_status_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Close) => {
CH_CLOSE => {
let _ = send_close_signal(&mut socket).await;
let _ = socket.close().await;
break;
}
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
_ => {
let _ = send_status_error(&mut socket, &format!("unsupported terminal channel: {channel}")).await;
}
}
}
Some(Ok(Message::Text(_))) => {
let _ = send_status_error(
&mut socket,
"text frames are not supported; use channel.k8s.io binary frames",
)
.await;
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
@ -1785,7 +1754,7 @@ async fn process_terminal_ws_session(
use base64::Engine;
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
};
if socket.send(Message::Binary(bytes)).await.is_err() {
if send_channel_frame(&mut socket, CH_STDOUT, bytes).await.is_err() {
break;
}
}
@ -1796,7 +1765,7 @@ async fn process_terminal_ws_session(
_ = exit_poll.tick() => {
if let Ok(snapshot) = runtime.snapshot(&id).await {
if snapshot.status == ProcessStatus::Exited {
let _ = send_ws_json(
let _ = send_status_json(
&mut socket,
json!({
"type": "exit",
@ -1804,6 +1773,7 @@ async fn process_terminal_ws_session(
}),
)
.await;
let _ = send_close_signal(&mut socket).await;
let _ = socket.close().await;
break;
}
@ -1813,17 +1783,30 @@ async fn process_terminal_ws_session(
}
}
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
async fn send_channel_frame(
socket: &mut WebSocket,
channel: u8,
payload: impl Into<Vec<u8>>,
) -> Result<(), ()> {
let mut frame = vec![channel];
frame.extend(payload.into());
socket
.send(Message::Text(
serde_json::to_string(&payload).map_err(|_| ())?,
))
.send(Message::Binary(frame.into()))
.await
.map_err(|_| ())
}
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_ws_json(
async fn send_status_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
send_channel_frame(
socket,
CH_STATUS,
serde_json::to_vec(&payload).map_err(|_| ())?,
)
.await
}
async fn send_status_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_status_json(
socket,
json!({
"type": "error",
@ -1833,6 +1816,10 @@ async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()>
.await
}
async fn send_close_signal(socket: &mut WebSocket) -> Result<(), ()> {
send_channel_frame(socket, CH_CLOSE, Vec::<u8>::new()).await
}
#[utoipa::path(
get,
path = "/v1/config/mcp",

View file

@ -512,20 +512,6 @@ pub struct ProcessSignalQuery {
pub wait_ms: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeRequest {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeResponse {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessWsQuery {

View file

@ -3,8 +3,17 @@ use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::Message;
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
const CH_STDIN: u8 = 0;
const CH_STDOUT: u8 = 1;
const CH_STATUS: u8 = 3;
const CH_RESIZE: u8 = 4;
const CH_CLOSE: u8 = 255;
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
for _ in 0..30 {
let (status, _, body) = send_request(
@ -48,6 +57,19 @@ async fn recv_ws_message(
.expect("websocket frame")
}
fn make_channel_frame(channel: u8, payload: impl AsRef<[u8]>) -> Vec<u8> {
let payload = payload.as_ref();
let mut frame = Vec::with_capacity(payload.len() + 1);
frame.push(channel);
frame.extend_from_slice(payload);
frame
}
fn parse_channel_frame(bytes: &[u8]) -> (u8, &[u8]) {
let (&channel, payload) = bytes.split_first().expect("channel frame");
(channel, payload)
}
#[tokio::test]
async fn v1_processes_config_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
@ -519,59 +541,84 @@ async fn v1_process_terminal_ws_e2e_is_deterministic() {
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let process_id = create_body["id"].as_str().expect("process id").to_string();
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let (mut ws, _) = connect_async(&ws_url)
.await
.expect("connect websocket");
let mut ws_request = ws_url.into_client_request().expect("ws request");
ws_request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
);
let (mut ws, response) = connect_async(ws_request).await.expect("connect websocket");
assert_eq!(
response
.headers()
.get("Sec-WebSocket-Protocol")
.and_then(|value| value.to_str().ok()),
Some(CHANNEL_K8S_IO_PROTOCOL)
);
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
let ready_bytes = ready.into_data();
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
assert_eq!(ready_channel, CH_STATUS);
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
ws.send(Message::Text(
json!({
"type": "input",
"data": "hello from ws\n"
})
.to_string(),
ws.send(Message::Binary(
make_channel_frame(CH_STDIN, b"hello from ws\n").into(),
))
.await
.expect("send input frame");
let mut saw_binary_output = false;
ws.send(Message::Binary(
make_channel_frame(CH_RESIZE, br#"{"cols":120,"rows":40}"#).into(),
))
.await
.expect("send resize frame");
let mut saw_stdout = false;
let mut saw_exit = false;
let mut saw_close = false;
for _ in 0..10 {
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => {
let text = String::from_utf8_lossy(&bytes);
if text.contains("got:hello from ws") {
saw_binary_output = true;
let (channel, payload) = parse_channel_frame(&bytes);
match channel {
CH_STDOUT => {
let text = String::from_utf8_lossy(payload);
if text.contains("got:hello from ws") {
saw_stdout = true;
}
}
CH_STATUS => {
let payload: Value =
serde_json::from_slice(payload).expect("ws status json");
if payload["type"] == "exit" {
saw_exit = true;
} else {
assert_ne!(payload["type"], "error");
}
}
CH_CLOSE => {
assert!(payload.is_empty(), "close channel payload must be empty");
saw_close = true;
break;
}
other => panic!("unexpected websocket channel: {other}"),
}
}
Message::Text(text) => {
let payload: Value = serde_json::from_str(&text).expect("ws json");
if payload["type"] == "exit" {
saw_exit = true;
break;
}
assert_ne!(payload["type"], "error");
}
Message::Close(_) => break,
Message::Ping(_) | Message::Pong(_) => {}
_ => {}
}
}
assert!(saw_binary_output, "expected pty binary output over websocket");
assert!(saw_exit, "expected exit control frame over websocket");
assert!(saw_stdout, "expected pty stdout over websocket");
assert!(saw_exit, "expected exit status frame over websocket");
assert!(saw_close, "expected close channel frame over websocket");
let _ = ws.close(None).await;
@ -605,10 +652,7 @@ async fn v1_process_terminal_ws_auth_e2e() {
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let process_id = create_body["id"].as_str().expect("process id").to_string();
let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let unauth_err = connect_async(&unauth_ws_url)
@ -624,25 +668,42 @@ async fn v1_process_terminal_ws_auth_e2e() {
let auth_ws_url = live_server.ws_url(&format!(
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
));
let (mut ws, _) = connect_async(&auth_ws_url)
let mut ws_request = auth_ws_url.into_client_request().expect("ws request");
ws_request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
);
let (mut ws, response) = connect_async(ws_request)
.await
.expect("authenticated websocket handshake");
assert_eq!(
response
.headers()
.get("Sec-WebSocket-Protocol")
.and_then(|value| value.to_str().ok()),
Some(CHANNEL_K8S_IO_PROTOCOL)
);
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
let ready_bytes = ready.into_data();
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
assert_eq!(ready_channel, CH_STATUS);
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
let _ = ws
.send(Message::Text(json!({ "type": "close" }).to_string()))
.send(Message::Binary(make_channel_frame(CH_CLOSE, []).into()))
.await;
let close = recv_ws_message(&mut ws).await;
let close_bytes = close.into_data();
let (close_channel, close_payload) = parse_channel_frame(&close_bytes);
assert_eq!(close_channel, CH_CLOSE);
assert!(close_payload.is_empty());
let _ = ws.close(None).await;
let kill_response = http
.post(live_server.http_url(&format!(
"/v1/processes/{process_id}/kill?waitMs=1000"
)))
.post(live_server.http_url(&format!("/v1/processes/{process_id}/kill?waitMs=1000")))
.bearer_auth(token)
.send()
.await