From 38efa316a27bd505901a861a9a2399d79996fa96 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 5 Mar 2026 15:44:39 -0800 Subject: [PATCH] feat: add process management API Introduces a complete Process Management API for Sandbox Agent with process lifecycle management (start, stop, kill, delete), one-shot command execution, log streaming via SSE and WebSocket, stdin input, and PTY/terminal support. Includes new process_runtime module for managing process state, HTTP route handlers, OpenAPI documentation, and integration tests. Co-Authored-By: Claude Haiku 4.5 --- Cargo.toml | 2 +- docs/openapi.json | 1172 +++++++++++++++++ server/packages/sandbox-agent/Cargo.toml | 1 + server/packages/sandbox-agent/src/lib.rs | 1 + .../sandbox-agent/src/process_runtime.rs | 1081 +++++++++++++++ server/packages/sandbox-agent/src/router.rs | 777 +++++++++++ .../sandbox-agent/src/router/support.rs | 31 +- .../sandbox-agent/src/router/types.rs | 170 +++ server/packages/sandbox-agent/tests/v1_api.rs | 56 +- .../sandbox-agent/tests/v1_api/processes.rs | 661 ++++++++++ 10 files changed, 3947 insertions(+), 5 deletions(-) create mode 100644 server/packages/sandbox-agent/src/process_runtime.rs create mode 100644 server/packages/sandbox-agent/tests/v1_api/processes.rs diff --git a/Cargo.toml b/Cargo.toml index ebef66d..5a0581e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ schemars = "0.8" utoipa = { version = "4.2", features = ["axum_extras"] } # Web framework -axum = "0.7" +axum = { version = "0.7", features = ["ws"] } tower = { version = "0.5", features = ["util"] } tower-http = { version = "0.5", features = ["cors", "trace"] } diff --git a/docs/openapi.json b/docs/openapi.json index c6e35f4..d600fda 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -948,6 +948,785 @@ } } } + }, + "/v1/processes": { + "get": { + "tags": [ + "v1" + ], + "operationId": "get_v1_processes", + "responses": { + "200": { + "description": "List processes", + "content": { + 
"application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessListResponse" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + }, + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_processes", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessCreateRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Started process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInfo" + } + } + } + }, + "400": { + "description": "Invalid request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "409": { + "description": "Process limit or state conflict", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/config": { + "get": { + "tags": [ + "v1" + ], + "operationId": "get_v1_processes_config", + "responses": { + "200": { + "description": "Current runtime process config", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessConfig" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + }, + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_processes_config", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessConfig" + } + } + }, + "required": 
true + }, + "responses": { + "200": { + "description": "Updated runtime process config", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessConfig" + } + } + } + }, + "400": { + "description": "Invalid config", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/run": { + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_processes_run", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessRunRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "One-off command result", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessRunResponse" + } + } + } + }, + "400": { + "description": "Invalid request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}": { + "get": { + "tags": [ + "v1" + ], + "operationId": "get_v1_process", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Process details", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInfo" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + 
"description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + }, + "delete": { + "tags": [ + "v1" + ], + "operationId": "delete_v1_process", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "204": { + "description": "Process deleted" + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "409": { + "description": "Process is still running", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}/input": { + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_process_input", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInputRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Input accepted", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInputResponse" + } + } + } + }, + "400": { + "description": "Invalid request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "409": { + "description": "Process not writable", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "413": { + "description": 
"Input exceeds configured limit", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}/kill": { + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_process_kill", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "waitMs", + "in": "query", + "description": "Wait up to N ms for process to exit", + "required": false, + "schema": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + ], + "responses": { + "200": { + "description": "Kill signal sent", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInfo" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}/logs": { + "get": { + "tags": [ + "v1" + ], + "operationId": "get_v1_process_logs", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "stream", + "in": "query", + "description": "stdout|stderr|combined|pty", + "required": false, + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/ProcessLogsStream" + } + ], + "nullable": true + } + }, + { + "name": "tail", + "in": "query", + "description": "Tail N entries", + "required": false, + "schema": { + "type": "integer", + 
"nullable": true, + "minimum": 0 + } + }, + { + "name": "follow", + "in": "query", + "description": "Follow via SSE", + "required": false, + "schema": { + "type": "boolean", + "nullable": true + } + }, + { + "name": "since", + "in": "query", + "description": "Only entries with sequence greater than this", + "required": false, + "schema": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + ], + "responses": { + "200": { + "description": "Process logs", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessLogsResponse" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}/stop": { + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_process_stop", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "waitMs", + "in": "query", + "description": "Wait up to N ms for process to exit", + "required": false, + "schema": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + ], + "responses": { + "200": { + "description": "Stop signal sent", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessInfo" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + 
} + }, + "/v1/processes/{id}/terminal/resize": { + "post": { + "tags": [ + "v1" + ], + "operationId": "post_v1_process_terminal_resize", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessTerminalResizeRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Resize accepted", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProcessTerminalResizeResponse" + } + } + } + }, + "400": { + "description": "Invalid request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "409": { + "description": "Not a terminal process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } + }, + "/v1/processes/{id}/terminal/ws": { + "get": { + "tags": [ + "v1" + ], + "operationId": "get_v1_process_terminal_ws", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Process ID", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "access_token", + "in": "query", + "description": "Bearer token alternative for WS auth", + "required": false, + "schema": { + "type": "string", + "nullable": true + } + } + ], + "responses": { + "101": { + "description": "WebSocket upgraded" + }, + "400": { + "description": "Invalid websocket frame or upgrade request", + "content": { + "application/json": 
{ + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "404": { + "description": "Unknown process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "409": { + "description": "Not a terminal process", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + }, + "501": { + "description": "Process API unsupported on this platform", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProblemDetails" + } + } + } + } + } + } } }, "components": { @@ -1596,6 +2375,399 @@ }, "additionalProperties": {} }, + "ProcessConfig": { + "type": "object", + "required": [ + "maxConcurrentProcesses", + "defaultRunTimeoutMs", + "maxRunTimeoutMs", + "maxOutputBytes", + "maxLogBytesPerProcess", + "maxInputBytesPerRequest" + ], + "properties": { + "defaultRunTimeoutMs": { + "type": "integer", + "format": "int64", + "minimum": 0 + }, + "maxConcurrentProcesses": { + "type": "integer", + "minimum": 0 + }, + "maxInputBytesPerRequest": { + "type": "integer", + "minimum": 0 + }, + "maxLogBytesPerProcess": { + "type": "integer", + "minimum": 0 + }, + "maxOutputBytes": { + "type": "integer", + "minimum": 0 + }, + "maxRunTimeoutMs": { + "type": "integer", + "format": "int64", + "minimum": 0 + } + } + }, + "ProcessCreateRequest": { + "type": "object", + "required": [ + "command" + ], + "properties": { + "args": { + "type": "array", + "items": { + "type": "string" + } + }, + "command": { + "type": "string" + }, + "cwd": { + "type": "string", + "nullable": true + }, + "env": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "interactive": { + "type": "boolean" + }, + "tty": { + "type": "boolean" + } + } + }, + "ProcessInfo": { + "type": "object", + "required": [ + "id", + "command", + "args", + "tty", + "interactive", + "status", + "createdAtMs" + ], + "properties": { + "args": { + 
"type": "array", + "items": { + "type": "string" + } + }, + "command": { + "type": "string" + }, + "createdAtMs": { + "type": "integer", + "format": "int64" + }, + "cwd": { + "type": "string", + "nullable": true + }, + "exitCode": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "exitedAtMs": { + "type": "integer", + "format": "int64", + "nullable": true + }, + "id": { + "type": "string" + }, + "interactive": { + "type": "boolean" + }, + "pid": { + "type": "integer", + "format": "int32", + "nullable": true, + "minimum": 0 + }, + "status": { + "$ref": "#/components/schemas/ProcessState" + }, + "tty": { + "type": "boolean" + } + } + }, + "ProcessInputRequest": { + "type": "object", + "required": [ + "data" + ], + "properties": { + "data": { + "type": "string" + }, + "encoding": { + "type": "string", + "nullable": true + } + } + }, + "ProcessInputResponse": { + "type": "object", + "required": [ + "bytesWritten" + ], + "properties": { + "bytesWritten": { + "type": "integer", + "minimum": 0 + } + } + }, + "ProcessListResponse": { + "type": "object", + "required": [ + "processes" + ], + "properties": { + "processes": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ProcessInfo" + } + } + } + }, + "ProcessLogEntry": { + "type": "object", + "required": [ + "sequence", + "stream", + "timestampMs", + "data", + "encoding" + ], + "properties": { + "data": { + "type": "string" + }, + "encoding": { + "type": "string" + }, + "sequence": { + "type": "integer", + "format": "int64", + "minimum": 0 + }, + "stream": { + "$ref": "#/components/schemas/ProcessLogsStream" + }, + "timestampMs": { + "type": "integer", + "format": "int64" + } + } + }, + "ProcessLogsQuery": { + "type": "object", + "properties": { + "follow": { + "type": "boolean", + "nullable": true + }, + "since": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + }, + "stream": { + "allOf": [ + { + "$ref": "#/components/schemas/ProcessLogsStream" + } + ], + 
"nullable": true + }, + "tail": { + "type": "integer", + "nullable": true, + "minimum": 0 + } + } + }, + "ProcessLogsResponse": { + "type": "object", + "required": [ + "processId", + "stream", + "entries" + ], + "properties": { + "entries": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ProcessLogEntry" + } + }, + "processId": { + "type": "string" + }, + "stream": { + "$ref": "#/components/schemas/ProcessLogsStream" + } + } + }, + "ProcessLogsStream": { + "type": "string", + "enum": [ + "stdout", + "stderr", + "combined", + "pty" + ] + }, + "ProcessRunRequest": { + "type": "object", + "required": [ + "command" + ], + "properties": { + "args": { + "type": "array", + "items": { + "type": "string" + } + }, + "command": { + "type": "string" + }, + "cwd": { + "type": "string", + "nullable": true + }, + "env": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "maxOutputBytes": { + "type": "integer", + "nullable": true, + "minimum": 0 + }, + "timeoutMs": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + }, + "ProcessRunResponse": { + "type": "object", + "required": [ + "timedOut", + "stdout", + "stderr", + "stdoutTruncated", + "stderrTruncated", + "durationMs" + ], + "properties": { + "durationMs": { + "type": "integer", + "format": "int64", + "minimum": 0 + }, + "exitCode": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "stderr": { + "type": "string" + }, + "stderrTruncated": { + "type": "boolean" + }, + "stdout": { + "type": "string" + }, + "stdoutTruncated": { + "type": "boolean" + }, + "timedOut": { + "type": "boolean" + } + } + }, + "ProcessSignalQuery": { + "type": "object", + "properties": { + "waitMs": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + }, + "ProcessState": { + "type": "string", + "enum": [ + "running", + "exited" + ] + }, + "ProcessTerminalResizeRequest": { + "type": "object", + "required": [ + 
"cols", + "rows" + ], + "properties": { + "cols": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "rows": { + "type": "integer", + "format": "int32", + "minimum": 0 + } + } + }, + "ProcessTerminalResizeResponse": { + "type": "object", + "required": [ + "cols", + "rows" + ], + "properties": { + "cols": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "rows": { + "type": "integer", + "format": "int32", + "minimum": 0 + } + } + }, "ServerStatus": { "type": "string", "enum": [ diff --git a/server/packages/sandbox-agent/Cargo.toml b/server/packages/sandbox-agent/Cargo.toml index bb8328c..a8ae1db 100644 --- a/server/packages/sandbox-agent/Cargo.toml +++ b/server/packages/sandbox-agent/Cargo.toml @@ -55,6 +55,7 @@ insta.workspace = true tower.workspace = true tempfile.workspace = true serial_test = "3.2" +tokio-tungstenite = "0.24" [features] test-utils = ["tempfile"] diff --git a/server/packages/sandbox-agent/src/lib.rs b/server/packages/sandbox-agent/src/lib.rs index b5031e1..e84b10b 100644 --- a/server/packages/sandbox-agent/src/lib.rs +++ b/server/packages/sandbox-agent/src/lib.rs @@ -3,6 +3,7 @@ mod acp_proxy_runtime; pub mod cli; pub mod daemon; +mod process_runtime; pub mod router; pub mod server_logs; pub mod telemetry; diff --git a/server/packages/sandbox-agent/src/process_runtime.rs b/server/packages/sandbox-agent/src/process_runtime.rs new file mode 100644 index 0000000..9c6498c --- /dev/null +++ b/server/packages/sandbox-agent/src/process_runtime.rs @@ -0,0 +1,1081 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::{broadcast, Mutex, RwLock}; + +use sandbox_agent_error::SandboxError; + 
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ProcessStatus { + Running, + Exited, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ProcessStream { + Stdout, + Stderr, + Pty, +} + +#[derive(Debug, Clone)] +pub struct ProcessStartSpec { + pub command: String, + pub args: Vec, + pub cwd: Option, + pub env: HashMap, + pub tty: bool, + pub interactive: bool, +} + +#[derive(Debug, Clone)] +pub struct RunSpec { + pub command: String, + pub args: Vec, + pub cwd: Option, + pub env: HashMap, + pub timeout_ms: Option, + pub max_output_bytes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RunOutput { + pub exit_code: Option, + pub timed_out: bool, + pub stdout: String, + pub stderr: String, + pub stdout_truncated: bool, + pub stderr_truncated: bool, + pub duration_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ProcessLogLine { + pub sequence: u64, + pub stream: ProcessStream, + pub timestamp_ms: i64, + pub data: String, + pub encoding: &'static str, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ProcessSnapshot { + pub id: String, + pub command: String, + pub args: Vec, + pub cwd: Option, + pub tty: bool, + pub interactive: bool, + pub status: ProcessStatus, + pub pid: Option, + pub exit_code: Option, + pub created_at_ms: i64, + pub exited_at_ms: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ProcessRuntimeConfig { + pub max_concurrent_processes: usize, + pub default_run_timeout_ms: u64, + pub max_run_timeout_ms: u64, + pub max_output_bytes: usize, + pub max_log_bytes_per_process: usize, + pub max_input_bytes_per_request: usize, +} + +impl Default for ProcessRuntimeConfig { + fn default() -> Self 
{ + Self { + max_concurrent_processes: 64, + default_run_timeout_ms: 30_000, + max_run_timeout_ms: 300_000, + max_output_bytes: 1_048_576, + max_log_bytes_per_process: 10_485_760, + max_input_bytes_per_request: 65_536, + } + } +} + +#[derive(Debug, Clone)] +pub struct ProcessRuntime { + config: Arc>, + inner: Arc, +} + +#[derive(Debug)] +struct ProcessRuntimeInner { + next_id: AtomicU64, + processes: RwLock>>, +} + +#[derive(Debug)] +struct ManagedProcess { + id: String, + command: String, + args: Vec, + cwd: Option, + tty: bool, + interactive: bool, + created_at_ms: i64, + pid: Option, + max_log_bytes: usize, + stdin: Mutex>, + #[cfg(unix)] + pty_resize_fd: Mutex>, + status: RwLock, + sequence: AtomicU64, + logs: Mutex>, + total_log_bytes: Mutex, + log_tx: broadcast::Sender, +} + +#[derive(Debug)] +enum ProcessStdin { + Pipe(ChildStdin), + Pty(tokio::fs::File), +} + +#[derive(Debug, Clone)] +struct StoredLog { + line: ProcessLogLine, + byte_len: usize, +} + +#[derive(Debug, Clone)] +struct ManagedStatus { + status: ProcessStatus, + exit_code: Option, + exited_at_ms: Option, +} + +struct SpawnedPipeProcess { + process: Arc, + child: Child, + stdout: tokio::process::ChildStdout, + stderr: tokio::process::ChildStderr, +} + +#[cfg(unix)] +struct SpawnedTtyProcess { + process: Arc, + child: Child, + reader: tokio::fs::File, +} + +impl ProcessRuntime { + pub fn new() -> Self { + Self { + config: Arc::new(RwLock::new(ProcessRuntimeConfig::default())), + inner: Arc::new(ProcessRuntimeInner { + next_id: AtomicU64::new(1), + processes: RwLock::new(HashMap::new()), + }), + } + } + + pub async fn get_config(&self) -> ProcessRuntimeConfig { + self.config.read().await.clone() + } + + pub async fn set_config( + &self, + mut value: ProcessRuntimeConfig, + ) -> Result { + if value.max_concurrent_processes == 0 { + return Err(SandboxError::InvalidRequest { + message: "maxConcurrentProcesses must be greater than 0".to_string(), + }); + } + if value.default_run_timeout_ms == 0 || 
value.max_run_timeout_ms == 0 { + return Err(SandboxError::InvalidRequest { + message: "timeouts must be greater than 0".to_string(), + }); + } + if value.default_run_timeout_ms > value.max_run_timeout_ms { + value.default_run_timeout_ms = value.max_run_timeout_ms; + } + if value.max_output_bytes == 0 + || value.max_log_bytes_per_process == 0 + || value.max_input_bytes_per_request == 0 + { + return Err(SandboxError::InvalidRequest { + message: "byte limits must be greater than 0".to_string(), + }); + } + + *self.config.write().await = value.clone(); + Ok(value) + } + + pub async fn start_process( + &self, + spec: ProcessStartSpec, + ) -> Result { + let config = self.get_config().await; + + let process_refs = { + let processes = self.inner.processes.read().await; + processes.values().cloned().collect::>() + }; + + let mut running_count = 0usize; + for process in process_refs { + if process.status.read().await.status == ProcessStatus::Running { + running_count += 1; + } + } + + if running_count >= config.max_concurrent_processes { + return Err(SandboxError::Conflict { + message: format!( + "max concurrent process limit reached ({})", + config.max_concurrent_processes + ), + }); + } + + if spec.command.trim().is_empty() { + return Err(SandboxError::InvalidRequest { + message: "command must not be empty".to_string(), + }); + } + + let id_num = self.inner.next_id.fetch_add(1, Ordering::Relaxed); + let id = format!("proc_{id_num}"); + + if spec.tty { + #[cfg(unix)] + { + let spawned = self + .spawn_tty_process(id.clone(), spec, config.max_log_bytes_per_process) + .await?; + let process = spawned.process.clone(); + self.inner + .processes + .write() + .await + .insert(id, process.clone()); + + let p = process.clone(); + tokio::spawn(async move { + pump_output(p, spawned.reader, ProcessStream::Pty).await; + }); + + let p = process.clone(); + tokio::spawn(async move { + watch_exit(p, spawned.child).await; + }); + + return Ok(process.snapshot().await); + } + 
#[cfg(not(unix))] + { + return Err(SandboxError::StreamError { + message: "tty process mode is not supported on this platform".to_string(), + }); + } + } + + let spawned = self + .spawn_pipe_process(id.clone(), spec, config.max_log_bytes_per_process) + .await?; + let process = spawned.process.clone(); + self.inner + .processes + .write() + .await + .insert(id, process.clone()); + + let p = process.clone(); + tokio::spawn(async move { + pump_output(p, spawned.stdout, ProcessStream::Stdout).await; + }); + + let p = process.clone(); + tokio::spawn(async move { + pump_output(p, spawned.stderr, ProcessStream::Stderr).await; + }); + + let p = process.clone(); + tokio::spawn(async move { + watch_exit(p, spawned.child).await; + }); + + Ok(process.snapshot().await) + } + + pub async fn run_once(&self, spec: RunSpec) -> Result { + if spec.command.trim().is_empty() { + return Err(SandboxError::InvalidRequest { + message: "command must not be empty".to_string(), + }); + } + + let config = self.get_config().await; + let mut timeout_ms = spec.timeout_ms.unwrap_or(config.default_run_timeout_ms); + if timeout_ms == 0 { + timeout_ms = config.default_run_timeout_ms; + } + timeout_ms = timeout_ms.min(config.max_run_timeout_ms); + + let max_output_bytes = spec.max_output_bytes.unwrap_or(config.max_output_bytes); + + let mut cmd = Command::new(&spec.command); + cmd.args(&spec.args) + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + + if let Some(cwd) = &spec.cwd { + cmd.current_dir(cwd); + } + + for (key, value) in &spec.env { + cmd.env(key, value); + } + + let mut child = cmd.spawn().map_err(|err| SandboxError::StreamError { + message: format!("failed to spawn process: {err}"), + })?; + + let stdout = child + .stdout + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "failed to capture stdout".to_string(), + })?; + let stderr = child + .stderr + .take() + .ok_or_else(|| SandboxError::StreamError { + 
message: "failed to capture stderr".to_string(), + })?; + + let started = Instant::now(); + let stdout_task = tokio::spawn(capture_output(stdout, max_output_bytes)); + let stderr_task = tokio::spawn(capture_output(stderr, max_output_bytes)); + + let wait_result = + tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), child.wait()).await; + + let (exit_code, timed_out) = match wait_result { + Ok(Ok(status)) => (status.code(), false), + Ok(Err(err)) => { + let _ = child.kill().await; + return Err(SandboxError::StreamError { + message: format!("failed to wait on process: {err}"), + }); + } + Err(_) => { + let _ = child.kill().await; + let _ = child.wait().await; + (None, true) + } + }; + + let (stdout, stdout_truncated) = match stdout_task.await { + Ok(Ok(captured)) => captured, + _ => (Vec::new(), false), + }; + let (stderr, stderr_truncated) = match stderr_task.await { + Ok(Ok(captured)) => captured, + _ => (Vec::new(), false), + }; + + Ok(RunOutput { + exit_code, + timed_out, + stdout: String::from_utf8_lossy(&stdout).to_string(), + stderr: String::from_utf8_lossy(&stderr).to_string(), + stdout_truncated, + stderr_truncated, + duration_ms: started.elapsed().as_millis() as u64, + }) + } + + pub async fn list_processes(&self) -> Vec { + let processes = self.inner.processes.read().await; + let mut items = Vec::with_capacity(processes.len()); + for process in processes.values() { + items.push(process.snapshot().await); + } + items.sort_by(|a, b| a.id.cmp(&b.id)); + items + } + + pub async fn snapshot(&self, id: &str) -> Result { + Ok(self.lookup_process(id).await?.snapshot().await) + } + + pub async fn is_tty(&self, id: &str) -> Result { + Ok(self.lookup_process(id).await?.tty) + } + + pub async fn max_input_bytes(&self) -> usize { + self.get_config().await.max_input_bytes_per_request + } + + pub async fn delete_process(&self, id: &str) -> Result<(), SandboxError> { + let process = self.lookup_process(id).await?; + let status = 
process.status.read().await.clone(); + if status.status == ProcessStatus::Running { + return Err(SandboxError::Conflict { + message: "process is still running; stop or kill it before delete".to_string(), + }); + } + + self.inner.processes.write().await.remove(id); + Ok(()) + } + + pub async fn stop_process( + &self, + id: &str, + wait_ms: Option, + ) -> Result { + let process = self.lookup_process(id).await?; + process.send_signal(SIGTERM).await?; + maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(2_000)).await; + Ok(process.snapshot().await) + } + + pub async fn kill_process( + &self, + id: &str, + wait_ms: Option, + ) -> Result { + let process = self.lookup_process(id).await?; + process.send_signal(SIGKILL).await?; + maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(1_000)).await; + Ok(process.snapshot().await) + } + + pub async fn write_input(&self, id: &str, data: &[u8]) -> Result { + self.lookup_process(id).await?.write_input(data).await + } + + pub async fn resize_terminal( + &self, + id: &str, + cols: u16, + rows: u16, + ) -> Result<(), SandboxError> { + let process = self.lookup_process(id).await?; + if !process.tty { + return Err(SandboxError::Conflict { + message: "process is not running in tty mode".to_string(), + }); + } + + process.resize_pty(cols, rows).await?; + process.send_signal(SIGWINCH).await + } + + pub async fn logs( + &self, + id: &str, + filter: ProcessLogFilter, + ) -> Result, SandboxError> { + self.lookup_process(id).await?.read_logs(filter).await + } + + pub async fn subscribe_logs( + &self, + id: &str, + ) -> Result, SandboxError> { + let process = self.lookup_process(id).await?; + Ok(process.log_tx.subscribe()) + } + + async fn lookup_process(&self, id: &str) -> Result, SandboxError> { + let process = self.inner.processes.read().await.get(id).cloned(); + process.ok_or_else(|| SandboxError::InvalidRequest { + message: format!("process not found: {id}"), + }) + } + + async fn spawn_pipe_process( + &self, + id: String, + spec: 
ProcessStartSpec, + max_log_bytes: usize, + ) -> Result { + let mut cmd = Command::new(&spec.command); + cmd.args(&spec.args) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + + if let Some(cwd) = &spec.cwd { + cmd.current_dir(cwd); + } + + for (key, value) in &spec.env { + cmd.env(key, value); + } + + let mut child = cmd.spawn().map_err(|err| SandboxError::StreamError { + message: format!("failed to spawn process: {err}"), + })?; + + let stdin = child.stdin.take(); + let stdout = child + .stdout + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "failed to capture stdout".to_string(), + })?; + let stderr = child + .stderr + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "failed to capture stderr".to_string(), + })?; + let pid = child.id(); + + let (tx, _rx) = broadcast::channel(512); + let process = Arc::new(ManagedProcess { + id, + command: spec.command, + args: spec.args, + cwd: spec.cwd, + tty: false, + interactive: spec.interactive, + created_at_ms: now_ms(), + pid, + max_log_bytes, + stdin: Mutex::new(stdin.map(ProcessStdin::Pipe)), + #[cfg(unix)] + pty_resize_fd: Mutex::new(None), + status: RwLock::new(ManagedStatus { + status: ProcessStatus::Running, + exit_code: None, + exited_at_ms: None, + }), + sequence: AtomicU64::new(1), + logs: Mutex::new(VecDeque::new()), + total_log_bytes: Mutex::new(0), + log_tx: tx, + }); + + Ok(SpawnedPipeProcess { + process, + child, + stdout, + stderr, + }) + } + + #[cfg(unix)] + async fn spawn_tty_process( + &self, + id: String, + spec: ProcessStartSpec, + max_log_bytes: usize, + ) -> Result { + use std::os::fd::AsRawFd; + use std::process::Stdio; + + let (master_fd, slave_fd) = open_pty(80, 24)?; + let slave_raw = slave_fd.as_raw_fd(); + + let stdin_fd = dup_fd(slave_raw)?; + let stdout_fd = dup_fd(slave_raw)?; + let stderr_fd = dup_fd(slave_raw)?; + + let mut cmd = Command::new(&spec.command); + cmd.args(&spec.args) + 
.stdin(Stdio::from(std::fs::File::from(stdin_fd))) + .stdout(Stdio::from(std::fs::File::from(stdout_fd))) + .stderr(Stdio::from(std::fs::File::from(stderr_fd))); + + if let Some(cwd) = &spec.cwd { + cmd.current_dir(cwd); + } + + for (key, value) in &spec.env { + cmd.env(key, value); + } + + unsafe { + cmd.pre_exec(move || { + if libc::setsid() == -1 { + return Err(std::io::Error::last_os_error()); + } + if libc::ioctl(slave_raw, libc::TIOCSCTTY as _, 0) == -1 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }); + } + + let child = cmd.spawn().map_err(|err| SandboxError::StreamError { + message: format!("failed to spawn tty process: {err}"), + })?; + + let pid = child.id(); + drop(slave_fd); + + let master_raw = master_fd.as_raw_fd(); + let writer_fd = dup_fd(master_raw)?; + let resize_fd = dup_fd(master_raw)?; + + let reader_file = tokio::fs::File::from_std(std::fs::File::from(master_fd)); + let writer_file = tokio::fs::File::from_std(std::fs::File::from(writer_fd)); + let resize_file = std::fs::File::from(resize_fd); + + let (tx, _rx) = broadcast::channel(512); + let process = Arc::new(ManagedProcess { + id, + command: spec.command, + args: spec.args, + cwd: spec.cwd, + tty: true, + interactive: spec.interactive, + created_at_ms: now_ms(), + pid, + max_log_bytes, + stdin: Mutex::new(Some(ProcessStdin::Pty(writer_file))), + pty_resize_fd: Mutex::new(Some(resize_file)), + status: RwLock::new(ManagedStatus { + status: ProcessStatus::Running, + exit_code: None, + exited_at_ms: None, + }), + sequence: AtomicU64::new(1), + logs: Mutex::new(VecDeque::new()), + total_log_bytes: Mutex::new(0), + log_tx: tx, + }); + + Ok(SpawnedTtyProcess { + process, + child, + reader: reader_file, + }) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum ProcessLogFilterStream { + Stdout, + Stderr, + Combined, + Pty, +} + +#[derive(Debug, Clone, Copy)] +pub struct ProcessLogFilter { + pub stream: ProcessLogFilterStream, + pub tail: Option, + pub since: Option, +} + +impl 
ManagedProcess { + async fn snapshot(&self) -> ProcessSnapshot { + let status = self.status.read().await.clone(); + ProcessSnapshot { + id: self.id.clone(), + command: self.command.clone(), + args: self.args.clone(), + cwd: self.cwd.clone(), + tty: self.tty, + interactive: self.interactive, + status: status.status, + pid: self.pid, + exit_code: status.exit_code, + created_at_ms: self.created_at_ms, + exited_at_ms: status.exited_at_ms, + } + } + + async fn append_log(&self, stream: ProcessStream, data: &[u8]) { + if data.is_empty() { + return; + } + + let stream = if self.tty { ProcessStream::Pty } else { stream }; + let line = ProcessLogLine { + sequence: self.sequence.fetch_add(1, Ordering::Relaxed), + stream, + timestamp_ms: now_ms(), + data: BASE64.encode(data), + encoding: "base64", + }; + let stored = StoredLog { + line: line.clone(), + byte_len: data.len(), + }; + + { + let mut logs = self.logs.lock().await; + let mut total = self.total_log_bytes.lock().await; + logs.push_back(stored); + *total += data.len(); + + while *total > self.max_log_bytes { + if let Some(front) = logs.pop_front() { + *total = total.saturating_sub(front.byte_len); + } else { + break; + } + } + } + + let _ = self.log_tx.send(line); + } + + async fn write_input(&self, data: &[u8]) -> Result { + if self.status.read().await.status != ProcessStatus::Running { + return Err(SandboxError::Conflict { + message: "process is not running".to_string(), + }); + } + + let mut guard = self.stdin.lock().await; + let stdin = guard.as_mut().ok_or_else(|| SandboxError::Conflict { + message: "process does not accept stdin".to_string(), + })?; + + match stdin { + ProcessStdin::Pipe(pipe) => { + pipe.write_all(data) + .await + .map_err(|err| SandboxError::StreamError { + message: format!("failed to write stdin: {err}"), + })?; + pipe.flush() + .await + .map_err(|err| SandboxError::StreamError { + message: format!("failed to flush stdin: {err}"), + })?; + } + ProcessStdin::Pty(pty_writer) => { + pty_writer + 
.write_all(data) + .await + .map_err(|err| SandboxError::StreamError { + message: format!("failed to write PTY input: {err}"), + })?; + pty_writer + .flush() + .await + .map_err(|err| SandboxError::StreamError { + message: format!("failed to flush PTY input: {err}"), + })?; + } + } + + Ok(data.len()) + } + + async fn read_logs( + &self, + filter: ProcessLogFilter, + ) -> Result, SandboxError> { + let logs = self.logs.lock().await; + + let mut entries: Vec = logs + .iter() + .filter_map(|entry| { + if let Some(since) = filter.since { + if entry.line.sequence <= since { + return None; + } + } + if stream_matches(entry.line.stream, filter.stream) { + Some(entry.line.clone()) + } else { + None + } + }) + .collect(); + + if let Some(tail) = filter.tail { + if entries.len() > tail { + let start = entries.len() - tail; + entries = entries.split_off(start); + } + } + + Ok(entries) + } + + async fn send_signal(&self, signal: i32) -> Result<(), SandboxError> { + if self.status.read().await.status != ProcessStatus::Running { + return Ok(()); + } + let Some(pid) = self.pid else { + return Ok(()); + }; + + send_signal(pid, signal) + } + + async fn resize_pty(&self, cols: u16, rows: u16) -> Result<(), SandboxError> { + if !self.tty { + return Ok(()); + } + + #[cfg(unix)] + { + use std::os::fd::AsRawFd; + let guard = self.pty_resize_fd.lock().await; + let Some(fd) = guard.as_ref() else { + return Err(SandboxError::Conflict { + message: "PTY resize handle unavailable".to_string(), + }); + }; + resize_pty(fd.as_raw_fd(), cols, rows)?; + } + + #[cfg(not(unix))] + { + let _ = cols; + let _ = rows; + } + + Ok(()) + } +} + +fn stream_matches(stream: ProcessStream, filter: ProcessLogFilterStream) -> bool { + match filter { + ProcessLogFilterStream::Stdout => stream == ProcessStream::Stdout, + ProcessLogFilterStream::Stderr => stream == ProcessStream::Stderr, + ProcessLogFilterStream::Combined => { + stream == ProcessStream::Stdout || stream == ProcessStream::Stderr + } + 
ProcessLogFilterStream::Pty => stream == ProcessStream::Pty, + } +} + +async fn maybe_wait_for_exit(process: Arc, wait_ms: u64) { + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(wait_ms); + while tokio::time::Instant::now() < deadline { + if process.status.read().await.status == ProcessStatus::Exited { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(25)).await; + } +} + +async fn pump_output(process: Arc, mut reader: R, stream: ProcessStream) +where + R: AsyncRead + Unpin, +{ + let mut buffer = [0_u8; 8192]; + loop { + match reader.read(&mut buffer).await { + Ok(0) => break, + Ok(n) => { + process.append_log(stream, &buffer[..n]).await; + } + Err(err) => { + let msg = format!("\n[process stream error: {err}]\n"); + process + .append_log( + if process.tty { + ProcessStream::Pty + } else { + ProcessStream::Stderr + }, + msg.as_bytes(), + ) + .await; + break; + } + } + } +} + +async fn watch_exit(process: Arc, mut child: Child) { + let wait = child.wait().await; + let (exit_code, exited_at_ms) = match wait { + Ok(status) => (status.code(), Some(now_ms())), + Err(_) => (None, Some(now_ms())), + }; + + { + let mut state = process.status.write().await; + state.status = ProcessStatus::Exited; + state.exit_code = exit_code; + state.exited_at_ms = exited_at_ms; + } + + let _ = process.stdin.lock().await.take(); +} + +async fn capture_output(mut reader: R, max_bytes: usize) -> std::io::Result<(Vec, bool)> +where + R: AsyncRead + Unpin, +{ + let mut output = Vec::new(); + let mut buffer = [0_u8; 8192]; + let mut truncated = false; + + loop { + let n = reader.read(&mut buffer).await?; + if n == 0 { + break; + } + + if output.len() < max_bytes { + let remaining = max_bytes - output.len(); + let to_copy = remaining.min(n); + output.extend_from_slice(&buffer[..to_copy]); + if to_copy < n { + truncated = true; + } + } else { + truncated = true; + } + } + + Ok((output, truncated)) +} + +fn now_ms() -> i64 { + 
std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|duration| duration.as_millis() as i64) + .unwrap_or(0) +} + +#[cfg(unix)] +const SIGTERM: i32 = libc::SIGTERM; +#[cfg(unix)] +const SIGKILL: i32 = libc::SIGKILL; +#[cfg(unix)] +const SIGWINCH: i32 = libc::SIGWINCH; + +#[cfg(unix)] +fn send_signal(pid: u32, signal: i32) -> Result<(), SandboxError> { + let result = unsafe { libc::kill(pid as libc::pid_t, signal) }; + if result == 0 { + return Ok(()); + } + + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::NotFound { + return Ok(()); + } + + Err(SandboxError::StreamError { + message: format!("failed to signal process {pid}: {err}"), + }) +} + +#[cfg(not(unix))] +const SIGTERM: i32 = 15; +#[cfg(not(unix))] +const SIGKILL: i32 = 9; +#[cfg(not(unix))] +const SIGWINCH: i32 = 28; + +#[cfg(not(unix))] +fn send_signal(_pid: u32, _signal: i32) -> Result<(), SandboxError> { + Err(SandboxError::StreamError { + message: "process signaling not supported on this platform".to_string(), + }) +} + +#[cfg(unix)] +fn open_pty( + cols: u16, + rows: u16, +) -> Result<(std::os::fd::OwnedFd, std::os::fd::OwnedFd), SandboxError> { + use std::os::fd::FromRawFd; + + let mut master: libc::c_int = -1; + let mut slave: libc::c_int = -1; + let mut winsize = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + let rc = unsafe { + libc::openpty( + &mut master, + &mut slave, + std::ptr::null_mut(), + std::ptr::null_mut(), + &mut winsize, + ) + }; + + if rc != 0 { + return Err(SandboxError::StreamError { + message: format!( + "failed to allocate PTY: {}", + std::io::Error::last_os_error() + ), + }); + } + + let master_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(master) }; + let slave_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(slave) }; + Ok((master_fd, slave_fd)) +} + +#[cfg(unix)] +fn dup_fd(fd: std::os::fd::RawFd) -> Result { + use std::os::fd::FromRawFd; + + let duplicated = unsafe { 
libc::dup(fd) }; + if duplicated == -1 { + return Err(SandboxError::StreamError { + message: format!("failed to dup fd: {}", std::io::Error::last_os_error()), + }); + } + + Ok(unsafe { std::os::fd::OwnedFd::from_raw_fd(duplicated) }) +} + +#[cfg(unix)] +fn resize_pty(fd: std::os::fd::RawFd, cols: u16, rows: u16) -> Result<(), SandboxError> { + let winsize = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + let rc = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ as _, &winsize) }; + if rc == -1 { + return Err(SandboxError::StreamError { + message: format!("failed to resize PTY: {}", std::io::Error::last_os_error()), + }); + } + + Ok(()) +} + +pub fn decode_input_bytes(data: &str, encoding: &str) -> Result, SandboxError> { + match encoding { + "base64" => BASE64 + .decode(data) + .map_err(|err| SandboxError::InvalidRequest { + message: format!("invalid base64 input: {err}"), + }), + "utf8" | "text" => Ok(data.as_bytes().to_vec()), + _ => Err(SandboxError::InvalidRequest { + message: "encoding must be one of: base64, utf8, text".to_string(), + }), + } +} diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 99971ff..7e0e2cc 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, HashMap}; +use std::convert::Infallible; use std::fs; use std::io::Cursor; use std::path::{Path as StdPath, PathBuf}; @@ -6,6 +7,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use axum::body::Bytes; +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::extract::{Path, Query, State}; use axum::http::{header, HeaderMap, Request, StatusCode}; use axum::middleware::Next; @@ -13,6 +15,8 @@ use axum::response::sse::KeepAlive; use axum::response::{IntoResponse, Response, Sse}; use axum::routing::{delete, get, post}; use axum::{Json, Router}; +use futures::stream; +use 
futures::StreamExt; use sandbox_agent_agent_management::agents::{ AgentId, AgentManager, InstallOptions, InstallResult, InstallSource, InstalledArtifactKind, }; @@ -27,11 +31,16 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tar::Archive; +use tokio_stream::wrappers::BroadcastStream; use tower_http::trace::TraceLayer; use tracing::Span; use utoipa::{Modify, OpenApi, ToSchema}; use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome}; +use crate::process_runtime::{ + decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime, + ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec, +}; use crate::ui; mod support; @@ -77,6 +86,7 @@ pub struct AppState { agent_manager: Arc, acp_proxy: Arc, opencode_server_manager: Arc, + process_runtime: Arc, pub(crate) branding: BrandingMode, version_cache: Mutex>, } @@ -100,11 +110,13 @@ impl AppState { auto_restart: true, }, )); + let process_runtime = Arc::new(ProcessRuntime::new()); Self { auth, agent_manager, acp_proxy, opencode_server_manager, + process_runtime, branding, version_cache: Mutex::new(HashMap::new()), } @@ -122,6 +134,10 @@ impl AppState { self.opencode_server_manager.clone() } + pub(crate) fn process_runtime(&self) -> Arc { + self.process_runtime.clone() + } + pub(crate) fn purge_version_cache(&self, agent: AgentId) { self.version_cache.lock().unwrap().remove(&agent); } @@ -166,6 +182,28 @@ pub fn build_router_with_state(shared: Arc) -> (Router, Arc) .route("/fs/move", post(post_v1_fs_move)) .route("/fs/stat", get(get_v1_fs_stat)) .route("/fs/upload-batch", post(post_v1_fs_upload_batch)) + .route( + "/processes/config", + get(get_v1_processes_config).post(post_v1_processes_config), + ) + .route("/processes", get(get_v1_processes).post(post_v1_processes)) + .route("/processes/run", post(post_v1_processes_run)) + .route( + "/processes/:id", + get(get_v1_process).delete(delete_v1_process), 
+ ) + .route("/processes/:id/stop", post(post_v1_process_stop)) + .route("/processes/:id/kill", post(post_v1_process_kill)) + .route("/processes/:id/logs", get(get_v1_process_logs)) + .route("/processes/:id/input", post(post_v1_process_input)) + .route( + "/processes/:id/terminal/resize", + post(post_v1_process_terminal_resize), + ) + .route( + "/processes/:id/terminal/ws", + get(get_v1_process_terminal_ws), + ) .route( "/config/mcp", get(get_v1_config_mcp) @@ -295,6 +333,19 @@ pub async fn shutdown_servers(state: &Arc) { post_v1_fs_move, get_v1_fs_stat, post_v1_fs_upload_batch, + get_v1_processes_config, + post_v1_processes_config, + post_v1_processes, + post_v1_processes_run, + get_v1_processes, + get_v1_process, + post_v1_process_stop, + post_v1_process_kill, + delete_v1_process, + get_v1_process_logs, + post_v1_process_input, + post_v1_process_terminal_resize, + get_v1_process_terminal_ws, get_v1_config_mcp, put_v1_config_mcp, delete_v1_config_mcp, @@ -329,6 +380,22 @@ pub async fn shutdown_servers(state: &Arc) { FsMoveResponse, FsActionResponse, FsUploadBatchResponse, + ProcessConfig, + ProcessCreateRequest, + ProcessRunRequest, + ProcessRunResponse, + ProcessState, + ProcessInfo, + ProcessListResponse, + ProcessLogsStream, + ProcessLogsQuery, + ProcessLogEntry, + ProcessLogsResponse, + ProcessInputRequest, + ProcessInputResponse, + ProcessSignalQuery, + ProcessTerminalResizeRequest, + ProcessTerminalResizeResponse, AcpPostQuery, AcpServerInfo, AcpServerListResponse, @@ -361,12 +428,21 @@ impl Modify for ServerAddon { pub enum ApiError { #[error(transparent)] Sandbox(#[from] SandboxError), + #[error("problem: {0:?}")] + Problem(ProblemDetails), +} + +impl From for ApiError { + fn from(value: ProblemDetails) -> Self { + Self::Problem(value) + } } impl IntoResponse for ApiError { fn into_response(self) -> Response { let problem = match &self { ApiError::Sandbox(error) => problem_from_sandbox_error(error), + ApiError::Problem(problem) => problem.clone(), }; let 
status = StatusCode::from_u16(problem.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); @@ -1075,6 +1151,617 @@ async fn post_v1_fs_upload_batch( })) } +#[utoipa::path( + get, + path = "/v1/processes/config", + tag = "v1", + responses( + (status = 200, description = "Current runtime process config", body = ProcessConfig), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn get_v1_processes_config( + State(state): State>, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let config = state.process_runtime().get_config().await; + Ok(Json(map_process_config(config))) +} + +#[utoipa::path( + post, + path = "/v1/processes/config", + tag = "v1", + request_body = ProcessConfig, + responses( + (status = 200, description = "Updated runtime process config", body = ProcessConfig), + (status = 400, description = "Invalid config", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_processes_config( + State(state): State>, + Json(body): Json, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let runtime = state.process_runtime(); + let updated = runtime + .set_config(into_runtime_process_config(body)) + .await?; + Ok(Json(map_process_config(updated))) +} + +#[utoipa::path( + post, + path = "/v1/processes", + tag = "v1", + request_body = ProcessCreateRequest, + responses( + (status = 200, description = "Started process", body = ProcessInfo), + (status = 400, description = "Invalid request", body = ProblemDetails), + (status = 409, description = "Process limit or state conflict", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_processes( + State(state): State>, + Json(body): Json, +) -> 
Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let runtime = state.process_runtime(); + let snapshot = runtime + .start_process(ProcessStartSpec { + command: body.command, + args: body.args, + cwd: body.cwd, + env: body.env.into_iter().collect(), + tty: body.tty, + interactive: body.interactive, + }) + .await?; + + Ok(Json(map_process_snapshot(snapshot))) +} + +#[utoipa::path( + post, + path = "/v1/processes/run", + tag = "v1", + request_body = ProcessRunRequest, + responses( + (status = 200, description = "One-off command result", body = ProcessRunResponse), + (status = 400, description = "Invalid request", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_processes_run( + State(state): State>, + Json(body): Json, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let runtime = state.process_runtime(); + let output = runtime + .run_once(RunSpec { + command: body.command, + args: body.args, + cwd: body.cwd, + env: body.env.into_iter().collect(), + timeout_ms: body.timeout_ms, + max_output_bytes: body.max_output_bytes, + }) + .await?; + + Ok(Json(ProcessRunResponse { + exit_code: output.exit_code, + timed_out: output.timed_out, + stdout: output.stdout, + stderr: output.stderr, + stdout_truncated: output.stdout_truncated, + stderr_truncated: output.stderr_truncated, + duration_ms: output.duration_ms, + })) +} + +#[utoipa::path( + get, + path = "/v1/processes", + tag = "v1", + responses( + (status = 200, description = "List processes", body = ProcessListResponse), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn get_v1_processes( + State(state): State>, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let snapshots 
= state.process_runtime().list_processes().await; + Ok(Json(ProcessListResponse { + processes: snapshots.into_iter().map(map_process_snapshot).collect(), + })) +} + +#[utoipa::path( + get, + path = "/v1/processes/{id}", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID") + ), + responses( + (status = 200, description = "Process details", body = ProcessInfo), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn get_v1_process( + State(state): State>, + Path(id): Path, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let snapshot = state.process_runtime().snapshot(&id).await?; + Ok(Json(map_process_snapshot(snapshot))) +} + +#[utoipa::path( + post, + path = "/v1/processes/{id}/stop", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID"), + ("waitMs" = Option, Query, description = "Wait up to N ms for process to exit") + ), + responses( + (status = 200, description = "Stop signal sent", body = ProcessInfo), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_process_stop( + State(state): State>, + Path(id): Path, + Query(query): Query, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let snapshot = state + .process_runtime() + .stop_process(&id, query.wait_ms) + .await?; + Ok(Json(map_process_snapshot(snapshot))) +} + +#[utoipa::path( + post, + path = "/v1/processes/{id}/kill", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID"), + ("waitMs" = Option, Query, description = "Wait up to N ms for process to exit") + ), + responses( + (status = 200, description = "Kill signal sent", body = 
ProcessInfo), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_process_kill( + State(state): State>, + Path(id): Path, + Query(query): Query, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let snapshot = state + .process_runtime() + .kill_process(&id, query.wait_ms) + .await?; + Ok(Json(map_process_snapshot(snapshot))) +} + +#[utoipa::path( + delete, + path = "/v1/processes/{id}", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID") + ), + responses( + (status = 204, description = "Process deleted"), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 409, description = "Process is still running", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn delete_v1_process( + State(state): State>, + Path(id): Path, +) -> Result { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + state.process_runtime().delete_process(&id).await?; + Ok(StatusCode::NO_CONTENT) +} + +#[utoipa::path( + get, + path = "/v1/processes/{id}/logs", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID"), + ("stream" = Option, Query, description = "stdout|stderr|combined|pty"), + ("tail" = Option, Query, description = "Tail N entries"), + ("follow" = Option, Query, description = "Follow via SSE"), + ("since" = Option, Query, description = "Only entries with sequence greater than this") + ), + responses( + (status = 200, description = "Process logs", body = ProcessLogsResponse), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn get_v1_process_logs( 
+ State(state): State>, + Path(id): Path, + headers: HeaderMap, + Query(query): Query, +) -> Result { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let runtime = state.process_runtime(); + let default_stream = if runtime.is_tty(&id).await? { + ProcessLogsStream::Pty + } else { + ProcessLogsStream::Combined + }; + let requested_stream = query.stream.unwrap_or(default_stream); + let since = match (query.since, parse_last_event_id(&headers)?) { + (Some(query_since), Some(last_event_id)) => Some(query_since.max(last_event_id)), + (Some(query_since), None) => Some(query_since), + (None, Some(last_event_id)) => Some(last_event_id), + (None, None) => None, + }; + let filter = ProcessLogFilter { + stream: into_runtime_log_stream(requested_stream), + tail: query.tail, + since, + }; + + let entries = runtime.logs(&id, filter).await?; + let response_entries: Vec = + entries.iter().cloned().map(map_process_log_line).collect(); + + if query.follow.unwrap_or(false) { + let rx = runtime.subscribe_logs(&id).await?; + let replay_stream = stream::iter(response_entries.into_iter().map(|entry| { + Ok::( + axum::response::sse::Event::default() + .event("log") + .id(entry.sequence.to_string()) + .data(serde_json::to_string(&entry).unwrap_or_else(|_| "{}".to_string())), + ) + })); + + let requested_stream_copy = requested_stream; + let follow_stream = BroadcastStream::new(rx).filter_map(move |item| { + let requested_stream_copy = requested_stream_copy; + async move { + match item { + Ok(line) => { + let entry = map_process_log_line(line); + if process_log_matches(&entry, requested_stream_copy) { + Some(Ok(axum::response::sse::Event::default() + .event("log") + .id(entry.sequence.to_string()) + .data( + serde_json::to_string(&entry) + .unwrap_or_else(|_| "{}".to_string()), + ))) + } else { + None + } + } + Err(_) => None, + } + } + }); + + let stream = replay_stream.chain(follow_stream); + let response = + 
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15))); + return Ok(response.into_response()); + } + + Ok(Json(ProcessLogsResponse { + process_id: id, + stream: requested_stream, + entries: response_entries, + }) + .into_response()) +} + +#[utoipa::path( + post, + path = "/v1/processes/{id}/input", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID") + ), + request_body = ProcessInputRequest, + responses( + (status = 200, description = "Input accepted", body = ProcessInputResponse), + (status = 400, description = "Invalid request", body = ProblemDetails), + (status = 413, description = "Input exceeds configured limit", body = ProblemDetails), + (status = 409, description = "Process not writable", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_process_input( + State(state): State>, + Path(id): Path, + Json(body): Json, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let encoding = body.encoding.unwrap_or_else(|| "base64".to_string()); + let input = decode_input_bytes(&body.data, &encoding)?; + let runtime = state.process_runtime(); + let max_input = runtime.max_input_bytes().await; + if input.len() > max_input { + return Err(SandboxError::InvalidRequest { + message: format!("input payload exceeds maxInputBytesPerRequest ({max_input})"), + } + .into()); + } + + let bytes_written = runtime.write_input(&id, &input).await?; + Ok(Json(ProcessInputResponse { bytes_written })) +} + +#[utoipa::path( + post, + path = "/v1/processes/{id}/terminal/resize", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID") + ), + request_body = ProcessTerminalResizeRequest, + responses( + (status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse), + (status = 400, description = "Invalid request", body = ProblemDetails), + (status 
= 404, description = "Unknown process", body = ProblemDetails), + (status = 409, description = "Not a terminal process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn post_v1_process_terminal_resize( + State(state): State>, + Path(id): Path, + Json(body): Json, +) -> Result, ApiError> { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + state + .process_runtime() + .resize_terminal(&id, body.cols, body.rows) + .await?; + Ok(Json(ProcessTerminalResizeResponse { + cols: body.cols, + rows: body.rows, + })) +} + +#[utoipa::path( + get, + path = "/v1/processes/{id}/terminal/ws", + tag = "v1", + params( + ("id" = String, Path, description = "Process ID"), + ("access_token" = Option, Query, description = "Bearer token alternative for WS auth") + ), + responses( + (status = 101, description = "WebSocket upgraded"), + (status = 400, description = "Invalid websocket frame or upgrade request", body = ProblemDetails), + (status = 404, description = "Unknown process", body = ProblemDetails), + (status = 409, description = "Not a terminal process", body = ProblemDetails), + (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails) + ) +)] +async fn get_v1_process_terminal_ws( + State(state): State>, + Path(id): Path, + Query(_query): Query, + ws: WebSocketUpgrade, +) -> Result { + if !process_api_supported() { + return Err(process_api_not_supported().into()); + } + + let runtime = state.process_runtime(); + if !runtime.is_tty(&id).await? 
{ + return Err(SandboxError::Conflict { + message: "process is not running in tty mode".to_string(), + } + .into()); + } + + Ok(ws + .on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id)) + .into_response()) +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +enum TerminalClientFrame { + Input { + data: String, + #[serde(default)] + encoding: Option, + }, + Resize { + cols: u16, + rows: u16, + }, + Close, +} + +async fn process_terminal_ws_session( + mut socket: WebSocket, + runtime: Arc, + id: String, +) { + let _ = send_ws_json( + &mut socket, + json!({ + "type": "ready", + "processId": &id, + }), + ) + .await; + + let mut log_rx = match runtime.subscribe_logs(&id).await { + Ok(rx) => rx, + Err(err) => { + let _ = send_ws_error(&mut socket, &err.to_string()).await; + let _ = socket.close().await; + return; + } + }; + let mut exit_poll = tokio::time::interval(Duration::from_millis(150)); + + loop { + tokio::select! { + ws_in = socket.recv() => { + match ws_in { + Some(Ok(Message::Binary(_))) => { + let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await; + } + Some(Ok(Message::Text(text))) => { + let parsed = serde_json::from_str::(&text); + match parsed { + Ok(TerminalClientFrame::Input { data, encoding }) => { + let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) { + Ok(input) => input, + Err(err) => { + let _ = send_ws_error(&mut socket, &err.to_string()).await; + continue; + } + }; + if let Err(err) = runtime.write_input(&id, &input).await { + let _ = send_ws_error(&mut socket, &err.to_string()).await; + } + } + Ok(TerminalClientFrame::Resize { cols, rows }) => { + if let Err(err) = runtime.resize_terminal(&id, cols, rows).await { + let _ = send_ws_error(&mut socket, &err.to_string()).await; + } + } + Ok(TerminalClientFrame::Close) => { + let _ = socket.close().await; + break; + } + Err(err) => { + let _ = send_ws_error(&mut socket, 
&format!("invalid terminal frame: {err}")).await; + } + } + } + Some(Ok(Message::Ping(payload))) => { + let _ = socket.send(Message::Pong(payload)).await; + } + Some(Ok(Message::Close(_))) | None => break, + Some(Ok(Message::Pong(_))) => {} + Some(Err(_)) => break, + } + } + log_in = log_rx.recv() => { + match log_in { + Ok(line) => { + if line.stream != ProcessStream::Pty { + continue; + } + let bytes = { + use base64::engine::general_purpose::STANDARD as BASE64_ENGINE; + use base64::Engine; + BASE64_ENGINE.decode(&line.data).unwrap_or_default() + }; + if socket.send(Message::Binary(bytes)).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + _ = exit_poll.tick() => { + if let Ok(snapshot) = runtime.snapshot(&id).await { + if snapshot.status == ProcessStatus::Exited { + let _ = send_ws_json( + &mut socket, + json!({ + "type": "exit", + "exitCode": snapshot.exit_code, + }), + ) + .await; + let _ = socket.close().await; + break; + } + } + } + } + } +} + +async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> { + socket + .send(Message::Text( + serde_json::to_string(&payload).map_err(|_| ())?, + )) + .await + .map_err(|_| ()) +} + +async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> { + send_ws_json( + socket, + json!({ + "type": "error", + "message": message, + }), + ) + .await +} + #[utoipa::path( get, path = "/v1/config/mcp", @@ -1386,6 +2073,96 @@ async fn delete_v1_acp( Ok(StatusCode::NO_CONTENT) } +fn process_api_supported() -> bool { + !cfg!(windows) +} + +fn process_api_not_supported() -> ProblemDetails { + ProblemDetails { + type_: ErrorType::InvalidRequest.as_urn().to_string(), + title: "Not Implemented".to_string(), + status: 501, + detail: Some("process API is not implemented on Windows".to_string()), + instance: None, + extensions: serde_json::Map::new(), + } +} + +fn 
map_process_config(config: ProcessRuntimeConfig) -> ProcessConfig { + ProcessConfig { + max_concurrent_processes: config.max_concurrent_processes, + default_run_timeout_ms: config.default_run_timeout_ms, + max_run_timeout_ms: config.max_run_timeout_ms, + max_output_bytes: config.max_output_bytes, + max_log_bytes_per_process: config.max_log_bytes_per_process, + max_input_bytes_per_request: config.max_input_bytes_per_request, + } +} + +fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig { + ProcessRuntimeConfig { + max_concurrent_processes: config.max_concurrent_processes, + default_run_timeout_ms: config.default_run_timeout_ms, + max_run_timeout_ms: config.max_run_timeout_ms, + max_output_bytes: config.max_output_bytes, + max_log_bytes_per_process: config.max_log_bytes_per_process, + max_input_bytes_per_request: config.max_input_bytes_per_request, + } +} + +fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo { + ProcessInfo { + id: snapshot.id, + command: snapshot.command, + args: snapshot.args, + cwd: snapshot.cwd, + tty: snapshot.tty, + interactive: snapshot.interactive, + status: match snapshot.status { + ProcessStatus::Running => ProcessState::Running, + ProcessStatus::Exited => ProcessState::Exited, + }, + pid: snapshot.pid, + exit_code: snapshot.exit_code, + created_at_ms: snapshot.created_at_ms, + exited_at_ms: snapshot.exited_at_ms, + } +} + +fn into_runtime_log_stream(stream: ProcessLogsStream) -> ProcessLogFilterStream { + match stream { + ProcessLogsStream::Stdout => ProcessLogFilterStream::Stdout, + ProcessLogsStream::Stderr => ProcessLogFilterStream::Stderr, + ProcessLogsStream::Combined => ProcessLogFilterStream::Combined, + ProcessLogsStream::Pty => ProcessLogFilterStream::Pty, + } +} + +fn map_process_log_line(line: crate::process_runtime::ProcessLogLine) -> ProcessLogEntry { + ProcessLogEntry { + sequence: line.sequence, + stream: match line.stream { + ProcessStream::Stdout => ProcessLogsStream::Stdout, + 
ProcessStream::Stderr => ProcessLogsStream::Stderr, + ProcessStream::Pty => ProcessLogsStream::Pty, + }, + timestamp_ms: line.timestamp_ms, + data: line.data, + encoding: line.encoding.to_string(), + } +} + +fn process_log_matches(entry: &ProcessLogEntry, stream: ProcessLogsStream) -> bool { + match stream { + ProcessLogsStream::Stdout => entry.stream == ProcessLogsStream::Stdout, + ProcessLogsStream::Stderr => entry.stream == ProcessLogsStream::Stderr, + ProcessLogsStream::Combined => { + entry.stream == ProcessLogsStream::Stdout || entry.stream == ProcessLogsStream::Stderr + } + ProcessLogsStream::Pty => entry.stream == ProcessLogsStream::Pty, + } +} + fn validate_named_query(value: &str, field_name: &str) -> Result<(), SandboxError> { if value.trim().is_empty() { return Err(SandboxError::InvalidRequest { diff --git a/server/packages/sandbox-agent/src/router/support.rs b/server/packages/sandbox-agent/src/router/support.rs index 173017d..9020e15 100644 --- a/server/packages/sandbox-agent/src/router/support.rs +++ b/server/packages/sandbox-agent/src/router/support.rs @@ -33,7 +33,17 @@ pub(super) async fn require_token( .and_then(|value| value.to_str().ok()) .and_then(|value| value.strip_prefix("Bearer ")); - if bearer == Some(expected.as_str()) { + let allow_query_token = request.uri().path().ends_with("/terminal/ws"); + let query_token = if allow_query_token { + request + .uri() + .query() + .and_then(|query| query_param(query, "access_token")) + } else { + None + }; + + if bearer == Some(expected.as_str()) || query_token.as_deref() == Some(expected.as_str()) { return Ok(next.run(request).await); } @@ -42,6 +52,13 @@ pub(super) async fn require_token( })) } +fn query_param(query: &str, key: &str) -> Option { + query + .split('&') + .filter_map(|part| part.split_once('=')) + .find_map(|(k, v)| if k == key { Some(v.to_string()) } else { None }) +} + pub(super) type PinBoxSseStream = crate::acp_proxy_runtime::PinBoxSseStream; pub(super) fn credentials_available_for( 
@@ -497,8 +514,16 @@ pub(super) fn problem_from_sandbox_error(error: &SandboxError) -> ProblemDetails let mut problem = error.to_problem_details(); match error { - SandboxError::InvalidRequest { .. } => { - problem.status = 400; + SandboxError::InvalidRequest { message } => { + if message.starts_with("process not found:") { + problem.status = 404; + problem.title = "Not Found".to_string(); + } else if message.starts_with("input payload exceeds maxInputBytesPerRequest") { + problem.status = 413; + problem.title = "Payload Too Large".to_string(); + } else { + problem.status = 400; + } } SandboxError::Timeout { .. } => { problem.status = 504; diff --git a/server/packages/sandbox-agent/src/router/types.rs b/server/packages/sandbox-agent/src/router/types.rs index 481850b..6d40e2a 100644 --- a/server/packages/sandbox-agent/src/router/types.rs +++ b/server/packages/sandbox-agent/src/router/types.rs @@ -362,3 +362,173 @@ pub struct AcpEnvelope { #[serde(default)] pub error: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ProcessConfig { + pub max_concurrent_processes: usize, + pub default_run_timeout_ms: u64, + pub max_run_timeout_ms: u64, + pub max_output_bytes: usize, + pub max_log_bytes_per_process: usize, + pub max_input_bytes_per_request: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ProcessCreateRequest { + pub command: String, + #[serde(default)] + pub args: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cwd: Option, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub env: BTreeMap, + #[serde(default)] + pub tty: bool, + #[serde(default)] + pub interactive: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ProcessRunRequest { + pub command: String, + #[serde(default)] + pub args: 
Vec<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub cwd: Option<String>,
+    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
+    pub env: BTreeMap<String, String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub timeout_ms: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub max_output_bytes: Option<usize>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessRunResponse {
+    pub exit_code: Option<i32>,
+    pub timed_out: bool,
+    pub stdout: String,
+    pub stderr: String,
+    pub stdout_truncated: bool,
+    pub stderr_truncated: bool,
+    pub duration_ms: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum ProcessState {
+    Running,
+    Exited,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessInfo {
+    pub id: String,
+    pub command: String,
+    pub args: Vec<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub cwd: Option<String>,
+    pub tty: bool,
+    pub interactive: bool,
+    pub status: ProcessState,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub pid: Option<u32>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub exit_code: Option<i32>,
+    pub created_at_ms: i64,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub exited_at_ms: Option<i64>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessListResponse {
+    pub processes: Vec<ProcessInfo>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum ProcessLogsStream {
+    Stdout,
+    Stderr,
+    Combined,
+    Pty,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessLogsQuery {
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub stream: Option<ProcessLogsStream>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub tail: Option<usize>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub follow: Option<bool>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub since: Option<u64>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessLogEntry {
+    pub sequence: u64,
+    pub stream: ProcessLogsStream,
+    pub timestamp_ms: i64,
+    pub data: String,
+    pub encoding: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessLogsResponse {
+    pub process_id: String,
+    pub stream: ProcessLogsStream,
+    pub entries: Vec<ProcessLogEntry>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessInputRequest {
+    pub data: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub encoding: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessInputResponse {
+    pub bytes_written: usize,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessSignalQuery {
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub wait_ms: Option<u64>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessTerminalResizeRequest {
+    pub cols: u16,
+    pub rows: u16,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessTerminalResizeResponse {
+    pub cols: u16,
+    pub rows: u16,
+}
+
+#[derive(Debug, Clone, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessWsQuery {
+    #[serde(default, 
skip_serializing_if = "Option::is_none")] + pub access_token: Option, +} diff --git a/server/packages/sandbox-agent/tests/v1_api.rs b/server/packages/sandbox-agent/tests/v1_api.rs index 89efde0..3dbd5e7 100644 --- a/server/packages/sandbox-agent/tests/v1_api.rs +++ b/server/packages/sandbox-agent/tests/v1_api.rs @@ -1,6 +1,6 @@ use std::fs; use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; +use std::net::{SocketAddr, TcpListener, TcpStream}; use std::path::Path; use std::time::Duration; @@ -14,6 +14,8 @@ use sandbox_agent_agent_management::agents::AgentManager; use serde_json::{json, Value}; use serial_test::serial; use tempfile::TempDir; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; use tower::util::ServiceExt; struct TestApp { @@ -48,6 +50,56 @@ struct EnvVarGuard { previous: Option, } +struct LiveServer { + address: SocketAddr, + shutdown_tx: Option>, + task: JoinHandle<()>, +} + +impl LiveServer { + async fn spawn(app: Router) -> Self { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind live server"); + let address = listener.local_addr().expect("live server address"); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + let task = tokio::spawn(async move { + let server = axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(async { + let _ = shutdown_rx.await; + }); + + let _ = server.await; + }); + + Self { + address, + shutdown_tx: Some(shutdown_tx), + task, + } + } + + fn http_url(&self, path: &str) -> String { + format!("http://{}{}", self.address, path) + } + + fn ws_url(&self, path: &str) -> String { + format!("ws://{}{}", self.address, path) + } + + async fn shutdown(mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + + let _ = tokio::time::timeout(Duration::from_secs(3), async { + let _ = self.task.await; + }) + .await; + } +} + impl EnvVarGuard { fn set(key: &'static str, value: &str) -> Self { let 
previous = std::env::var_os(key); @@ -291,3 +343,5 @@ mod acp_transport; mod config_endpoints; #[path = "v1_api/control_plane.rs"] mod control_plane; +#[path = "v1_api/processes.rs"] +mod processes; diff --git a/server/packages/sandbox-agent/tests/v1_api/processes.rs b/server/packages/sandbox-agent/tests/v1_api/processes.rs new file mode 100644 index 0000000..aaf072d --- /dev/null +++ b/server/packages/sandbox-agent/tests/v1_api/processes.rs @@ -0,0 +1,661 @@ +use super::*; +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; +use futures::{SinkExt, StreamExt}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +async fn wait_for_exited(test_app: &TestApp, process_id: &str) { + for _ in 0..30 { + let (status, _, body) = send_request( + &test_app.app, + Method::GET, + &format!("/v1/processes/{process_id}"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let parsed = parse_json(&body); + if parsed["status"] == "exited" { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + panic!("process did not exit in time"); +} + +fn decode_log_entries(entries: &[Value]) -> String { + entries + .iter() + .filter_map(|entry| entry.get("data").and_then(Value::as_str)) + .filter_map(|encoded| BASE64.decode(encoded).ok()) + .map(|bytes| String::from_utf8_lossy(&bytes).to_string()) + .collect::>() + .join("") +} + +async fn recv_ws_message( + ws: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, +) -> Message { + tokio::time::timeout(Duration::from_secs(3), ws.next()) + .await + .expect("timed out waiting for websocket frame") + .expect("websocket stream ended") + .expect("websocket frame") +} + +#[tokio::test] +async fn v1_processes_config_round_trip() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::GET, + "/v1/processes/config", + None, + &[], + ) + .await; + 
assert_eq!(status, StatusCode::OK); + assert_eq!(parse_json(&body)["maxConcurrentProcesses"], 64); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes/config", + Some(json!({ + "maxConcurrentProcesses": 8, + "defaultRunTimeoutMs": 1000, + "maxRunTimeoutMs": 5000, + "maxOutputBytes": 4096, + "maxLogBytesPerProcess": 32768, + "maxInputBytesPerRequest": 1024 + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let parsed = parse_json(&body); + assert_eq!(parsed["maxConcurrentProcesses"], 8); + assert_eq!(parsed["defaultRunTimeoutMs"], 1000); +} + +#[tokio::test] +async fn v1_process_lifecycle_requires_stop_before_delete() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "sh", + "args": ["-lc", "sleep 30"], + "tty": false, + "interactive": false + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + let (status, _, body) = send_request( + &test_app.app, + Method::DELETE, + &format!("/v1/processes/{process_id}"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::CONFLICT); + assert_eq!(parse_json(&body)["status"], 409); + + let (status, _, _body) = send_request( + &test_app.app, + Method::POST, + &format!("/v1/processes/{process_id}/stop"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + + wait_for_exited(&test_app, &process_id).await; + + let (status, _, _) = send_request( + &test_app.app, + Method::DELETE, + &format!("/v1/processes/{process_id}"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::NO_CONTENT); +} + +#[tokio::test] +async fn v1_process_run_returns_output_and_timeout() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes/run", 
+ Some(json!({ + "command": "sh", + "args": ["-lc", "echo hi"], + "timeoutMs": 1000 + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let parsed = parse_json(&body); + assert_eq!(parsed["timedOut"], false); + assert_eq!(parsed["exitCode"], 0); + assert!(parsed["stdout"].as_str().unwrap_or_default().contains("hi")); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes/run", + Some(json!({ + "command": "sh", + "args": ["-lc", "sleep 2"], + "timeoutMs": 50 + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(parse_json(&body)["timedOut"], true); +} + +#[tokio::test] +async fn v1_process_run_reports_truncation() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes/run", + Some(json!({ + "command": "sh", + "args": ["-lc", "printf 'abcdefghijklmnopqrstuvwxyz'"], + "maxOutputBytes": 5 + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let parsed = parse_json(&body); + assert_eq!(parsed["stdoutTruncated"], true); + assert_eq!(parsed["stderrTruncated"], false); + assert_eq!(parsed["stdout"].as_str().unwrap_or_default().len(), 5); +} + +#[tokio::test] +async fn v1_process_tty_input_and_logs() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "cat", + "tty": true, + "interactive": true + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + let (status, _, _body) = send_request( + &test_app.app, + Method::POST, + &format!("/v1/processes/{process_id}/input"), + Some(json!({ + "data": "aGVsbG8K", + "encoding": "base64" + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + + tokio::time::sleep(Duration::from_millis(150)).await; + + let 
(status, _, body) = send_request( + &test_app.app, + Method::GET, + &format!("/v1/processes/{process_id}/logs?stream=pty&tail=20"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let entries = parse_json(&body)["entries"] + .as_array() + .cloned() + .unwrap_or_default(); + assert!(!entries.is_empty()); + + let (status, _, _body) = send_request( + &test_app.app, + Method::POST, + &format!("/v1/processes/{process_id}/kill"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + + wait_for_exited(&test_app, &process_id).await; + + let (status, _, _) = send_request( + &test_app.app, + Method::DELETE, + &format!("/v1/processes/{process_id}"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::NO_CONTENT); +} + +#[tokio::test] +async fn v1_process_not_found_returns_404() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::GET, + "/v1/processes/does-not-exist", + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::NOT_FOUND); + assert_eq!(parse_json(&body)["status"], 404); +} + +#[tokio::test] +async fn v1_process_input_limit_returns_413() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, _) = send_request( + &test_app.app, + Method::POST, + "/v1/processes/config", + Some(json!({ + "maxConcurrentProcesses": 8, + "defaultRunTimeoutMs": 1000, + "maxRunTimeoutMs": 5000, + "maxOutputBytes": 4096, + "maxLogBytesPerProcess": 32768, + "maxInputBytesPerRequest": 4 + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "cat", + "tty": true, + "interactive": true + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + 
&format!("/v1/processes/{process_id}/input"), + Some(json!({ + "data": "aGVsbG8=", + "encoding": "base64" + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE); + assert_eq!(parse_json(&body)["status"], 413); +} + +#[tokio::test] +async fn v1_tty_process_is_real_terminal() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "sh", + "args": ["-lc", "tty"], + "tty": true, + "interactive": false + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + wait_for_exited(&test_app, &process_id).await; + + let (status, _, body) = send_request( + &test_app.app, + Method::GET, + &format!("/v1/processes/{process_id}/logs?stream=pty"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let entries = parse_json(&body)["entries"] + .as_array() + .cloned() + .unwrap_or_default(); + let joined = decode_log_entries(&entries); + assert!(!joined.to_lowercase().contains("not a tty")); + assert!(joined.contains("/dev/")); +} + +#[tokio::test] +async fn v1_process_logs_follow_sse_streams_entries() { + let test_app = TestApp::new(AuthConfig::disabled()); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "sh", + "args": ["-lc", "echo first; sleep 0.3; echo second"], + "tty": false, + "interactive": false + })), + &[], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + let request = Request::builder() + .method(Method::GET) + .uri(format!( + "/v1/processes/{process_id}/logs?stream=stdout&follow=true" + )) + .body(Body::empty()) + .expect("build request"); + let response = test_app + .app + .clone() + .oneshot(request) + .await + .expect("sse 
response"); + assert_eq!(response.status(), StatusCode::OK); + + let mut stream = response.into_body().into_data_stream(); + let chunk = tokio::time::timeout(Duration::from_secs(5), async move { + while let Some(chunk) = stream.next().await { + let bytes = chunk.expect("stream chunk"); + let text = String::from_utf8_lossy(&bytes).to_string(); + if text.contains("data:") { + return text; + } + } + panic!("SSE stream ended before log chunk"); + }) + .await + .expect("timed out reading process log sse"); + + let payload = parse_sse_data(&chunk); + assert!(payload["sequence"].as_u64().is_some()); + assert_eq!(payload["stream"], "stdout"); +} + +#[tokio::test] +async fn v1_access_token_query_only_allows_terminal_ws() { + let test_app = TestApp::new(AuthConfig::with_token("secret-token".to_string())); + + let (status, _, _) = send_request( + &test_app.app, + Method::GET, + "/v1/health?access_token=secret-token", + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::UNAUTHORIZED); + + let (status, _, body) = send_request( + &test_app.app, + Method::POST, + "/v1/processes", + Some(json!({ + "command": "cat", + "tty": true, + "interactive": true + })), + &[("authorization", "Bearer secret-token")], + ) + .await; + assert_eq!(status, StatusCode::OK); + let process_id = parse_json(&body)["id"] + .as_str() + .expect("process id") + .to_string(); + + let (status, _, _) = send_request( + &test_app.app, + Method::GET, + &format!("/v1/processes/{process_id}/terminal/ws"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::UNAUTHORIZED); + + let (status, _, _) = send_request( + &test_app.app, + Method::GET, + &format!("/v1/processes/{process_id}/terminal/ws?access_token=secret-token"), + None, + &[], + ) + .await; + assert_eq!(status, StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn v1_process_terminal_ws_e2e_is_deterministic() { + let test_app = TestApp::new(AuthConfig::disabled()); + let live_server = LiveServer::spawn(test_app.app.clone()).await; + let 
http = reqwest::Client::new(); + + let create_response = http + .post(live_server.http_url("/v1/processes")) + .json(&json!({ + "command": "sh", + "args": ["-lc", "stty -echo; IFS= read -r line; printf 'got:%s\\n' \"$line\""], + "tty": true, + "interactive": true + })) + .send() + .await + .expect("create process response"); + assert_eq!(create_response.status(), reqwest::StatusCode::OK); + let create_body: Value = create_response.json().await.expect("create process json"); + let process_id = create_body["id"] + .as_str() + .expect("process id") + .to_string(); + + let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws")); + let (mut ws, _) = connect_async(&ws_url) + .await + .expect("connect websocket"); + + let ready = recv_ws_message(&mut ws).await; + let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame")) + .expect("ready json"); + assert_eq!(ready_payload["type"], "ready"); + assert_eq!(ready_payload["processId"], process_id); + + ws.send(Message::Text( + json!({ + "type": "input", + "data": "hello from ws\n" + }) + .to_string(), + )) + .await + .expect("send input frame"); + + let mut saw_binary_output = false; + let mut saw_exit = false; + for _ in 0..10 { + let frame = recv_ws_message(&mut ws).await; + match frame { + Message::Binary(bytes) => { + let text = String::from_utf8_lossy(&bytes); + if text.contains("got:hello from ws") { + saw_binary_output = true; + } + } + Message::Text(text) => { + let payload: Value = serde_json::from_str(&text).expect("ws json"); + if payload["type"] == "exit" { + saw_exit = true; + break; + } + assert_ne!(payload["type"], "error"); + } + Message::Close(_) => break, + Message::Ping(_) | Message::Pong(_) => {} + _ => {} + } + } + + assert!(saw_binary_output, "expected pty binary output over websocket"); + assert!(saw_exit, "expected exit control frame over websocket"); + + let _ = ws.close(None).await; + + let delete_response = http + 
.delete(live_server.http_url(&format!("/v1/processes/{process_id}"))) + .send() + .await + .expect("delete process response"); + assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT); + + live_server.shutdown().await; +} + +#[tokio::test] +async fn v1_process_terminal_ws_auth_e2e() { + let token = "secret-token"; + let test_app = TestApp::new(AuthConfig::with_token(token.to_string())); + let live_server = LiveServer::spawn(test_app.app.clone()).await; + let http = reqwest::Client::new(); + + let create_response = http + .post(live_server.http_url("/v1/processes")) + .bearer_auth(token) + .json(&json!({ + "command": "cat", + "tty": true, + "interactive": true + })) + .send() + .await + .expect("create process response"); + assert_eq!(create_response.status(), reqwest::StatusCode::OK); + let create_body: Value = create_response.json().await.expect("create process json"); + let process_id = create_body["id"] + .as_str() + .expect("process id") + .to_string(); + + let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws")); + let unauth_err = connect_async(&unauth_ws_url) + .await + .expect_err("unauthenticated websocket handshake should fail"); + match unauth_err { + tokio_tungstenite::tungstenite::Error::Http(response) => { + assert_eq!(response.status().as_u16(), 401); + } + other => panic!("unexpected websocket auth error: {other:?}"), + } + + let auth_ws_url = live_server.ws_url(&format!( + "/v1/processes/{process_id}/terminal/ws?access_token={token}" + )); + let (mut ws, _) = connect_async(&auth_ws_url) + .await + .expect("authenticated websocket handshake"); + + let ready = recv_ws_message(&mut ws).await; + let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame")) + .expect("ready json"); + assert_eq!(ready_payload["type"], "ready"); + assert_eq!(ready_payload["processId"], process_id); + + let _ = ws + .send(Message::Text(json!({ "type": "close" }).to_string())) + .await; + let _ = 
ws.close(None).await; + + let kill_response = http + .post(live_server.http_url(&format!( + "/v1/processes/{process_id}/kill?waitMs=1000" + ))) + .bearer_auth(token) + .send() + .await + .expect("kill process response"); + assert_eq!(kill_response.status(), reqwest::StatusCode::OK); + + let delete_response = http + .delete(live_server.http_url(&format!("/v1/processes/{process_id}"))) + .bearer_auth(token) + .send() + .await + .expect("delete process response"); + assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT); + + live_server.shutdown().await; +}