mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 07:04:48 +00:00
feat: refine process API — WebSocket binary protocol, SDK terminal session, updated tests
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
6c91323ca6
commit
636eefb553
11 changed files with 700 additions and 512 deletions
|
|
@ -954,6 +954,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "List all managed processes.",
|
||||
"description": "Returns a list of all processes (running and exited) currently tracked\nby the runtime, sorted by process ID.",
|
||||
"operationId": "get_v1_processes",
|
||||
"responses": {
|
||||
"200": {
|
||||
|
|
@ -982,6 +984,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Create a long-lived managed process.",
|
||||
"description": "Spawns a new process with the given command and arguments. Supports both\npipe-based and PTY (tty) modes. Returns the process descriptor on success.",
|
||||
"operationId": "post_v1_processes",
|
||||
"requestBody": {
|
||||
"content": {
|
||||
|
|
@ -1042,6 +1046,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Get process runtime configuration.",
|
||||
"description": "Returns the current runtime configuration for the process management API,\nincluding limits for concurrency, timeouts, and buffer sizes.",
|
||||
"operationId": "get_v1_processes_config",
|
||||
"responses": {
|
||||
"200": {
|
||||
|
|
@ -1070,6 +1076,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Update process runtime configuration.",
|
||||
"description": "Replaces the runtime configuration for the process management API.\nValidates that all values are non-zero and clamps default timeout to max.",
|
||||
"operationId": "post_v1_processes_config",
|
||||
"requestBody": {
|
||||
"content": {
|
||||
|
|
@ -1120,6 +1128,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Run a one-shot command.",
|
||||
"description": "Executes a command to completion and returns its stdout, stderr, exit code,\nand duration. Supports configurable timeout and output size limits.",
|
||||
"operationId": "post_v1_processes_run",
|
||||
"requestBody": {
|
||||
"content": {
|
||||
|
|
@ -1170,6 +1180,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Get a single process by ID.",
|
||||
"description": "Returns the current state of a managed process including its status,\nPID, exit code, and creation/exit timestamps.",
|
||||
"operationId": "get_v1_process",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1219,6 +1231,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Delete a process record.",
|
||||
"description": "Removes a stopped process from the runtime. Returns 409 if the process\nis still running; stop or kill it first.",
|
||||
"operationId": "delete_v1_process",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1273,6 +1287,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Write input to a process.",
|
||||
"description": "Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).\nData can be encoded as base64, utf8, or text. Returns 413 if the decoded\npayload exceeds the configured `maxInputBytesPerRequest` limit.",
|
||||
"operationId": "post_v1_process_input",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1354,6 +1370,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Send SIGKILL to a process.",
|
||||
"description": "Sends SIGKILL to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.",
|
||||
"operationId": "post_v1_process_kill",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1417,6 +1435,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Fetch process logs.",
|
||||
"description": "Returns buffered log entries for a process. Supports filtering by stream\ntype, tail count, and sequence-based resumption. When `follow=true`,\nreturns an SSE stream that replays buffered entries then streams live output.",
|
||||
"operationId": "get_v1_process_logs",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1515,6 +1535,8 @@
|
|||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Send SIGTERM to a process.",
|
||||
"description": "Sends SIGTERM to the process and optionally waits up to `waitMs`\nmilliseconds for the process to exit before returning.",
|
||||
"operationId": "post_v1_process_stop",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -1573,92 +1595,13 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"/v1/processes/{id}/terminal/resize": {
|
||||
"post": {
|
||||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"operationId": "post_v1_process_terminal_resize",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"description": "Process ID",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
],
|
||||
"requestBody": {
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProcessTerminalResizeRequest"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": true
|
||||
},
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Resize accepted",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProcessTerminalResizeResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Invalid request",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProblemDetails"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Unknown process",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProblemDetails"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"409": {
|
||||
"description": "Not a terminal process",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProblemDetails"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"501": {
|
||||
"description": "Process API unsupported on this platform",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ProblemDetails"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/processes/{id}/terminal/ws": {
|
||||
"get": {
|
||||
"tags": [
|
||||
"v1"
|
||||
],
|
||||
"summary": "Open an interactive WebSocket terminal session.",
|
||||
"description": "Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts\n`access_token` query param for browser-based auth (WebSocket API cannot\nsend custom headers). Uses the `channel.k8s.io` binary subprotocol:\nchannel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,\nand channel 255 close.",
|
||||
"operationId": "get_v1_process_terminal_ws",
|
||||
"parameters": [
|
||||
{
|
||||
|
|
@ -2013,6 +1956,7 @@
|
|||
"permission_denied",
|
||||
"not_acceptable",
|
||||
"unsupported_media_type",
|
||||
"not_found",
|
||||
"session_not_found",
|
||||
"session_already_exists",
|
||||
"mode_not_supported",
|
||||
|
|
@ -2730,44 +2674,6 @@
|
|||
"exited"
|
||||
]
|
||||
},
|
||||
"ProcessTerminalResizeRequest": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"cols",
|
||||
"rows"
|
||||
],
|
||||
"properties": {
|
||||
"cols": {
|
||||
"type": "integer",
|
||||
"format": "int32",
|
||||
"minimum": 0
|
||||
},
|
||||
"rows": {
|
||||
"type": "integer",
|
||||
"format": "int32",
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
"ProcessTerminalResizeResponse": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"cols",
|
||||
"rows"
|
||||
],
|
||||
"properties": {
|
||||
"cols": {
|
||||
"type": "integer",
|
||||
"format": "int32",
|
||||
"minimum": 0
|
||||
},
|
||||
"rows": {
|
||||
"type": "integer",
|
||||
"format": "int32",
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
"ServerStatus": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { AlertCircle, Loader2, PlugZap, SquareTerminal } from "lucide-react";
|
||||
import { FitAddon, Terminal, init } from "ghostty-web";
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import type { ProcessTerminalServerFrame, SandboxAgent } from "sandbox-agent";
|
||||
import type { SandboxAgent } from "sandbox-agent";
|
||||
|
||||
type ConnectionState = "connecting" | "ready" | "closed" | "error";
|
||||
|
||||
|
|
@ -29,21 +29,6 @@ const terminalTheme = {
|
|||
brightWhite: "#fafafa",
|
||||
};
|
||||
|
||||
const toUint8Array = async (data: Blob | ArrayBuffer): Promise<Uint8Array> => {
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return new Uint8Array(data);
|
||||
}
|
||||
return new Uint8Array(await data.arrayBuffer());
|
||||
};
|
||||
|
||||
const isServerFrame = (value: unknown): value is ProcessTerminalServerFrame => {
|
||||
if (!value || typeof value !== "object") {
|
||||
return false;
|
||||
}
|
||||
const type = (value as { type?: unknown }).type;
|
||||
return type === "ready" || type === "exit" || type === "error";
|
||||
};
|
||||
|
||||
const GhosttyTerminal = ({
|
||||
client,
|
||||
processId,
|
||||
|
|
@ -62,24 +47,16 @@ const GhosttyTerminal = ({
|
|||
let cancelled = false;
|
||||
let terminal: Terminal | null = null;
|
||||
let fitAddon: FitAddon | null = null;
|
||||
let socket: WebSocket | null = null;
|
||||
let session: ReturnType<SandboxAgent["connectProcessTerminal"]> | null = null;
|
||||
let resizeRaf = 0;
|
||||
let removeDataListener: { dispose(): void } | null = null;
|
||||
let removeResizeListener: { dispose(): void } | null = null;
|
||||
|
||||
const sendFrame = (payload: unknown) => {
|
||||
if (!socket || socket.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
socket.send(JSON.stringify(payload));
|
||||
};
|
||||
|
||||
const syncSize = () => {
|
||||
if (!terminal) {
|
||||
if (!terminal || !session) {
|
||||
return;
|
||||
}
|
||||
sendFrame({
|
||||
type: "resize",
|
||||
session.resize({
|
||||
cols: terminal.cols,
|
||||
rows: terminal.rows,
|
||||
});
|
||||
|
|
@ -110,7 +87,7 @@ const GhosttyTerminal = ({
|
|||
terminal.focus();
|
||||
|
||||
removeDataListener = terminal.onData((data) => {
|
||||
sendFrame({ type: "input", data });
|
||||
session?.sendInput(data);
|
||||
});
|
||||
|
||||
removeResizeListener = terminal.onResize(() => {
|
||||
|
|
@ -120,38 +97,29 @@ const GhosttyTerminal = ({
|
|||
resizeRaf = window.requestAnimationFrame(syncSize);
|
||||
});
|
||||
|
||||
const nextSocket = client.connectProcessTerminalWebSocket(processId);
|
||||
socket = nextSocket;
|
||||
nextSocket.binaryType = "arraybuffer";
|
||||
const nextSession = client.connectProcessTerminal(processId);
|
||||
session = nextSession;
|
||||
|
||||
const tryParseControlFrame = (raw: string | ArrayBuffer | Blob): ProcessTerminalServerFrame | null => {
|
||||
let text: string | undefined;
|
||||
if (typeof raw === "string") {
|
||||
text = raw;
|
||||
} else if (raw instanceof ArrayBuffer) {
|
||||
// Server may send JSON control frames as binary; try to decode small messages as JSON.
|
||||
if (raw.byteLength < 256) {
|
||||
try {
|
||||
text = new TextDecoder().decode(raw);
|
||||
} catch {
|
||||
// not decodable, treat as terminal data
|
||||
}
|
||||
}
|
||||
nextSession.onReady((frame) => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
if (!text) return null;
|
||||
try {
|
||||
const parsed = JSON.parse(text);
|
||||
return isServerFrame(parsed) ? parsed : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const handleControlFrame = (frame: ProcessTerminalServerFrame): void => {
|
||||
if (frame.type === "ready") {
|
||||
setConnectionState("ready");
|
||||
setStatusMessage("Connected");
|
||||
syncSize();
|
||||
}
|
||||
});
|
||||
|
||||
nextSession.onData((bytes) => {
|
||||
if (cancelled || !terminal) {
|
||||
return;
|
||||
}
|
||||
terminal.write(bytes);
|
||||
});
|
||||
|
||||
nextSession.onExit((frame) => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
if (frame.type === "exit") {
|
||||
|
|
@ -161,47 +129,24 @@ const GhosttyTerminal = ({
|
|||
frame.exitCode == null ? "Process exited." : `Process exited with code ${frame.exitCode}.`
|
||||
);
|
||||
onExit?.();
|
||||
return;
|
||||
}
|
||||
if (frame.type === "error") {
|
||||
setConnectionState("error");
|
||||
setStatusMessage(frame.message);
|
||||
}
|
||||
};
|
||||
|
||||
nextSocket.addEventListener("message", (event) => {
|
||||
if (cancelled || !terminal) {
|
||||
return;
|
||||
}
|
||||
|
||||
const controlFrame = tryParseControlFrame(event.data);
|
||||
if (controlFrame) {
|
||||
handleControlFrame(controlFrame);
|
||||
return;
|
||||
}
|
||||
|
||||
void toUint8Array(event.data).then((bytes) => {
|
||||
if (!cancelled && terminal) {
|
||||
terminal.write(bytes);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
nextSocket.addEventListener("close", () => {
|
||||
nextSession.onError((error) => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
setConnectionState("error");
|
||||
setStatusMessage(error instanceof Error ? error.message : error.message);
|
||||
});
|
||||
|
||||
nextSession.onClose(() => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
setConnectionState((current) => (current === "error" ? current : "closed"));
|
||||
setStatusMessage((current) => (current === "Connected" ? "Terminal disconnected." : current));
|
||||
});
|
||||
|
||||
nextSocket.addEventListener("error", () => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
setConnectionState("error");
|
||||
setStatusMessage("WebSocket connection failed.");
|
||||
});
|
||||
} catch (error) {
|
||||
if (cancelled) {
|
||||
return;
|
||||
|
|
@ -220,15 +165,7 @@ const GhosttyTerminal = ({
|
|||
}
|
||||
removeDataListener?.dispose();
|
||||
removeResizeListener?.dispose();
|
||||
if (socket?.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify({ type: "close" }));
|
||||
socket.close();
|
||||
} else if (socket?.readyState === WebSocket.CONNECTING) {
|
||||
const pendingSocket = socket;
|
||||
pendingSocket.addEventListener("open", () => {
|
||||
pendingSocket.close();
|
||||
}, { once: true });
|
||||
}
|
||||
session?.close();
|
||||
terminal?.dispose();
|
||||
};
|
||||
}, [client, onExit, processId]);
|
||||
|
|
|
|||
|
|
@ -51,13 +51,17 @@ import {
|
|||
type ProcessRunRequest,
|
||||
type ProcessRunResponse,
|
||||
type ProcessSignalQuery,
|
||||
type ProcessTerminalResizeRequest,
|
||||
type ProcessTerminalResizeResponse,
|
||||
type SessionEvent,
|
||||
type SessionPersistDriver,
|
||||
type SessionRecord,
|
||||
type SkillsConfig,
|
||||
type SkillsConfigQuery,
|
||||
TerminalChannel,
|
||||
type TerminalErrorStatus,
|
||||
type TerminalExitStatus,
|
||||
type TerminalReadyStatus,
|
||||
type TerminalResizePayload,
|
||||
type TerminalStatusMessage,
|
||||
} from "./types.ts";
|
||||
|
||||
const API_PREFIX = "/v1";
|
||||
|
|
@ -134,6 +138,8 @@ export interface ProcessTerminalConnectOptions extends ProcessTerminalWebSocketU
|
|||
WebSocket?: typeof WebSocket;
|
||||
}
|
||||
|
||||
export type ProcessTerminalSessionOptions = ProcessTerminalConnectOptions;
|
||||
|
||||
export class SandboxAgentError extends Error {
|
||||
readonly status: number;
|
||||
readonly problem?: ProblemDetails;
|
||||
|
|
@ -472,6 +478,188 @@ export class LiveAcpConnection {
|
|||
}
|
||||
}
|
||||
|
||||
export class ProcessTerminalSession {
|
||||
readonly socket: WebSocket;
|
||||
readonly closed: Promise<void>;
|
||||
|
||||
private readonly readyListeners = new Set<(status: TerminalReadyStatus) => void>();
|
||||
private readonly dataListeners = new Set<(data: Uint8Array) => void>();
|
||||
private readonly exitListeners = new Set<(status: TerminalExitStatus) => void>();
|
||||
private readonly errorListeners = new Set<(error: TerminalErrorStatus | Error) => void>();
|
||||
private readonly closeListeners = new Set<() => void>();
|
||||
private readonly textEncoder = new TextEncoder();
|
||||
|
||||
private closeSignalSent = false;
|
||||
private closedResolve!: () => void;
|
||||
|
||||
constructor(socket: WebSocket) {
|
||||
this.socket = socket;
|
||||
this.socket.binaryType = "arraybuffer";
|
||||
this.closed = new Promise<void>((resolve) => {
|
||||
this.closedResolve = resolve;
|
||||
});
|
||||
|
||||
this.socket.addEventListener("message", (event) => {
|
||||
void this.handleMessage(event.data);
|
||||
});
|
||||
this.socket.addEventListener("error", () => {
|
||||
this.emitError(new Error("Terminal websocket connection failed."));
|
||||
});
|
||||
this.socket.addEventListener("close", () => {
|
||||
this.closedResolve();
|
||||
for (const listener of this.closeListeners) {
|
||||
listener();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
onReady(listener: (status: TerminalReadyStatus) => void): () => void {
|
||||
this.readyListeners.add(listener);
|
||||
return () => {
|
||||
this.readyListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
onData(listener: (data: Uint8Array) => void): () => void {
|
||||
this.dataListeners.add(listener);
|
||||
return () => {
|
||||
this.dataListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
onExit(listener: (status: TerminalExitStatus) => void): () => void {
|
||||
this.exitListeners.add(listener);
|
||||
return () => {
|
||||
this.exitListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
onError(listener: (error: TerminalErrorStatus | Error) => void): () => void {
|
||||
this.errorListeners.add(listener);
|
||||
return () => {
|
||||
this.errorListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
onClose(listener: () => void): () => void {
|
||||
this.closeListeners.add(listener);
|
||||
return () => {
|
||||
this.closeListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
sendInput(data: string | ArrayBuffer | ArrayBufferView): void {
|
||||
this.sendChannel(TerminalChannel.stdin, encodeTerminalBytes(data));
|
||||
}
|
||||
|
||||
resize(payload: TerminalResizePayload): void {
|
||||
this.sendChannel(
|
||||
TerminalChannel.resize,
|
||||
this.textEncoder.encode(JSON.stringify(payload)),
|
||||
);
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.socket.readyState === WebSocket.CONNECTING) {
|
||||
this.socket.addEventListener(
|
||||
"open",
|
||||
() => {
|
||||
this.close();
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.socket.readyState === WebSocket.OPEN) {
|
||||
if (!this.closeSignalSent) {
|
||||
this.closeSignalSent = true;
|
||||
this.sendChannel(TerminalChannel.close, new Uint8Array());
|
||||
}
|
||||
this.socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.socket.readyState !== WebSocket.CLOSED) {
|
||||
this.socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
private async handleMessage(data: unknown): Promise<void> {
|
||||
try {
|
||||
const bytes = await decodeTerminalBytes(data);
|
||||
if (bytes.length === 0) {
|
||||
this.emitError(new Error("Received terminal frame without a channel byte."));
|
||||
return;
|
||||
}
|
||||
|
||||
const channel = bytes[0];
|
||||
const payload = bytes.subarray(1);
|
||||
|
||||
if (channel === TerminalChannel.stdout || channel === TerminalChannel.stderr) {
|
||||
for (const listener of this.dataListeners) {
|
||||
listener(payload);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (channel === TerminalChannel.status) {
|
||||
const text = new TextDecoder().decode(payload);
|
||||
const parsed = JSON.parse(text) as unknown;
|
||||
if (!isTerminalStatusMessage(parsed)) {
|
||||
this.emitError(new Error("Received invalid terminal status payload."));
|
||||
return;
|
||||
}
|
||||
|
||||
if (parsed.type === "ready") {
|
||||
for (const listener of this.readyListeners) {
|
||||
listener(parsed);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (parsed.type === "exit") {
|
||||
for (const listener of this.exitListeners) {
|
||||
listener(parsed);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.emitError(parsed);
|
||||
return;
|
||||
}
|
||||
|
||||
if (channel === TerminalChannel.close) {
|
||||
if (this.socket.readyState === WebSocket.OPEN) {
|
||||
this.socket.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.emitError(new Error(`Received unsupported terminal channel ${channel}.`));
|
||||
} catch (error) {
|
||||
this.emitError(error instanceof Error ? error : new Error(String(error)));
|
||||
}
|
||||
}
|
||||
|
||||
private sendChannel(channel: number, payload: Uint8Array): void {
|
||||
if (this.socket.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
|
||||
const frame = new Uint8Array(payload.length + 1);
|
||||
frame[0] = channel;
|
||||
frame.set(payload, 1);
|
||||
this.socket.send(frame);
|
||||
}
|
||||
|
||||
private emitError(error: TerminalErrorStatus | Error): void {
|
||||
for (const listener of this.errorListeners) {
|
||||
listener(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class SandboxAgent {
|
||||
private readonly baseUrl: string;
|
||||
private readonly token?: string;
|
||||
|
|
@ -893,19 +1081,6 @@ export class SandboxAgent {
|
|||
});
|
||||
}
|
||||
|
||||
async resizeProcessTerminal(
|
||||
id: string,
|
||||
request: ProcessTerminalResizeRequest,
|
||||
): Promise<ProcessTerminalResizeResponse> {
|
||||
return this.requestJson(
|
||||
"POST",
|
||||
`${API_PREFIX}/processes/${encodeURIComponent(id)}/terminal/resize`,
|
||||
{
|
||||
body: request,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
buildProcessTerminalWebSocketUrl(
|
||||
id: string,
|
||||
options: ProcessTerminalWebSocketUrlOptions = {},
|
||||
|
|
@ -930,10 +1105,17 @@ export class SandboxAgent {
|
|||
this.buildProcessTerminalWebSocketUrl(id, {
|
||||
accessToken: options.accessToken,
|
||||
}),
|
||||
options.protocols,
|
||||
options.protocols ?? "channel.k8s.io",
|
||||
);
|
||||
}
|
||||
|
||||
connectProcessTerminal(
|
||||
id: string,
|
||||
options: ProcessTerminalSessionOptions = {},
|
||||
): ProcessTerminalSession {
|
||||
return new ProcessTerminalSession(this.connectProcessTerminalWebSocket(id, options));
|
||||
}
|
||||
|
||||
private async getLiveConnection(agent: string): Promise<LiveAcpConnection> {
|
||||
const existing = this.liveConnections.get(agent);
|
||||
if (existing) {
|
||||
|
|
@ -1204,6 +1386,62 @@ type RequestOptions = {
|
|||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
function isTerminalStatusMessage(value: unknown): value is TerminalStatusMessage {
|
||||
if (!isRecord(value) || typeof value.type !== "string") {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (value.type === "ready") {
|
||||
return typeof value.processId === "string";
|
||||
}
|
||||
|
||||
if (value.type === "exit") {
|
||||
return (
|
||||
value.exitCode === undefined ||
|
||||
value.exitCode === null ||
|
||||
typeof value.exitCode === "number"
|
||||
);
|
||||
}
|
||||
|
||||
if (value.type === "error") {
|
||||
return typeof value.message === "string";
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function encodeTerminalBytes(data: string | ArrayBuffer | ArrayBufferView): Uint8Array {
|
||||
if (typeof data === "string") {
|
||||
return new TextEncoder().encode(data);
|
||||
}
|
||||
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return new Uint8Array(data);
|
||||
}
|
||||
|
||||
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice();
|
||||
}
|
||||
|
||||
async function decodeTerminalBytes(data: unknown): Promise<Uint8Array> {
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return new Uint8Array(data);
|
||||
}
|
||||
|
||||
if (ArrayBuffer.isView(data)) {
|
||||
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength).slice();
|
||||
}
|
||||
|
||||
if (typeof Blob !== "undefined" && data instanceof Blob) {
|
||||
return new Uint8Array(await data.arrayBuffer());
|
||||
}
|
||||
|
||||
if (typeof data === "string") {
|
||||
throw new Error("Received text terminal frame; expected channel.k8s.io binary data.");
|
||||
}
|
||||
|
||||
throw new Error(`Unsupported terminal frame payload: ${String(data)}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Auto-select and call `authenticate` based on the agent's advertised auth methods.
|
||||
* Prefers env-var-based methods that the server process already has configured.
|
||||
|
|
|
|||
|
|
@ -58,36 +58,98 @@ export interface paths {
|
|||
get: operations["get_v1_health"];
|
||||
};
|
||||
"/v1/processes": {
|
||||
/**
|
||||
* List all managed processes.
|
||||
* @description Returns a list of all processes (running and exited) currently tracked
|
||||
* by the runtime, sorted by process ID.
|
||||
*/
|
||||
get: operations["get_v1_processes"];
|
||||
/**
|
||||
* Create a long-lived managed process.
|
||||
* @description Spawns a new process with the given command and arguments. Supports both
|
||||
* pipe-based and PTY (tty) modes. Returns the process descriptor on success.
|
||||
*/
|
||||
post: operations["post_v1_processes"];
|
||||
};
|
||||
"/v1/processes/config": {
|
||||
/**
|
||||
* Get process runtime configuration.
|
||||
* @description Returns the current runtime configuration for the process management API,
|
||||
* including limits for concurrency, timeouts, and buffer sizes.
|
||||
*/
|
||||
get: operations["get_v1_processes_config"];
|
||||
/**
|
||||
* Update process runtime configuration.
|
||||
* @description Replaces the runtime configuration for the process management API.
|
||||
* Validates that all values are non-zero and clamps default timeout to max.
|
||||
*/
|
||||
post: operations["post_v1_processes_config"];
|
||||
};
|
||||
"/v1/processes/run": {
|
||||
/**
|
||||
* Run a one-shot command.
|
||||
* @description Executes a command to completion and returns its stdout, stderr, exit code,
|
||||
* and duration. Supports configurable timeout and output size limits.
|
||||
*/
|
||||
post: operations["post_v1_processes_run"];
|
||||
};
|
||||
"/v1/processes/{id}": {
|
||||
/**
|
||||
* Get a single process by ID.
|
||||
* @description Returns the current state of a managed process including its status,
|
||||
* PID, exit code, and creation/exit timestamps.
|
||||
*/
|
||||
get: operations["get_v1_process"];
|
||||
/**
|
||||
* Delete a process record.
|
||||
* @description Removes a stopped process from the runtime. Returns 409 if the process
|
||||
* is still running; stop or kill it first.
|
||||
*/
|
||||
delete: operations["delete_v1_process"];
|
||||
};
|
||||
"/v1/processes/{id}/input": {
|
||||
/**
|
||||
* Write input to a process.
|
||||
* @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
|
||||
* Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
|
||||
* payload exceeds the configured `maxInputBytesPerRequest` limit.
|
||||
*/
|
||||
post: operations["post_v1_process_input"];
|
||||
};
|
||||
"/v1/processes/{id}/kill": {
|
||||
/**
|
||||
* Send SIGKILL to a process.
|
||||
* @description Sends SIGKILL to the process and optionally waits up to `waitMs`
|
||||
* milliseconds for the process to exit before returning.
|
||||
*/
|
||||
post: operations["post_v1_process_kill"];
|
||||
};
|
||||
"/v1/processes/{id}/logs": {
|
||||
/**
|
||||
* Fetch process logs.
|
||||
* @description Returns buffered log entries for a process. Supports filtering by stream
|
||||
* type, tail count, and sequence-based resumption. When `follow=true`,
|
||||
* returns an SSE stream that replays buffered entries then streams live output.
|
||||
*/
|
||||
get: operations["get_v1_process_logs"];
|
||||
};
|
||||
"/v1/processes/{id}/stop": {
|
||||
/**
|
||||
* Send SIGTERM to a process.
|
||||
* @description Sends SIGTERM to the process and optionally waits up to `waitMs`
|
||||
* milliseconds for the process to exit before returning.
|
||||
*/
|
||||
post: operations["post_v1_process_stop"];
|
||||
};
|
||||
"/v1/processes/{id}/terminal/resize": {
|
||||
post: operations["post_v1_process_terminal_resize"];
|
||||
};
|
||||
"/v1/processes/{id}/terminal/ws": {
|
||||
/**
|
||||
* Open an interactive WebSocket terminal session.
|
||||
* @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
|
||||
* `access_token` query param for browser-based auth (WebSocket API cannot
|
||||
* send custom headers). Uses the `channel.k8s.io` binary subprotocol:
|
||||
* channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
|
||||
* and channel 255 close.
|
||||
*/
|
||||
get: operations["get_v1_process_terminal_ws"];
|
||||
};
|
||||
}
|
||||
|
|
@ -166,7 +228,7 @@ export interface components {
|
|||
agents: components["schemas"]["AgentInfo"][];
|
||||
};
|
||||
/** @enum {string} */
|
||||
ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout";
|
||||
ErrorType: "invalid_request" | "conflict" | "unsupported_agent" | "agent_not_installed" | "install_failed" | "agent_process_exited" | "token_invalid" | "permission_denied" | "not_acceptable" | "unsupported_media_type" | "not_found" | "session_not_found" | "session_already_exists" | "mode_not_supported" | "stream_error" | "timeout";
|
||||
FsActionResponse: {
|
||||
path: string;
|
||||
};
|
||||
|
|
@ -361,18 +423,6 @@ export interface components {
|
|||
};
|
||||
/** @enum {string} */
|
||||
ProcessState: "running" | "exited";
|
||||
ProcessTerminalResizeRequest: {
|
||||
/** Format: int32 */
|
||||
cols: number;
|
||||
/** Format: int32 */
|
||||
rows: number;
|
||||
};
|
||||
ProcessTerminalResizeResponse: {
|
||||
/** Format: int32 */
|
||||
cols: number;
|
||||
/** Format: int32 */
|
||||
rows: number;
|
||||
};
|
||||
/** @enum {string} */
|
||||
ServerStatus: "running" | "stopped";
|
||||
ServerStatusInfo: {
|
||||
|
|
@ -891,6 +941,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* List all managed processes.
|
||||
* @description Returns a list of all processes (running and exited) currently tracked
|
||||
* by the runtime, sorted by process ID.
|
||||
*/
|
||||
get_v1_processes: {
|
||||
responses: {
|
||||
/** @description List processes */
|
||||
|
|
@ -907,6 +962,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Create a long-lived managed process.
|
||||
* @description Spawns a new process with the given command and arguments. Supports both
|
||||
* pipe-based and PTY (tty) modes. Returns the process descriptor on success.
|
||||
*/
|
||||
post_v1_processes: {
|
||||
requestBody: {
|
||||
content: {
|
||||
|
|
@ -940,6 +1000,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Get process runtime configuration.
|
||||
* @description Returns the current runtime configuration for the process management API,
|
||||
* including limits for concurrency, timeouts, and buffer sizes.
|
||||
*/
|
||||
get_v1_processes_config: {
|
||||
responses: {
|
||||
/** @description Current runtime process config */
|
||||
|
|
@ -956,6 +1021,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Update process runtime configuration.
|
||||
* @description Replaces the runtime configuration for the process management API.
|
||||
* Validates that all values are non-zero and clamps default timeout to max.
|
||||
*/
|
||||
post_v1_processes_config: {
|
||||
requestBody: {
|
||||
content: {
|
||||
|
|
@ -983,6 +1053,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Run a one-shot command.
|
||||
* @description Executes a command to completion and returns its stdout, stderr, exit code,
|
||||
* and duration. Supports configurable timeout and output size limits.
|
||||
*/
|
||||
post_v1_processes_run: {
|
||||
requestBody: {
|
||||
content: {
|
||||
|
|
@ -1010,6 +1085,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Get a single process by ID.
|
||||
* @description Returns the current state of a managed process including its status,
|
||||
* PID, exit code, and creation/exit timestamps.
|
||||
*/
|
||||
get_v1_process: {
|
||||
parameters: {
|
||||
path: {
|
||||
|
|
@ -1038,6 +1118,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Delete a process record.
|
||||
* @description Removes a stopped process from the runtime. Returns 409 if the process
|
||||
* is still running; stop or kill it first.
|
||||
*/
|
||||
delete_v1_process: {
|
||||
parameters: {
|
||||
path: {
|
||||
|
|
@ -1070,6 +1155,12 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Write input to a process.
|
||||
* @description Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
|
||||
* Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
|
||||
* payload exceeds the configured `maxInputBytesPerRequest` limit.
|
||||
*/
|
||||
post_v1_process_input: {
|
||||
parameters: {
|
||||
path: {
|
||||
|
|
@ -1115,6 +1206,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Send SIGKILL to a process.
|
||||
* @description Sends SIGKILL to the process and optionally waits up to `waitMs`
|
||||
* milliseconds for the process to exit before returning.
|
||||
*/
|
||||
post_v1_process_kill: {
|
||||
parameters: {
|
||||
query?: {
|
||||
|
|
@ -1147,6 +1243,12 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Fetch process logs.
|
||||
* @description Returns buffered log entries for a process. Supports filtering by stream
|
||||
* type, tail count, and sequence-based resumption. When `follow=true`,
|
||||
* returns an SSE stream that replays buffered entries then streams live output.
|
||||
*/
|
||||
get_v1_process_logs: {
|
||||
parameters: {
|
||||
query?: {
|
||||
|
|
@ -1185,6 +1287,11 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Send SIGTERM to a process.
|
||||
* @description Sends SIGTERM to the process and optionally waits up to `waitMs`
|
||||
* milliseconds for the process to exit before returning.
|
||||
*/
|
||||
post_v1_process_stop: {
|
||||
parameters: {
|
||||
query?: {
|
||||
|
|
@ -1217,51 +1324,14 @@ export interface operations {
|
|||
};
|
||||
};
|
||||
};
|
||||
post_v1_process_terminal_resize: {
|
||||
parameters: {
|
||||
path: {
|
||||
/** @description Process ID */
|
||||
id: string;
|
||||
};
|
||||
};
|
||||
requestBody: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProcessTerminalResizeRequest"];
|
||||
};
|
||||
};
|
||||
responses: {
|
||||
/** @description Resize accepted */
|
||||
200: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProcessTerminalResizeResponse"];
|
||||
};
|
||||
};
|
||||
/** @description Invalid request */
|
||||
400: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProblemDetails"];
|
||||
};
|
||||
};
|
||||
/** @description Unknown process */
|
||||
404: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProblemDetails"];
|
||||
};
|
||||
};
|
||||
/** @description Not a terminal process */
|
||||
409: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProblemDetails"];
|
||||
};
|
||||
};
|
||||
/** @description Process API unsupported on this platform */
|
||||
501: {
|
||||
content: {
|
||||
"application/json": components["schemas"]["ProblemDetails"];
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
/**
|
||||
* Open an interactive WebSocket terminal session.
|
||||
* @description Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
|
||||
* `access_token` query param for browser-based auth (WebSocket API cannot
|
||||
* send custom headers). Uses the `channel.k8s.io` binary subprotocol:
|
||||
* channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
|
||||
* and channel 255 close.
|
||||
*/
|
||||
get_v1_process_terminal_ws: {
|
||||
parameters: {
|
||||
query?: {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
export {
|
||||
LiveAcpConnection,
|
||||
ProcessTerminalSession,
|
||||
SandboxAgent,
|
||||
SandboxAgentError,
|
||||
Session,
|
||||
|
|
@ -15,6 +16,7 @@ export type {
|
|||
ProcessLogListener,
|
||||
ProcessLogSubscription,
|
||||
ProcessTerminalConnectOptions,
|
||||
ProcessTerminalSessionOptions,
|
||||
ProcessTerminalWebSocketUrlOptions,
|
||||
SandboxAgentConnectOptions,
|
||||
SandboxAgentStartOptions,
|
||||
|
|
@ -28,6 +30,7 @@ export type { InspectorUrlOptions } from "./inspector.ts";
|
|||
|
||||
export {
|
||||
InMemorySessionPersistDriver,
|
||||
TerminalChannel,
|
||||
} from "./types.ts";
|
||||
|
||||
export type {
|
||||
|
|
@ -72,18 +75,16 @@ export type {
|
|||
ProcessRunResponse,
|
||||
ProcessSignalQuery,
|
||||
ProcessState,
|
||||
ProcessTerminalClientFrame,
|
||||
ProcessTerminalErrorFrame,
|
||||
ProcessTerminalExitFrame,
|
||||
ProcessTerminalReadyFrame,
|
||||
ProcessTerminalResizeRequest,
|
||||
ProcessTerminalResizeResponse,
|
||||
ProcessTerminalServerFrame,
|
||||
SessionEvent,
|
||||
SessionPersistDriver,
|
||||
SessionRecord,
|
||||
SkillsConfig,
|
||||
SkillsConfigQuery,
|
||||
TerminalErrorStatus,
|
||||
TerminalExitStatus,
|
||||
TerminalReadyStatus,
|
||||
TerminalResizePayload,
|
||||
TerminalStatusMessage,
|
||||
} from "./types.ts";
|
||||
|
||||
export type {
|
||||
|
|
|
|||
|
|
@ -46,43 +46,40 @@ export type ProcessRunRequest = JsonRequestBody<operations["post_v1_processes_ru
|
|||
export type ProcessRunResponse = JsonResponse<operations["post_v1_processes_run"], 200>;
|
||||
export type ProcessSignalQuery = QueryParams<operations["post_v1_process_stop"]>;
|
||||
export type ProcessState = components["schemas"]["ProcessState"];
|
||||
export type ProcessTerminalResizeRequest = JsonRequestBody<operations["post_v1_process_terminal_resize"]>;
|
||||
export type ProcessTerminalResizeResponse = JsonResponse<operations["post_v1_process_terminal_resize"], 200>;
|
||||
|
||||
export type ProcessTerminalClientFrame =
|
||||
| {
|
||||
type: "input";
|
||||
data: string;
|
||||
encoding?: string;
|
||||
}
|
||||
| {
|
||||
type: "resize";
|
||||
cols: number;
|
||||
rows: number;
|
||||
}
|
||||
| {
|
||||
type: "close";
|
||||
};
|
||||
export const TerminalChannel = {
|
||||
stdin: 0,
|
||||
stdout: 1,
|
||||
stderr: 2,
|
||||
status: 3,
|
||||
resize: 4,
|
||||
close: 255,
|
||||
} as const;
|
||||
|
||||
export interface ProcessTerminalReadyFrame {
|
||||
export interface TerminalReadyStatus {
|
||||
type: "ready";
|
||||
processId: string;
|
||||
}
|
||||
|
||||
export interface ProcessTerminalExitFrame {
|
||||
export interface TerminalExitStatus {
|
||||
type: "exit";
|
||||
exitCode?: number | null;
|
||||
}
|
||||
|
||||
export interface ProcessTerminalErrorFrame {
|
||||
export interface TerminalErrorStatus {
|
||||
type: "error";
|
||||
message: string;
|
||||
}
|
||||
|
||||
export type ProcessTerminalServerFrame =
|
||||
| ProcessTerminalReadyFrame
|
||||
| ProcessTerminalExitFrame
|
||||
| ProcessTerminalErrorFrame;
|
||||
export type TerminalStatusMessage =
|
||||
| TerminalReadyStatus
|
||||
| TerminalExitStatus
|
||||
| TerminalErrorStatus;
|
||||
|
||||
export interface TerminalResizePayload {
|
||||
cols: number;
|
||||
rows: number;
|
||||
}
|
||||
|
||||
export interface SessionRecord {
|
||||
id: string;
|
||||
|
|
|
|||
|
|
@ -136,22 +136,6 @@ function writeTarChecksum(buffer: Buffer, checksum: number): void {
|
|||
buffer[155] = 0x20;
|
||||
}
|
||||
|
||||
function decodeSocketPayload(data: unknown): string {
|
||||
if (typeof data === "string") {
|
||||
return data;
|
||||
}
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return Buffer.from(data).toString("utf8");
|
||||
}
|
||||
if (ArrayBuffer.isView(data)) {
|
||||
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8");
|
||||
}
|
||||
if (typeof Blob !== "undefined" && data instanceof Blob) {
|
||||
throw new Error("Blob socket payloads are not supported in this test");
|
||||
}
|
||||
throw new Error(`Unsupported socket payload type: ${typeof data}`);
|
||||
}
|
||||
|
||||
function decodeProcessLogData(data: string, encoding: string): string {
|
||||
if (encoding === "base64") {
|
||||
return Buffer.from(data, "base64").toString("utf8");
|
||||
|
|
@ -582,47 +566,53 @@ describe("Integration: TypeScript SDK flat session API", () => {
|
|||
});
|
||||
ttyProcessId = ttyProcess.id;
|
||||
|
||||
const resized = await sdk.resizeProcessTerminal(ttyProcess.id, {
|
||||
cols: 120,
|
||||
rows: 40,
|
||||
});
|
||||
expect(resized.cols).toBe(120);
|
||||
expect(resized.rows).toBe(40);
|
||||
|
||||
const wsUrl = sdk.buildProcessTerminalWebSocketUrl(ttyProcess.id);
|
||||
expect(wsUrl.startsWith("ws://") || wsUrl.startsWith("wss://")).toBe(true);
|
||||
|
||||
const ws = sdk.connectProcessTerminalWebSocket(ttyProcess.id, {
|
||||
const session = sdk.connectProcessTerminal(ttyProcess.id, {
|
||||
WebSocket: WebSocket as unknown as typeof globalThis.WebSocket,
|
||||
});
|
||||
ws.binaryType = "arraybuffer";
|
||||
const readyFrames: string[] = [];
|
||||
const ttyOutput: string[] = [];
|
||||
const exitFrames: Array<number | null | undefined> = [];
|
||||
const terminalErrors: string[] = [];
|
||||
let closeCount = 0;
|
||||
|
||||
const socketTextFrames: string[] = [];
|
||||
const socketBinaryFrames: string[] = [];
|
||||
ws.addEventListener("message", (event) => {
|
||||
if (typeof event.data === "string") {
|
||||
socketTextFrames.push(event.data);
|
||||
return;
|
||||
}
|
||||
socketBinaryFrames.push(decodeSocketPayload(event.data));
|
||||
session.onReady((status) => {
|
||||
readyFrames.push(status.processId);
|
||||
});
|
||||
session.onData((bytes) => {
|
||||
ttyOutput.push(Buffer.from(bytes).toString("utf8"));
|
||||
});
|
||||
session.onExit((status) => {
|
||||
exitFrames.push(status.exitCode);
|
||||
});
|
||||
session.onError((error) => {
|
||||
terminalErrors.push(error instanceof Error ? error.message : error.message);
|
||||
});
|
||||
session.onClose(() => {
|
||||
closeCount += 1;
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
const ready = socketTextFrames.find((frame) => frame.includes('"type":"ready"'));
|
||||
return ready;
|
||||
await waitFor(() => readyFrames[0]);
|
||||
|
||||
session.resize({
|
||||
cols: 120,
|
||||
rows: 40,
|
||||
});
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: "input",
|
||||
data: "hello tty\n",
|
||||
}));
|
||||
session.sendInput("hello tty\n");
|
||||
|
||||
await waitFor(() => {
|
||||
const joined = socketBinaryFrames.join("");
|
||||
const joined = ttyOutput.join("");
|
||||
return joined.includes("hello tty") ? joined : undefined;
|
||||
});
|
||||
|
||||
ws.close();
|
||||
session.close();
|
||||
await session.closed;
|
||||
expect(closeCount).toBeGreaterThan(0);
|
||||
expect(exitFrames).toHaveLength(0);
|
||||
expect(terminalErrors).toEqual([]);
|
||||
|
||||
await waitForAsync(async () => {
|
||||
const processInfo = await sdk.getProcess(ttyProcess.id);
|
||||
return processInfo.status === "running" ? processInfo : undefined;
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ use base64::Engine;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::{Child, ChildStdin, Command};
|
||||
use tokio::sync::{broadcast, Mutex, RwLock};
|
||||
use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
|
||||
|
||||
use sandbox_agent_error::SandboxError;
|
||||
|
||||
|
|
@ -119,6 +119,7 @@ pub struct ProcessRuntime {
|
|||
struct ProcessRuntimeInner {
|
||||
next_id: AtomicU64,
|
||||
processes: RwLock<HashMap<String, Arc<ManagedProcess>>>,
|
||||
run_once_semaphore: Semaphore,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -182,6 +183,9 @@ impl ProcessRuntime {
|
|||
inner: Arc::new(ProcessRuntimeInner {
|
||||
next_id: AtomicU64::new(1),
|
||||
processes: RwLock::new(HashMap::new()),
|
||||
run_once_semaphore: Semaphore::new(
|
||||
ProcessRuntimeConfig::default().max_concurrent_processes,
|
||||
),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
|
@ -324,6 +328,14 @@ impl ProcessRuntime {
|
|||
});
|
||||
}
|
||||
|
||||
let _permit =
|
||||
self.inner
|
||||
.run_once_semaphore
|
||||
.try_acquire()
|
||||
.map_err(|_| SandboxError::Conflict {
|
||||
message: "too many concurrent run_once operations".to_string(),
|
||||
})?;
|
||||
|
||||
let config = self.get_config().await;
|
||||
let mut timeout_ms = spec.timeout_ms.unwrap_or(config.default_run_timeout_ms);
|
||||
if timeout_ms == 0 {
|
||||
|
|
@ -331,7 +343,10 @@ impl ProcessRuntime {
|
|||
}
|
||||
timeout_ms = timeout_ms.min(config.max_run_timeout_ms);
|
||||
|
||||
let max_output_bytes = spec.max_output_bytes.unwrap_or(config.max_output_bytes);
|
||||
let max_output_bytes = spec
|
||||
.max_output_bytes
|
||||
.unwrap_or(config.max_output_bytes)
|
||||
.min(config.max_output_bytes);
|
||||
|
||||
let mut cmd = Command::new(&spec.command);
|
||||
cmd.args(&spec.args)
|
||||
|
|
|
|||
|
|
@ -50,6 +50,12 @@ pub use self::types::*;
|
|||
|
||||
const APPLICATION_JSON: &str = "application/json";
|
||||
const TEXT_EVENT_STREAM: &str = "text/event-stream";
|
||||
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
|
||||
const CH_STDIN: u8 = 0;
|
||||
const CH_STDOUT: u8 = 1;
|
||||
const CH_STATUS: u8 = 3;
|
||||
const CH_RESIZE: u8 = 4;
|
||||
const CH_CLOSE: u8 = 255;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum BrandingMode {
|
||||
|
|
@ -196,10 +202,6 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
|
|||
.route("/processes/:id/kill", post(post_v1_process_kill))
|
||||
.route("/processes/:id/logs", get(get_v1_process_logs))
|
||||
.route("/processes/:id/input", post(post_v1_process_input))
|
||||
.route(
|
||||
"/processes/:id/terminal/resize",
|
||||
post(post_v1_process_terminal_resize),
|
||||
)
|
||||
.route(
|
||||
"/processes/:id/terminal/ws",
|
||||
get(get_v1_process_terminal_ws),
|
||||
|
|
@ -344,7 +346,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
|
|||
delete_v1_process,
|
||||
get_v1_process_logs,
|
||||
post_v1_process_input,
|
||||
post_v1_process_terminal_resize,
|
||||
get_v1_process_terminal_ws,
|
||||
get_v1_config_mcp,
|
||||
put_v1_config_mcp,
|
||||
|
|
@ -394,8 +395,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
|
|||
ProcessInputRequest,
|
||||
ProcessInputResponse,
|
||||
ProcessSignalQuery,
|
||||
ProcessTerminalResizeRequest,
|
||||
ProcessTerminalResizeResponse,
|
||||
AcpPostQuery,
|
||||
AcpServerInfo,
|
||||
AcpServerListResponse,
|
||||
|
|
@ -1602,51 +1601,13 @@ async fn post_v1_process_input(
|
|||
Ok(Json(ProcessInputResponse { bytes_written }))
|
||||
}
|
||||
|
||||
/// Resize a process terminal.
|
||||
///
|
||||
/// Sets the PTY window size (columns and rows) for a tty-mode process and
|
||||
/// sends SIGWINCH so the child process can adapt.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/{id}/terminal/resize",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID")
|
||||
),
|
||||
request_body = ProcessTerminalResizeRequest,
|
||||
responses(
|
||||
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
|
||||
(status = 400, description = "Invalid request", body = ProblemDetails),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 409, description = "Not a terminal process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_process_terminal_resize(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Json(body): Json<ProcessTerminalResizeRequest>,
|
||||
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
state
|
||||
.process_runtime()
|
||||
.resize_terminal(&id, body.cols, body.rows)
|
||||
.await?;
|
||||
Ok(Json(ProcessTerminalResizeResponse {
|
||||
cols: body.cols,
|
||||
rows: body.rows,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Open an interactive WebSocket terminal session.
|
||||
///
|
||||
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
|
||||
/// `access_token` query param for browser-based auth (WebSocket API cannot
|
||||
/// send custom headers). Streams raw PTY output as binary frames and accepts
|
||||
/// JSON control frames for input, resize, and close.
|
||||
/// send custom headers). Uses the `channel.k8s.io` binary subprotocol:
|
||||
/// channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
|
||||
/// and channel 255 close.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes/{id}/terminal/ws",
|
||||
|
|
@ -1682,23 +1643,16 @@ async fn get_v1_process_terminal_ws(
|
|||
}
|
||||
|
||||
Ok(ws
|
||||
.protocols([CHANNEL_K8S_IO_PROTOCOL])
|
||||
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
|
||||
.into_response())
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
enum TerminalClientFrame {
|
||||
Input {
|
||||
data: String,
|
||||
#[serde(default)]
|
||||
encoding: Option<String>,
|
||||
},
|
||||
Resize {
|
||||
cols: u16,
|
||||
rows: u16,
|
||||
},
|
||||
Close,
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TerminalResizePayload {
|
||||
cols: u16,
|
||||
rows: u16,
|
||||
}
|
||||
|
||||
async fn process_terminal_ws_session(
|
||||
|
|
@ -1706,7 +1660,7 @@ async fn process_terminal_ws_session(
|
|||
runtime: Arc<ProcessRuntime>,
|
||||
id: String,
|
||||
) {
|
||||
let _ = send_ws_json(
|
||||
let _ = send_status_json(
|
||||
&mut socket,
|
||||
json!({
|
||||
"type": "ready",
|
||||
|
|
@ -1718,7 +1672,8 @@ async fn process_terminal_ws_session(
|
|||
let mut log_rx = match runtime.subscribe_logs(&id).await {
|
||||
Ok(rx) => rx,
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
let _ = send_status_error(&mut socket, &err.to_string()).await;
|
||||
let _ = send_close_signal(&mut socket).await;
|
||||
let _ = socket.close().await;
|
||||
return;
|
||||
}
|
||||
|
|
@ -1729,43 +1684,57 @@ async fn process_terminal_ws_session(
|
|||
tokio::select! {
|
||||
ws_in = socket.recv() => {
|
||||
match ws_in {
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
|
||||
}
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
|
||||
match parsed {
|
||||
Ok(TerminalClientFrame::Input { data, encoding }) => {
|
||||
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
|
||||
Ok(input) => input,
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
Some(Ok(Message::Binary(bytes))) => {
|
||||
let Some((&channel, payload)) = bytes.split_first() else {
|
||||
let _ = send_status_error(&mut socket, "invalid terminal frame: missing channel byte").await;
|
||||
continue;
|
||||
};
|
||||
|
||||
match channel {
|
||||
CH_STDIN => {
|
||||
let input = payload.to_vec();
|
||||
let max_input = runtime.max_input_bytes().await;
|
||||
if input.len() > max_input {
|
||||
let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
|
||||
let _ = send_status_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = runtime.write_input(&id, &input).await {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
let _ = send_status_error(&mut socket, &err.to_string()).await;
|
||||
}
|
||||
}
|
||||
Ok(TerminalClientFrame::Resize { cols, rows }) => {
|
||||
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
CH_RESIZE => {
|
||||
let resize = match serde_json::from_slice::<TerminalResizePayload>(payload) {
|
||||
Ok(resize) => resize,
|
||||
Err(err) => {
|
||||
let _ = send_status_error(&mut socket, &format!("invalid resize payload: {err}")).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = runtime
|
||||
.resize_terminal(&id, resize.cols, resize.rows)
|
||||
.await
|
||||
{
|
||||
let _ = send_status_error(&mut socket, &err.to_string()).await;
|
||||
}
|
||||
}
|
||||
Ok(TerminalClientFrame::Close) => {
|
||||
CH_CLOSE => {
|
||||
let _ = send_close_signal(&mut socket).await;
|
||||
let _ = socket.close().await;
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
|
||||
_ => {
|
||||
let _ = send_status_error(&mut socket, &format!("unsupported terminal channel: {channel}")).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Text(_))) => {
|
||||
let _ = send_status_error(
|
||||
&mut socket,
|
||||
"text frames are not supported; use channel.k8s.io binary frames",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Some(Ok(Message::Ping(payload))) => {
|
||||
let _ = socket.send(Message::Pong(payload)).await;
|
||||
}
|
||||
|
|
@ -1785,7 +1754,7 @@ async fn process_terminal_ws_session(
|
|||
use base64::Engine;
|
||||
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
|
||||
};
|
||||
if socket.send(Message::Binary(bytes)).await.is_err() {
|
||||
if send_channel_frame(&mut socket, CH_STDOUT, bytes).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -1796,7 +1765,7 @@ async fn process_terminal_ws_session(
|
|||
_ = exit_poll.tick() => {
|
||||
if let Ok(snapshot) = runtime.snapshot(&id).await {
|
||||
if snapshot.status == ProcessStatus::Exited {
|
||||
let _ = send_ws_json(
|
||||
let _ = send_status_json(
|
||||
&mut socket,
|
||||
json!({
|
||||
"type": "exit",
|
||||
|
|
@ -1804,6 +1773,7 @@ async fn process_terminal_ws_session(
|
|||
}),
|
||||
)
|
||||
.await;
|
||||
let _ = send_close_signal(&mut socket).await;
|
||||
let _ = socket.close().await;
|
||||
break;
|
||||
}
|
||||
|
|
@ -1813,17 +1783,30 @@ async fn process_terminal_ws_session(
|
|||
}
|
||||
}
|
||||
|
||||
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
|
||||
async fn send_channel_frame(
|
||||
socket: &mut WebSocket,
|
||||
channel: u8,
|
||||
payload: impl Into<Vec<u8>>,
|
||||
) -> Result<(), ()> {
|
||||
let mut frame = vec![channel];
|
||||
frame.extend(payload.into());
|
||||
socket
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(&payload).map_err(|_| ())?,
|
||||
))
|
||||
.send(Message::Binary(frame.into()))
|
||||
.await
|
||||
.map_err(|_| ())
|
||||
}
|
||||
|
||||
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
|
||||
send_ws_json(
|
||||
async fn send_status_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
|
||||
send_channel_frame(
|
||||
socket,
|
||||
CH_STATUS,
|
||||
serde_json::to_vec(&payload).map_err(|_| ())?,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_status_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
|
||||
send_status_json(
|
||||
socket,
|
||||
json!({
|
||||
"type": "error",
|
||||
|
|
@ -1833,6 +1816,10 @@ async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()>
|
|||
.await
|
||||
}
|
||||
|
||||
async fn send_close_signal(socket: &mut WebSocket) -> Result<(), ()> {
|
||||
send_channel_frame(socket, CH_CLOSE, Vec::<u8>::new()).await
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/config/mcp",
|
||||
|
|
|
|||
|
|
@ -512,20 +512,6 @@ pub struct ProcessSignalQuery {
|
|||
pub wait_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessTerminalResizeRequest {
|
||||
pub cols: u16,
|
||||
pub rows: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessTerminalResizeResponse {
|
||||
pub cols: u16,
|
||||
pub rows: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessWsQuery {
|
||||
|
|
|
|||
|
|
@ -3,8 +3,17 @@ use base64::engine::general_purpose::STANDARD as BASE64;
|
|||
use base64::Engine;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
use tokio_tungstenite::tungstenite::http::HeaderValue;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
|
||||
const CH_STDIN: u8 = 0;
|
||||
const CH_STDOUT: u8 = 1;
|
||||
const CH_STATUS: u8 = 3;
|
||||
const CH_RESIZE: u8 = 4;
|
||||
const CH_CLOSE: u8 = 255;
|
||||
|
||||
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
|
||||
for _ in 0..30 {
|
||||
let (status, _, body) = send_request(
|
||||
|
|
@ -48,6 +57,19 @@ async fn recv_ws_message(
|
|||
.expect("websocket frame")
|
||||
}
|
||||
|
||||
fn make_channel_frame(channel: u8, payload: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
let payload = payload.as_ref();
|
||||
let mut frame = Vec::with_capacity(payload.len() + 1);
|
||||
frame.push(channel);
|
||||
frame.extend_from_slice(payload);
|
||||
frame
|
||||
}
|
||||
|
||||
fn parse_channel_frame(bytes: &[u8]) -> (u8, &[u8]) {
|
||||
let (&channel, payload) = bytes.split_first().expect("channel frame");
|
||||
(channel, payload)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_processes_config_round_trip() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
|
@ -519,59 +541,84 @@ async fn v1_process_terminal_ws_e2e_is_deterministic() {
|
|||
.expect("create process response");
|
||||
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
|
||||
let create_body: Value = create_response.json().await.expect("create process json");
|
||||
let process_id = create_body["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
let process_id = create_body["id"].as_str().expect("process id").to_string();
|
||||
|
||||
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
|
||||
let (mut ws, _) = connect_async(&ws_url)
|
||||
.await
|
||||
.expect("connect websocket");
|
||||
let mut ws_request = ws_url.into_client_request().expect("ws request");
|
||||
ws_request.headers_mut().insert(
|
||||
"Sec-WebSocket-Protocol",
|
||||
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
|
||||
);
|
||||
let (mut ws, response) = connect_async(ws_request).await.expect("connect websocket");
|
||||
assert_eq!(
|
||||
response
|
||||
.headers()
|
||||
.get("Sec-WebSocket-Protocol")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some(CHANNEL_K8S_IO_PROTOCOL)
|
||||
);
|
||||
|
||||
let ready = recv_ws_message(&mut ws).await;
|
||||
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
|
||||
.expect("ready json");
|
||||
let ready_bytes = ready.into_data();
|
||||
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
|
||||
assert_eq!(ready_channel, CH_STATUS);
|
||||
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
|
||||
assert_eq!(ready_payload["type"], "ready");
|
||||
assert_eq!(ready_payload["processId"], process_id);
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "input",
|
||||
"data": "hello from ws\n"
|
||||
})
|
||||
.to_string(),
|
||||
ws.send(Message::Binary(
|
||||
make_channel_frame(CH_STDIN, b"hello from ws\n").into(),
|
||||
))
|
||||
.await
|
||||
.expect("send input frame");
|
||||
|
||||
let mut saw_binary_output = false;
|
||||
ws.send(Message::Binary(
|
||||
make_channel_frame(CH_RESIZE, br#"{"cols":120,"rows":40}"#).into(),
|
||||
))
|
||||
.await
|
||||
.expect("send resize frame");
|
||||
|
||||
let mut saw_stdout = false;
|
||||
let mut saw_exit = false;
|
||||
let mut saw_close = false;
|
||||
for _ in 0..10 {
|
||||
let frame = recv_ws_message(&mut ws).await;
|
||||
match frame {
|
||||
Message::Binary(bytes) => {
|
||||
let text = String::from_utf8_lossy(&bytes);
|
||||
if text.contains("got:hello from ws") {
|
||||
saw_binary_output = true;
|
||||
let (channel, payload) = parse_channel_frame(&bytes);
|
||||
match channel {
|
||||
CH_STDOUT => {
|
||||
let text = String::from_utf8_lossy(payload);
|
||||
if text.contains("got:hello from ws") {
|
||||
saw_stdout = true;
|
||||
}
|
||||
}
|
||||
CH_STATUS => {
|
||||
let payload: Value =
|
||||
serde_json::from_slice(payload).expect("ws status json");
|
||||
if payload["type"] == "exit" {
|
||||
saw_exit = true;
|
||||
} else {
|
||||
assert_ne!(payload["type"], "error");
|
||||
}
|
||||
}
|
||||
CH_CLOSE => {
|
||||
assert!(payload.is_empty(), "close channel payload must be empty");
|
||||
saw_close = true;
|
||||
break;
|
||||
}
|
||||
other => panic!("unexpected websocket channel: {other}"),
|
||||
}
|
||||
}
|
||||
Message::Text(text) => {
|
||||
let payload: Value = serde_json::from_str(&text).expect("ws json");
|
||||
if payload["type"] == "exit" {
|
||||
saw_exit = true;
|
||||
break;
|
||||
}
|
||||
assert_ne!(payload["type"], "error");
|
||||
}
|
||||
Message::Close(_) => break,
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_binary_output, "expected pty binary output over websocket");
|
||||
assert!(saw_exit, "expected exit control frame over websocket");
|
||||
assert!(saw_stdout, "expected pty stdout over websocket");
|
||||
assert!(saw_exit, "expected exit status frame over websocket");
|
||||
assert!(saw_close, "expected close channel frame over websocket");
|
||||
|
||||
let _ = ws.close(None).await;
|
||||
|
||||
|
|
@ -605,10 +652,7 @@ async fn v1_process_terminal_ws_auth_e2e() {
|
|||
.expect("create process response");
|
||||
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
|
||||
let create_body: Value = create_response.json().await.expect("create process json");
|
||||
let process_id = create_body["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
let process_id = create_body["id"].as_str().expect("process id").to_string();
|
||||
|
||||
let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
|
||||
let unauth_err = connect_async(&unauth_ws_url)
|
||||
|
|
@ -624,25 +668,42 @@ async fn v1_process_terminal_ws_auth_e2e() {
|
|||
let auth_ws_url = live_server.ws_url(&format!(
|
||||
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
|
||||
));
|
||||
let (mut ws, _) = connect_async(&auth_ws_url)
|
||||
let mut ws_request = auth_ws_url.into_client_request().expect("ws request");
|
||||
ws_request.headers_mut().insert(
|
||||
"Sec-WebSocket-Protocol",
|
||||
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
|
||||
);
|
||||
let (mut ws, response) = connect_async(ws_request)
|
||||
.await
|
||||
.expect("authenticated websocket handshake");
|
||||
assert_eq!(
|
||||
response
|
||||
.headers()
|
||||
.get("Sec-WebSocket-Protocol")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some(CHANNEL_K8S_IO_PROTOCOL)
|
||||
);
|
||||
|
||||
let ready = recv_ws_message(&mut ws).await;
|
||||
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
|
||||
.expect("ready json");
|
||||
let ready_bytes = ready.into_data();
|
||||
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
|
||||
assert_eq!(ready_channel, CH_STATUS);
|
||||
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
|
||||
assert_eq!(ready_payload["type"], "ready");
|
||||
assert_eq!(ready_payload["processId"], process_id);
|
||||
|
||||
let _ = ws
|
||||
.send(Message::Text(json!({ "type": "close" }).to_string()))
|
||||
.send(Message::Binary(make_channel_frame(CH_CLOSE, []).into()))
|
||||
.await;
|
||||
let close = recv_ws_message(&mut ws).await;
|
||||
let close_bytes = close.into_data();
|
||||
let (close_channel, close_payload) = parse_channel_frame(&close_bytes);
|
||||
assert_eq!(close_channel, CH_CLOSE);
|
||||
assert!(close_payload.is_empty());
|
||||
let _ = ws.close(None).await;
|
||||
|
||||
let kill_response = http
|
||||
.post(live_server.http_url(&format!(
|
||||
"/v1/processes/{process_id}/kill?waitMs=1000"
|
||||
)))
|
||||
.post(live_server.http_url(&format!("/v1/processes/{process_id}/kill?waitMs=1000")))
|
||||
.bearer_auth(token)
|
||||
.send()
|
||||
.await
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue