mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 08:03:46 +00:00
feat: add process management API (#203)
* feat: add process management API Introduces a complete Process Management API for Sandbox Agent with process lifecycle management (start, stop, kill, delete), one-shot command execution, log streaming via SSE and WebSocket, stdin input, and PTY/terminal support. Includes new process_runtime module for managing process state, HTTP route handlers, OpenAPI documentation, and integration tests. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com> * fix: address review issues in process management API - Add doc comments to all 13 new #[utoipa::path] handlers (CLAUDE.md compliance) - Fix send_signal ESRCH check: use raw_os_error() == Some(libc::ESRCH) instead of ErrorKind::NotFound - Add max_input_bytes_per_request enforcement in WebSocket terminal handler - URL-decode access_token query parameter for WebSocket auth - Replace fragile string prefix matching with proper SandboxError::NotFound variant Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com> * feat: add TypeScript SDK support for process management Add process CRUD operations (create, get, list, update, delete) and event streaming to the TypeScript SDK. Includes integration tests, mock agent updates, and test environment fixes for cross-platform home directory handling. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: provide WebSocket impl for process terminal test on Node 20 Node 20 lacks globalThis.WebSocket. Add ws as a devDependency and pass it to connectProcessTerminalWebSocket in the integration test so CI no longer fails. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
fba06d3304
commit
4335ef6af6
23 changed files with 5571 additions and 181 deletions
|
|
@ -1,17 +1,17 @@
|
|||
# Server Instructions
|
||||
|
||||
## ACP v2 Architecture
|
||||
## Architecture
|
||||
|
||||
- Public API routes are defined in `server/packages/sandbox-agent/src/router.rs`.
|
||||
- ACP runtime/process bridge is in `server/packages/sandbox-agent/src/acp_runtime.rs`.
|
||||
- `/v2` is the only active API surface for sessions/prompts (`/v2/rpc`).
|
||||
- ACP proxy runtime is in `server/packages/sandbox-agent/src/acp_proxy_runtime.rs`.
|
||||
- All API endpoints are under `/v1`.
|
||||
- Keep binary filesystem transfer endpoints as dedicated HTTP APIs:
|
||||
- `GET /v2/fs/file`
|
||||
- `PUT /v2/fs/file`
|
||||
- `POST /v2/fs/upload-batch`
|
||||
- `GET /v1/fs/file`
|
||||
- `PUT /v1/fs/file`
|
||||
- `POST /v1/fs/upload-batch`
|
||||
- Rationale: host-owned cross-agent-consistent behavior and large binary transfer needs that ACP JSON-RPC is not suited to stream efficiently.
|
||||
- Maintain ACP variants in parallel only when they share the same underlying filesystem implementation; SDK defaults should still prefer HTTP for large/binary transfers.
|
||||
- `/v1/*` must remain hard-removed (`410`) and `/opencode/*` stays disabled (`503`) until Phase 7.
|
||||
- `/opencode/*` stays disabled (`503`) until Phase 7.
|
||||
- Agent install logic (native + ACP agent process + lazy install) is handled by `server/packages/agent-management/`.
|
||||
|
||||
## API Contract Rules
|
||||
|
|
@ -23,14 +23,14 @@
|
|||
|
||||
## Tests
|
||||
|
||||
Primary v2 integration coverage:
|
||||
- `server/packages/sandbox-agent/tests/v2_api.rs`
|
||||
- `server/packages/sandbox-agent/tests/v2_agent_process_matrix.rs`
|
||||
Primary v1 integration coverage:
|
||||
- `server/packages/sandbox-agent/tests/v1_api.rs`
|
||||
- `server/packages/sandbox-agent/tests/v1_agent_process_matrix.rs`
|
||||
|
||||
Run:
|
||||
```bash
|
||||
cargo test -p sandbox-agent --test v2_api
|
||||
cargo test -p sandbox-agent --test v2_agent_process_matrix
|
||||
cargo test -p sandbox-agent --test v1_api
|
||||
cargo test -p sandbox-agent --test v1_agent_process_matrix
|
||||
```
|
||||
|
||||
## Migration Docs Sync
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ pub enum ErrorType {
|
|||
PermissionDenied,
|
||||
NotAcceptable,
|
||||
UnsupportedMediaType,
|
||||
NotFound,
|
||||
SessionNotFound,
|
||||
SessionAlreadyExists,
|
||||
ModeNotSupported,
|
||||
|
|
@ -37,6 +38,7 @@ impl ErrorType {
|
|||
Self::PermissionDenied => "urn:sandbox-agent:error:permission_denied",
|
||||
Self::NotAcceptable => "urn:sandbox-agent:error:not_acceptable",
|
||||
Self::UnsupportedMediaType => "urn:sandbox-agent:error:unsupported_media_type",
|
||||
Self::NotFound => "urn:sandbox-agent:error:not_found",
|
||||
Self::SessionNotFound => "urn:sandbox-agent:error:session_not_found",
|
||||
Self::SessionAlreadyExists => "urn:sandbox-agent:error:session_already_exists",
|
||||
Self::ModeNotSupported => "urn:sandbox-agent:error:mode_not_supported",
|
||||
|
|
@ -57,6 +59,7 @@ impl ErrorType {
|
|||
Self::PermissionDenied => "Permission Denied",
|
||||
Self::NotAcceptable => "Not Acceptable",
|
||||
Self::UnsupportedMediaType => "Unsupported Media Type",
|
||||
Self::NotFound => "Not Found",
|
||||
Self::SessionNotFound => "Session Not Found",
|
||||
Self::SessionAlreadyExists => "Session Already Exists",
|
||||
Self::ModeNotSupported => "Mode Not Supported",
|
||||
|
|
@ -77,6 +80,7 @@ impl ErrorType {
|
|||
Self::PermissionDenied => 403,
|
||||
Self::NotAcceptable => 406,
|
||||
Self::UnsupportedMediaType => 415,
|
||||
Self::NotFound => 404,
|
||||
Self::SessionNotFound => 404,
|
||||
Self::SessionAlreadyExists => 409,
|
||||
Self::ModeNotSupported => 400,
|
||||
|
|
@ -155,6 +159,8 @@ pub enum SandboxError {
|
|||
NotAcceptable { message: String },
|
||||
#[error("unsupported media type: {message}")]
|
||||
UnsupportedMediaType { message: String },
|
||||
#[error("not found: {resource} {id}")]
|
||||
NotFound { resource: String, id: String },
|
||||
#[error("session not found: {session_id}")]
|
||||
SessionNotFound { session_id: String },
|
||||
#[error("session already exists: {session_id}")]
|
||||
|
|
@ -180,6 +186,7 @@ impl SandboxError {
|
|||
Self::PermissionDenied { .. } => ErrorType::PermissionDenied,
|
||||
Self::NotAcceptable { .. } => ErrorType::NotAcceptable,
|
||||
Self::UnsupportedMediaType { .. } => ErrorType::UnsupportedMediaType,
|
||||
Self::NotFound { .. } => ErrorType::NotFound,
|
||||
Self::SessionNotFound { .. } => ErrorType::SessionNotFound,
|
||||
Self::SessionAlreadyExists { .. } => ErrorType::SessionAlreadyExists,
|
||||
Self::ModeNotSupported { .. } => ErrorType::ModeNotSupported,
|
||||
|
|
@ -264,6 +271,12 @@ impl SandboxError {
|
|||
map.insert("message".to_string(), Value::String(message.clone()));
|
||||
(None, None, Some(Value::Object(map)))
|
||||
}
|
||||
Self::NotFound { resource, id } => {
|
||||
let mut map = Map::new();
|
||||
map.insert("resource".to_string(), Value::String(resource.clone()));
|
||||
map.insert("id".to_string(), Value::String(id.clone()));
|
||||
(None, None, Some(Value::Object(map)))
|
||||
}
|
||||
Self::SessionNotFound { session_id } => (None, Some(session_id.clone()), None),
|
||||
Self::SessionAlreadyExists { session_id } => (None, Some(session_id.clone()), None),
|
||||
Self::ModeNotSupported { agent, mode } => {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ insta.workspace = true
|
|||
tower.workspace = true
|
||||
tempfile.workspace = true
|
||||
serial_test = "3.2"
|
||||
tokio-tungstenite = "0.24"
|
||||
|
||||
[features]
|
||||
test-utils = ["tempfile"]
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
mod acp_proxy_runtime;
|
||||
pub mod cli;
|
||||
pub mod daemon;
|
||||
mod process_runtime;
|
||||
pub mod router;
|
||||
pub mod server_logs;
|
||||
pub mod telemetry;
|
||||
|
|
|
|||
1082
server/packages/sandbox-agent/src/process_runtime.rs
Normal file
1082
server/packages/sandbox-agent/src/process_runtime.rs
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::convert::Infallible;
|
||||
use std::fs;
|
||||
use std::io::Cursor;
|
||||
use std::path::{Path as StdPath, PathBuf};
|
||||
|
|
@ -6,6 +7,7 @@ use std::sync::{Arc, Mutex};
|
|||
use std::time::Duration;
|
||||
|
||||
use axum::body::Bytes;
|
||||
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
|
||||
use axum::extract::{Path, Query, State};
|
||||
use axum::http::{header, HeaderMap, Request, StatusCode};
|
||||
use axum::middleware::Next;
|
||||
|
|
@ -13,6 +15,8 @@ use axum::response::sse::KeepAlive;
|
|||
use axum::response::{IntoResponse, Response, Sse};
|
||||
use axum::routing::{delete, get, post};
|
||||
use axum::{Json, Router};
|
||||
use futures::stream;
|
||||
use futures::StreamExt;
|
||||
use sandbox_agent_agent_management::agents::{
|
||||
AgentId, AgentManager, InstallOptions, InstallResult, InstallSource, InstalledArtifactKind,
|
||||
};
|
||||
|
|
@ -27,11 +31,16 @@ use serde::de::DeserializeOwned;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use tar::Archive;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::Span;
|
||||
use utoipa::{Modify, OpenApi, ToSchema};
|
||||
|
||||
use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome};
|
||||
use crate::process_runtime::{
|
||||
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime,
|
||||
ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec,
|
||||
};
|
||||
use crate::ui;
|
||||
|
||||
mod support;
|
||||
|
|
@ -77,6 +86,7 @@ pub struct AppState {
|
|||
agent_manager: Arc<AgentManager>,
|
||||
acp_proxy: Arc<AcpProxyRuntime>,
|
||||
opencode_server_manager: Arc<OpenCodeServerManager>,
|
||||
process_runtime: Arc<ProcessRuntime>,
|
||||
pub(crate) branding: BrandingMode,
|
||||
version_cache: Mutex<HashMap<AgentId, CachedAgentVersion>>,
|
||||
}
|
||||
|
|
@ -100,11 +110,13 @@ impl AppState {
|
|||
auto_restart: true,
|
||||
},
|
||||
));
|
||||
let process_runtime = Arc::new(ProcessRuntime::new());
|
||||
Self {
|
||||
auth,
|
||||
agent_manager,
|
||||
acp_proxy,
|
||||
opencode_server_manager,
|
||||
process_runtime,
|
||||
branding,
|
||||
version_cache: Mutex::new(HashMap::new()),
|
||||
}
|
||||
|
|
@ -122,6 +134,10 @@ impl AppState {
|
|||
self.opencode_server_manager.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn process_runtime(&self) -> Arc<ProcessRuntime> {
|
||||
self.process_runtime.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn purge_version_cache(&self, agent: AgentId) {
|
||||
self.version_cache.lock().unwrap().remove(&agent);
|
||||
}
|
||||
|
|
@ -166,6 +182,28 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
|
|||
.route("/fs/move", post(post_v1_fs_move))
|
||||
.route("/fs/stat", get(get_v1_fs_stat))
|
||||
.route("/fs/upload-batch", post(post_v1_fs_upload_batch))
|
||||
.route(
|
||||
"/processes/config",
|
||||
get(get_v1_processes_config).post(post_v1_processes_config),
|
||||
)
|
||||
.route("/processes", get(get_v1_processes).post(post_v1_processes))
|
||||
.route("/processes/run", post(post_v1_processes_run))
|
||||
.route(
|
||||
"/processes/:id",
|
||||
get(get_v1_process).delete(delete_v1_process),
|
||||
)
|
||||
.route("/processes/:id/stop", post(post_v1_process_stop))
|
||||
.route("/processes/:id/kill", post(post_v1_process_kill))
|
||||
.route("/processes/:id/logs", get(get_v1_process_logs))
|
||||
.route("/processes/:id/input", post(post_v1_process_input))
|
||||
.route(
|
||||
"/processes/:id/terminal/resize",
|
||||
post(post_v1_process_terminal_resize),
|
||||
)
|
||||
.route(
|
||||
"/processes/:id/terminal/ws",
|
||||
get(get_v1_process_terminal_ws),
|
||||
)
|
||||
.route(
|
||||
"/config/mcp",
|
||||
get(get_v1_config_mcp)
|
||||
|
|
@ -295,6 +333,19 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
|
|||
post_v1_fs_move,
|
||||
get_v1_fs_stat,
|
||||
post_v1_fs_upload_batch,
|
||||
get_v1_processes_config,
|
||||
post_v1_processes_config,
|
||||
post_v1_processes,
|
||||
post_v1_processes_run,
|
||||
get_v1_processes,
|
||||
get_v1_process,
|
||||
post_v1_process_stop,
|
||||
post_v1_process_kill,
|
||||
delete_v1_process,
|
||||
get_v1_process_logs,
|
||||
post_v1_process_input,
|
||||
post_v1_process_terminal_resize,
|
||||
get_v1_process_terminal_ws,
|
||||
get_v1_config_mcp,
|
||||
put_v1_config_mcp,
|
||||
delete_v1_config_mcp,
|
||||
|
|
@ -329,6 +380,22 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
|
|||
FsMoveResponse,
|
||||
FsActionResponse,
|
||||
FsUploadBatchResponse,
|
||||
ProcessConfig,
|
||||
ProcessCreateRequest,
|
||||
ProcessRunRequest,
|
||||
ProcessRunResponse,
|
||||
ProcessState,
|
||||
ProcessInfo,
|
||||
ProcessListResponse,
|
||||
ProcessLogsStream,
|
||||
ProcessLogsQuery,
|
||||
ProcessLogEntry,
|
||||
ProcessLogsResponse,
|
||||
ProcessInputRequest,
|
||||
ProcessInputResponse,
|
||||
ProcessSignalQuery,
|
||||
ProcessTerminalResizeRequest,
|
||||
ProcessTerminalResizeResponse,
|
||||
AcpPostQuery,
|
||||
AcpServerInfo,
|
||||
AcpServerListResponse,
|
||||
|
|
@ -361,12 +428,21 @@ impl Modify for ServerAddon {
|
|||
pub enum ApiError {
|
||||
#[error(transparent)]
|
||||
Sandbox(#[from] SandboxError),
|
||||
#[error("problem: {0:?}")]
|
||||
Problem(ProblemDetails),
|
||||
}
|
||||
|
||||
impl From<ProblemDetails> for ApiError {
|
||||
fn from(value: ProblemDetails) -> Self {
|
||||
Self::Problem(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for ApiError {
|
||||
fn into_response(self) -> Response {
|
||||
let problem = match &self {
|
||||
ApiError::Sandbox(error) => problem_from_sandbox_error(error),
|
||||
ApiError::Problem(problem) => problem.clone(),
|
||||
};
|
||||
let status =
|
||||
StatusCode::from_u16(problem.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
|
@ -1075,6 +1151,678 @@ async fn post_v1_fs_upload_batch(
|
|||
}))
|
||||
}
|
||||
|
||||
/// Get process runtime configuration.
|
||||
///
|
||||
/// Returns the current runtime configuration for the process management API,
|
||||
/// including limits for concurrency, timeouts, and buffer sizes.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes/config",
|
||||
tag = "v1",
|
||||
responses(
|
||||
(status = 200, description = "Current runtime process config", body = ProcessConfig),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn get_v1_processes_config(
|
||||
State(state): State<Arc<AppState>>,
|
||||
) -> Result<Json<ProcessConfig>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let config = state.process_runtime().get_config().await;
|
||||
Ok(Json(map_process_config(config)))
|
||||
}
|
||||
|
||||
/// Update process runtime configuration.
|
||||
///
|
||||
/// Replaces the runtime configuration for the process management API.
|
||||
/// Validates that all values are non-zero and clamps default timeout to max.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/config",
|
||||
tag = "v1",
|
||||
request_body = ProcessConfig,
|
||||
responses(
|
||||
(status = 200, description = "Updated runtime process config", body = ProcessConfig),
|
||||
(status = 400, description = "Invalid config", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_processes_config(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(body): Json<ProcessConfig>,
|
||||
) -> Result<Json<ProcessConfig>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let runtime = state.process_runtime();
|
||||
let updated = runtime
|
||||
.set_config(into_runtime_process_config(body))
|
||||
.await?;
|
||||
Ok(Json(map_process_config(updated)))
|
||||
}
|
||||
|
||||
/// Create a long-lived managed process.
|
||||
///
|
||||
/// Spawns a new process with the given command and arguments. Supports both
|
||||
/// pipe-based and PTY (tty) modes. Returns the process descriptor on success.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes",
|
||||
tag = "v1",
|
||||
request_body = ProcessCreateRequest,
|
||||
responses(
|
||||
(status = 200, description = "Started process", body = ProcessInfo),
|
||||
(status = 400, description = "Invalid request", body = ProblemDetails),
|
||||
(status = 409, description = "Process limit or state conflict", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_processes(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(body): Json<ProcessCreateRequest>,
|
||||
) -> Result<Json<ProcessInfo>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let runtime = state.process_runtime();
|
||||
let snapshot = runtime
|
||||
.start_process(ProcessStartSpec {
|
||||
command: body.command,
|
||||
args: body.args,
|
||||
cwd: body.cwd,
|
||||
env: body.env.into_iter().collect(),
|
||||
tty: body.tty,
|
||||
interactive: body.interactive,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Json(map_process_snapshot(snapshot)))
|
||||
}
|
||||
|
||||
/// Run a one-shot command.
|
||||
///
|
||||
/// Executes a command to completion and returns its stdout, stderr, exit code,
|
||||
/// and duration. Supports configurable timeout and output size limits.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/run",
|
||||
tag = "v1",
|
||||
request_body = ProcessRunRequest,
|
||||
responses(
|
||||
(status = 200, description = "One-off command result", body = ProcessRunResponse),
|
||||
(status = 400, description = "Invalid request", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_processes_run(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(body): Json<ProcessRunRequest>,
|
||||
) -> Result<Json<ProcessRunResponse>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let runtime = state.process_runtime();
|
||||
let output = runtime
|
||||
.run_once(RunSpec {
|
||||
command: body.command,
|
||||
args: body.args,
|
||||
cwd: body.cwd,
|
||||
env: body.env.into_iter().collect(),
|
||||
timeout_ms: body.timeout_ms,
|
||||
max_output_bytes: body.max_output_bytes,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Json(ProcessRunResponse {
|
||||
exit_code: output.exit_code,
|
||||
timed_out: output.timed_out,
|
||||
stdout: output.stdout,
|
||||
stderr: output.stderr,
|
||||
stdout_truncated: output.stdout_truncated,
|
||||
stderr_truncated: output.stderr_truncated,
|
||||
duration_ms: output.duration_ms,
|
||||
}))
|
||||
}
|
||||
|
||||
/// List all managed processes.
|
||||
///
|
||||
/// Returns a list of all processes (running and exited) currently tracked
|
||||
/// by the runtime, sorted by process ID.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes",
|
||||
tag = "v1",
|
||||
responses(
|
||||
(status = 200, description = "List processes", body = ProcessListResponse),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn get_v1_processes(
|
||||
State(state): State<Arc<AppState>>,
|
||||
) -> Result<Json<ProcessListResponse>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let snapshots = state.process_runtime().list_processes().await;
|
||||
Ok(Json(ProcessListResponse {
|
||||
processes: snapshots.into_iter().map(map_process_snapshot).collect(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get a single process by ID.
|
||||
///
|
||||
/// Returns the current state of a managed process including its status,
|
||||
/// PID, exit code, and creation/exit timestamps.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes/{id}",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID")
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Process details", body = ProcessInfo),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn get_v1_process(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
) -> Result<Json<ProcessInfo>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let snapshot = state.process_runtime().snapshot(&id).await?;
|
||||
Ok(Json(map_process_snapshot(snapshot)))
|
||||
}
|
||||
|
||||
/// Send SIGTERM to a process.
|
||||
///
|
||||
/// Sends SIGTERM to the process and optionally waits up to `waitMs`
|
||||
/// milliseconds for the process to exit before returning.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/{id}/stop",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID"),
|
||||
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Stop signal sent", body = ProcessInfo),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_process_stop(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Query(query): Query<ProcessSignalQuery>,
|
||||
) -> Result<Json<ProcessInfo>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let snapshot = state
|
||||
.process_runtime()
|
||||
.stop_process(&id, query.wait_ms)
|
||||
.await?;
|
||||
Ok(Json(map_process_snapshot(snapshot)))
|
||||
}
|
||||
|
||||
/// Send SIGKILL to a process.
|
||||
///
|
||||
/// Sends SIGKILL to the process and optionally waits up to `waitMs`
|
||||
/// milliseconds for the process to exit before returning.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/{id}/kill",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID"),
|
||||
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Kill signal sent", body = ProcessInfo),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_process_kill(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Query(query): Query<ProcessSignalQuery>,
|
||||
) -> Result<Json<ProcessInfo>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let snapshot = state
|
||||
.process_runtime()
|
||||
.kill_process(&id, query.wait_ms)
|
||||
.await?;
|
||||
Ok(Json(map_process_snapshot(snapshot)))
|
||||
}
|
||||
|
||||
/// Delete a process record.
|
||||
///
|
||||
/// Removes a stopped process from the runtime. Returns 409 if the process
|
||||
/// is still running; stop or kill it first.
|
||||
#[utoipa::path(
|
||||
delete,
|
||||
path = "/v1/processes/{id}",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID")
|
||||
),
|
||||
responses(
|
||||
(status = 204, description = "Process deleted"),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 409, description = "Process is still running", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn delete_v1_process(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
state.process_runtime().delete_process(&id).await?;
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
/// Fetch process logs.
|
||||
///
|
||||
/// Returns buffered log entries for a process. Supports filtering by stream
|
||||
/// type, tail count, and sequence-based resumption. When `follow=true`,
|
||||
/// returns an SSE stream that replays buffered entries then streams live output.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes/{id}/logs",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID"),
|
||||
("stream" = Option<ProcessLogsStream>, Query, description = "stdout|stderr|combined|pty"),
|
||||
("tail" = Option<usize>, Query, description = "Tail N entries"),
|
||||
("follow" = Option<bool>, Query, description = "Follow via SSE"),
|
||||
("since" = Option<u64>, Query, description = "Only entries with sequence greater than this")
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Process logs", body = ProcessLogsResponse),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn get_v1_process_logs(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
headers: HeaderMap,
|
||||
Query(query): Query<ProcessLogsQuery>,
|
||||
) -> Result<Response, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let runtime = state.process_runtime();
|
||||
let default_stream = if runtime.is_tty(&id).await? {
|
||||
ProcessLogsStream::Pty
|
||||
} else {
|
||||
ProcessLogsStream::Combined
|
||||
};
|
||||
let requested_stream = query.stream.unwrap_or(default_stream);
|
||||
let since = match (query.since, parse_last_event_id(&headers)?) {
|
||||
(Some(query_since), Some(last_event_id)) => Some(query_since.max(last_event_id)),
|
||||
(Some(query_since), None) => Some(query_since),
|
||||
(None, Some(last_event_id)) => Some(last_event_id),
|
||||
(None, None) => None,
|
||||
};
|
||||
let filter = ProcessLogFilter {
|
||||
stream: into_runtime_log_stream(requested_stream),
|
||||
tail: query.tail,
|
||||
since,
|
||||
};
|
||||
|
||||
let entries = runtime.logs(&id, filter).await?;
|
||||
let response_entries: Vec<ProcessLogEntry> =
|
||||
entries.iter().cloned().map(map_process_log_line).collect();
|
||||
|
||||
if query.follow.unwrap_or(false) {
|
||||
let rx = runtime.subscribe_logs(&id).await?;
|
||||
let replay_stream = stream::iter(response_entries.into_iter().map(|entry| {
|
||||
Ok::<axum::response::sse::Event, Infallible>(
|
||||
axum::response::sse::Event::default()
|
||||
.event("log")
|
||||
.id(entry.sequence.to_string())
|
||||
.data(serde_json::to_string(&entry).unwrap_or_else(|_| "{}".to_string())),
|
||||
)
|
||||
}));
|
||||
|
||||
let requested_stream_copy = requested_stream;
|
||||
let follow_stream = BroadcastStream::new(rx).filter_map(move |item| {
|
||||
let requested_stream_copy = requested_stream_copy;
|
||||
async move {
|
||||
match item {
|
||||
Ok(line) => {
|
||||
let entry = map_process_log_line(line);
|
||||
if process_log_matches(&entry, requested_stream_copy) {
|
||||
Some(Ok(axum::response::sse::Event::default()
|
||||
.event("log")
|
||||
.id(entry.sequence.to_string())
|
||||
.data(
|
||||
serde_json::to_string(&entry)
|
||||
.unwrap_or_else(|_| "{}".to_string()),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let stream = replay_stream.chain(follow_stream);
|
||||
let response =
|
||||
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)));
|
||||
return Ok(response.into_response());
|
||||
}
|
||||
|
||||
Ok(Json(ProcessLogsResponse {
|
||||
process_id: id,
|
||||
stream: requested_stream,
|
||||
entries: response_entries,
|
||||
})
|
||||
.into_response())
|
||||
}
|
||||
|
||||
/// Write input to a process.
|
||||
///
|
||||
/// Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
|
||||
/// Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
|
||||
/// payload exceeds the configured `maxInputBytesPerRequest` limit.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/{id}/input",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID")
|
||||
),
|
||||
request_body = ProcessInputRequest,
|
||||
responses(
|
||||
(status = 200, description = "Input accepted", body = ProcessInputResponse),
|
||||
(status = 400, description = "Invalid request", body = ProblemDetails),
|
||||
(status = 413, description = "Input exceeds configured limit", body = ProblemDetails),
|
||||
(status = 409, description = "Process not writable", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_process_input(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Json(body): Json<ProcessInputRequest>,
|
||||
) -> Result<Json<ProcessInputResponse>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let encoding = body.encoding.unwrap_or_else(|| "base64".to_string());
|
||||
let input = decode_input_bytes(&body.data, &encoding)?;
|
||||
let runtime = state.process_runtime();
|
||||
let max_input = runtime.max_input_bytes().await;
|
||||
if input.len() > max_input {
|
||||
return Err(SandboxError::InvalidRequest {
|
||||
message: format!("input payload exceeds maxInputBytesPerRequest ({max_input})"),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let bytes_written = runtime.write_input(&id, &input).await?;
|
||||
Ok(Json(ProcessInputResponse { bytes_written }))
|
||||
}
|
||||
|
||||
/// Resize a process terminal.
|
||||
///
|
||||
/// Sets the PTY window size (columns and rows) for a tty-mode process and
|
||||
/// sends SIGWINCH so the child process can adapt.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/processes/{id}/terminal/resize",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID")
|
||||
),
|
||||
request_body = ProcessTerminalResizeRequest,
|
||||
responses(
|
||||
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
|
||||
(status = 400, description = "Invalid request", body = ProblemDetails),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 409, description = "Not a terminal process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn post_v1_process_terminal_resize(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Json(body): Json<ProcessTerminalResizeRequest>,
|
||||
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
state
|
||||
.process_runtime()
|
||||
.resize_terminal(&id, body.cols, body.rows)
|
||||
.await?;
|
||||
Ok(Json(ProcessTerminalResizeResponse {
|
||||
cols: body.cols,
|
||||
rows: body.rows,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Open an interactive WebSocket terminal session.
|
||||
///
|
||||
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
|
||||
/// `access_token` query param for browser-based auth (WebSocket API cannot
|
||||
/// send custom headers). Streams raw PTY output as binary frames and accepts
|
||||
/// JSON control frames for input, resize, and close.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/processes/{id}/terminal/ws",
|
||||
tag = "v1",
|
||||
params(
|
||||
("id" = String, Path, description = "Process ID"),
|
||||
("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
|
||||
),
|
||||
responses(
|
||||
(status = 101, description = "WebSocket upgraded"),
|
||||
(status = 400, description = "Invalid websocket frame or upgrade request", body = ProblemDetails),
|
||||
(status = 404, description = "Unknown process", body = ProblemDetails),
|
||||
(status = 409, description = "Not a terminal process", body = ProblemDetails),
|
||||
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
|
||||
)
|
||||
)]
|
||||
async fn get_v1_process_terminal_ws(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Query(_query): Query<ProcessWsQuery>,
|
||||
ws: WebSocketUpgrade,
|
||||
) -> Result<Response, ApiError> {
|
||||
if !process_api_supported() {
|
||||
return Err(process_api_not_supported().into());
|
||||
}
|
||||
|
||||
let runtime = state.process_runtime();
|
||||
if !runtime.is_tty(&id).await? {
|
||||
return Err(SandboxError::Conflict {
|
||||
message: "process is not running in tty mode".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
Ok(ws
|
||||
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
|
||||
.into_response())
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
enum TerminalClientFrame {
|
||||
Input {
|
||||
data: String,
|
||||
#[serde(default)]
|
||||
encoding: Option<String>,
|
||||
},
|
||||
Resize {
|
||||
cols: u16,
|
||||
rows: u16,
|
||||
},
|
||||
Close,
|
||||
}
|
||||
|
||||
async fn process_terminal_ws_session(
|
||||
mut socket: WebSocket,
|
||||
runtime: Arc<ProcessRuntime>,
|
||||
id: String,
|
||||
) {
|
||||
let _ = send_ws_json(
|
||||
&mut socket,
|
||||
json!({
|
||||
"type": "ready",
|
||||
"processId": &id,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut log_rx = match runtime.subscribe_logs(&id).await {
|
||||
Ok(rx) => rx,
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
let _ = socket.close().await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut exit_poll = tokio::time::interval(Duration::from_millis(150));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
ws_in = socket.recv() => {
|
||||
match ws_in {
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
|
||||
}
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
|
||||
match parsed {
|
||||
Ok(TerminalClientFrame::Input { data, encoding }) => {
|
||||
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
|
||||
Ok(input) => input,
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let max_input = runtime.max_input_bytes().await;
|
||||
if input.len() > max_input {
|
||||
let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = runtime.write_input(&id, &input).await {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
}
|
||||
}
|
||||
Ok(TerminalClientFrame::Resize { cols, rows }) => {
|
||||
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
|
||||
let _ = send_ws_error(&mut socket, &err.to_string()).await;
|
||||
}
|
||||
}
|
||||
Ok(TerminalClientFrame::Close) => {
|
||||
let _ = socket.close().await;
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Ping(payload))) => {
|
||||
let _ = socket.send(Message::Pong(payload)).await;
|
||||
}
|
||||
Some(Ok(Message::Close(_))) | None => break,
|
||||
Some(Ok(Message::Pong(_))) => {}
|
||||
Some(Err(_)) => break,
|
||||
}
|
||||
}
|
||||
log_in = log_rx.recv() => {
|
||||
match log_in {
|
||||
Ok(line) => {
|
||||
if line.stream != ProcessStream::Pty {
|
||||
continue;
|
||||
}
|
||||
let bytes = {
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_ENGINE;
|
||||
use base64::Engine;
|
||||
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
|
||||
};
|
||||
if socket.send(Message::Binary(bytes)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
_ = exit_poll.tick() => {
|
||||
if let Ok(snapshot) = runtime.snapshot(&id).await {
|
||||
if snapshot.status == ProcessStatus::Exited {
|
||||
let _ = send_ws_json(
|
||||
&mut socket,
|
||||
json!({
|
||||
"type": "exit",
|
||||
"exitCode": snapshot.exit_code,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
let _ = socket.close().await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
|
||||
socket
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(&payload).map_err(|_| ())?,
|
||||
))
|
||||
.await
|
||||
.map_err(|_| ())
|
||||
}
|
||||
|
||||
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
|
||||
send_ws_json(
|
||||
socket,
|
||||
json!({
|
||||
"type": "error",
|
||||
"message": message,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/config/mcp",
|
||||
|
|
@ -1386,6 +2134,96 @@ async fn delete_v1_acp(
|
|||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
fn process_api_supported() -> bool {
|
||||
!cfg!(windows)
|
||||
}
|
||||
|
||||
fn process_api_not_supported() -> ProblemDetails {
|
||||
ProblemDetails {
|
||||
type_: ErrorType::InvalidRequest.as_urn().to_string(),
|
||||
title: "Not Implemented".to_string(),
|
||||
status: 501,
|
||||
detail: Some("process API is not implemented on Windows".to_string()),
|
||||
instance: None,
|
||||
extensions: serde_json::Map::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_process_config(config: ProcessRuntimeConfig) -> ProcessConfig {
|
||||
ProcessConfig {
|
||||
max_concurrent_processes: config.max_concurrent_processes,
|
||||
default_run_timeout_ms: config.default_run_timeout_ms,
|
||||
max_run_timeout_ms: config.max_run_timeout_ms,
|
||||
max_output_bytes: config.max_output_bytes,
|
||||
max_log_bytes_per_process: config.max_log_bytes_per_process,
|
||||
max_input_bytes_per_request: config.max_input_bytes_per_request,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig {
|
||||
ProcessRuntimeConfig {
|
||||
max_concurrent_processes: config.max_concurrent_processes,
|
||||
default_run_timeout_ms: config.default_run_timeout_ms,
|
||||
max_run_timeout_ms: config.max_run_timeout_ms,
|
||||
max_output_bytes: config.max_output_bytes,
|
||||
max_log_bytes_per_process: config.max_log_bytes_per_process,
|
||||
max_input_bytes_per_request: config.max_input_bytes_per_request,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
|
||||
ProcessInfo {
|
||||
id: snapshot.id,
|
||||
command: snapshot.command,
|
||||
args: snapshot.args,
|
||||
cwd: snapshot.cwd,
|
||||
tty: snapshot.tty,
|
||||
interactive: snapshot.interactive,
|
||||
status: match snapshot.status {
|
||||
ProcessStatus::Running => ProcessState::Running,
|
||||
ProcessStatus::Exited => ProcessState::Exited,
|
||||
},
|
||||
pid: snapshot.pid,
|
||||
exit_code: snapshot.exit_code,
|
||||
created_at_ms: snapshot.created_at_ms,
|
||||
exited_at_ms: snapshot.exited_at_ms,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_runtime_log_stream(stream: ProcessLogsStream) -> ProcessLogFilterStream {
|
||||
match stream {
|
||||
ProcessLogsStream::Stdout => ProcessLogFilterStream::Stdout,
|
||||
ProcessLogsStream::Stderr => ProcessLogFilterStream::Stderr,
|
||||
ProcessLogsStream::Combined => ProcessLogFilterStream::Combined,
|
||||
ProcessLogsStream::Pty => ProcessLogFilterStream::Pty,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_process_log_line(line: crate::process_runtime::ProcessLogLine) -> ProcessLogEntry {
|
||||
ProcessLogEntry {
|
||||
sequence: line.sequence,
|
||||
stream: match line.stream {
|
||||
ProcessStream::Stdout => ProcessLogsStream::Stdout,
|
||||
ProcessStream::Stderr => ProcessLogsStream::Stderr,
|
||||
ProcessStream::Pty => ProcessLogsStream::Pty,
|
||||
},
|
||||
timestamp_ms: line.timestamp_ms,
|
||||
data: line.data,
|
||||
encoding: line.encoding.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_log_matches(entry: &ProcessLogEntry, stream: ProcessLogsStream) -> bool {
|
||||
match stream {
|
||||
ProcessLogsStream::Stdout => entry.stream == ProcessLogsStream::Stdout,
|
||||
ProcessLogsStream::Stderr => entry.stream == ProcessLogsStream::Stderr,
|
||||
ProcessLogsStream::Combined => {
|
||||
entry.stream == ProcessLogsStream::Stdout || entry.stream == ProcessLogsStream::Stderr
|
||||
}
|
||||
ProcessLogsStream::Pty => entry.stream == ProcessLogsStream::Pty,
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_named_query(value: &str, field_name: &str) -> Result<(), SandboxError> {
|
||||
if value.trim().is_empty() {
|
||||
return Err(SandboxError::InvalidRequest {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,17 @@ pub(super) async fn require_token(
|
|||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.strip_prefix("Bearer "));
|
||||
|
||||
if bearer == Some(expected.as_str()) {
|
||||
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
|
||||
let query_token = if allow_query_token {
|
||||
request
|
||||
.uri()
|
||||
.query()
|
||||
.and_then(|query| query_param(query, "access_token"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if bearer == Some(expected.as_str()) || query_token.as_deref() == Some(expected.as_str()) {
|
||||
return Ok(next.run(request).await);
|
||||
}
|
||||
|
||||
|
|
@ -42,6 +52,53 @@ pub(super) async fn require_token(
|
|||
}))
|
||||
}
|
||||
|
||||
fn query_param(query: &str, key: &str) -> Option<String> {
|
||||
query
|
||||
.split('&')
|
||||
.filter_map(|part| part.split_once('='))
|
||||
.find_map(|(k, v)| {
|
||||
if k == key {
|
||||
Some(percent_decode(v))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn percent_decode(input: &str) -> String {
|
||||
let mut output = Vec::with_capacity(input.len());
|
||||
let bytes = input.as_bytes();
|
||||
let mut i = 0;
|
||||
while i < bytes.len() {
|
||||
if bytes[i] == b'%' && i + 2 < bytes.len() {
|
||||
if let (Some(hi), Some(lo)) = (
|
||||
hex_nibble(bytes[i + 1]),
|
||||
hex_nibble(bytes[i + 2]),
|
||||
) {
|
||||
output.push((hi << 4) | lo);
|
||||
i += 3;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if bytes[i] == b'+' {
|
||||
output.push(b' ');
|
||||
} else {
|
||||
output.push(bytes[i]);
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
String::from_utf8(output).unwrap_or_else(|_| input.to_string())
|
||||
}
|
||||
|
||||
fn hex_nibble(b: u8) -> Option<u8> {
|
||||
match b {
|
||||
b'0'..=b'9' => Some(b - b'0'),
|
||||
b'a'..=b'f' => Some(b - b'a' + 10),
|
||||
b'A'..=b'F' => Some(b - b'A' + 10),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) type PinBoxSseStream = crate::acp_proxy_runtime::PinBoxSseStream;
|
||||
|
||||
pub(super) fn credentials_available_for(
|
||||
|
|
@ -497,8 +554,17 @@ pub(super) fn problem_from_sandbox_error(error: &SandboxError) -> ProblemDetails
|
|||
let mut problem = error.to_problem_details();
|
||||
|
||||
match error {
|
||||
SandboxError::InvalidRequest { .. } => {
|
||||
problem.status = 400;
|
||||
SandboxError::InvalidRequest { message } => {
|
||||
if message.starts_with("input payload exceeds maxInputBytesPerRequest") {
|
||||
problem.status = 413;
|
||||
problem.title = "Payload Too Large".to_string();
|
||||
} else {
|
||||
problem.status = 400;
|
||||
}
|
||||
}
|
||||
SandboxError::NotFound { .. } => {
|
||||
problem.status = 404;
|
||||
problem.title = "Not Found".to_string();
|
||||
}
|
||||
SandboxError::Timeout { .. } => {
|
||||
problem.status = 504;
|
||||
|
|
|
|||
|
|
@ -362,3 +362,173 @@ pub struct AcpEnvelope {
|
|||
#[serde(default)]
|
||||
pub error: Option<Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessConfig {
|
||||
pub max_concurrent_processes: usize,
|
||||
pub default_run_timeout_ms: u64,
|
||||
pub max_run_timeout_ms: u64,
|
||||
pub max_output_bytes: usize,
|
||||
pub max_log_bytes_per_process: usize,
|
||||
pub max_input_bytes_per_request: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessCreateRequest {
|
||||
pub command: String,
|
||||
#[serde(default)]
|
||||
pub args: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub cwd: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
pub env: BTreeMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub tty: bool,
|
||||
#[serde(default)]
|
||||
pub interactive: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessRunRequest {
|
||||
pub command: String,
|
||||
#[serde(default)]
|
||||
pub args: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub cwd: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
pub env: BTreeMap<String, String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub timeout_ms: Option<u64>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub max_output_bytes: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessRunResponse {
|
||||
pub exit_code: Option<i32>,
|
||||
pub timed_out: bool,
|
||||
pub stdout: String,
|
||||
pub stderr: String,
|
||||
pub stdout_truncated: bool,
|
||||
pub stderr_truncated: bool,
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ProcessState {
|
||||
Running,
|
||||
Exited,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessInfo {
|
||||
pub id: String,
|
||||
pub command: String,
|
||||
pub args: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub cwd: Option<String>,
|
||||
pub tty: bool,
|
||||
pub interactive: bool,
|
||||
pub status: ProcessState,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub pid: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub exit_code: Option<i32>,
|
||||
pub created_at_ms: i64,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub exited_at_ms: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessListResponse {
|
||||
pub processes: Vec<ProcessInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ProcessLogsStream {
|
||||
Stdout,
|
||||
Stderr,
|
||||
Combined,
|
||||
Pty,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessLogsQuery {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub stream: Option<ProcessLogsStream>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub tail: Option<usize>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub follow: Option<bool>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub since: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessLogEntry {
|
||||
pub sequence: u64,
|
||||
pub stream: ProcessLogsStream,
|
||||
pub timestamp_ms: i64,
|
||||
pub data: String,
|
||||
pub encoding: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessLogsResponse {
|
||||
pub process_id: String,
|
||||
pub stream: ProcessLogsStream,
|
||||
pub entries: Vec<ProcessLogEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessInputRequest {
|
||||
pub data: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub encoding: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessInputResponse {
|
||||
pub bytes_written: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessSignalQuery {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub wait_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessTerminalResizeRequest {
|
||||
pub cols: u16,
|
||||
pub rows: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessTerminalResizeResponse {
|
||||
pub cols: u16,
|
||||
pub rows: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProcessWsQuery {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub access_token: Option<String>,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use std::fs;
|
||||
use std::io::{Read, Write};
|
||||
use std::net::{TcpListener, TcpStream};
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
@ -14,6 +14,8 @@ use sandbox_agent_agent_management::agents::AgentManager;
|
|||
use serde_json::{json, Value};
|
||||
use serial_test::serial;
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tower::util::ServiceExt;
|
||||
|
||||
struct TestApp {
|
||||
|
|
@ -48,6 +50,56 @@ struct EnvVarGuard {
|
|||
previous: Option<std::ffi::OsString>,
|
||||
}
|
||||
|
||||
struct LiveServer {
|
||||
address: SocketAddr,
|
||||
shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl LiveServer {
|
||||
async fn spawn(app: Router) -> Self {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind live server");
|
||||
let address = listener.local_addr().expect("live server address");
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let server = axum::serve(listener, app.into_make_service())
|
||||
.with_graceful_shutdown(async {
|
||||
let _ = shutdown_rx.await;
|
||||
});
|
||||
|
||||
let _ = server.await;
|
||||
});
|
||||
|
||||
Self {
|
||||
address,
|
||||
shutdown_tx: Some(shutdown_tx),
|
||||
task,
|
||||
}
|
||||
}
|
||||
|
||||
fn http_url(&self, path: &str) -> String {
|
||||
format!("http://{}{}", self.address, path)
|
||||
}
|
||||
|
||||
fn ws_url(&self, path: &str) -> String {
|
||||
format!("ws://{}{}", self.address, path)
|
||||
}
|
||||
|
||||
async fn shutdown(mut self) {
|
||||
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
||||
let _ = shutdown_tx.send(());
|
||||
}
|
||||
|
||||
let _ = tokio::time::timeout(Duration::from_secs(3), async {
|
||||
let _ = self.task.await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
impl EnvVarGuard {
|
||||
fn set(key: &'static str, value: &str) -> Self {
|
||||
let previous = std::env::var_os(key);
|
||||
|
|
@ -291,3 +343,5 @@ mod acp_transport;
|
|||
mod config_endpoints;
|
||||
#[path = "v1_api/control_plane.rs"]
|
||||
mod control_plane;
|
||||
#[path = "v1_api/processes.rs"]
|
||||
mod processes;
|
||||
|
|
|
|||
661
server/packages/sandbox-agent/tests/v1_api/processes.rs
Normal file
661
server/packages/sandbox-agent/tests/v1_api/processes.rs
Normal file
|
|
@ -0,0 +1,661 @@
|
|||
use super::*;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||
use base64::Engine;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
|
||||
for _ in 0..30 {
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
&format!("/v1/processes/{process_id}"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let parsed = parse_json(&body);
|
||||
if parsed["status"] == "exited" {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
panic!("process did not exit in time");
|
||||
}
|
||||
|
||||
fn decode_log_entries(entries: &[Value]) -> String {
|
||||
entries
|
||||
.iter()
|
||||
.filter_map(|entry| entry.get("data").and_then(Value::as_str))
|
||||
.filter_map(|encoded| BASE64.decode(encoded).ok())
|
||||
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
}
|
||||
|
||||
async fn recv_ws_message(
|
||||
ws: &mut tokio_tungstenite::WebSocketStream<
|
||||
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
) -> Message {
|
||||
tokio::time::timeout(Duration::from_secs(3), ws.next())
|
||||
.await
|
||||
.expect("timed out waiting for websocket frame")
|
||||
.expect("websocket stream ended")
|
||||
.expect("websocket frame")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_processes_config_round_trip() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
"/v1/processes/config",
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(parse_json(&body)["maxConcurrentProcesses"], 64);
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes/config",
|
||||
Some(json!({
|
||||
"maxConcurrentProcesses": 8,
|
||||
"defaultRunTimeoutMs": 1000,
|
||||
"maxRunTimeoutMs": 5000,
|
||||
"maxOutputBytes": 4096,
|
||||
"maxLogBytesPerProcess": 32768,
|
||||
"maxInputBytesPerRequest": 1024
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let parsed = parse_json(&body);
|
||||
assert_eq!(parsed["maxConcurrentProcesses"], 8);
|
||||
assert_eq!(parsed["defaultRunTimeoutMs"], 1000);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_lifecycle_requires_stop_before_delete() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "sleep 30"],
|
||||
"tty": false,
|
||||
"interactive": false
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::DELETE,
|
||||
&format!("/v1/processes/{process_id}"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::CONFLICT);
|
||||
assert_eq!(parse_json(&body)["status"], 409);
|
||||
|
||||
let (status, _, _body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
&format!("/v1/processes/{process_id}/stop"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
|
||||
wait_for_exited(&test_app, &process_id).await;
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::DELETE,
|
||||
&format!("/v1/processes/{process_id}"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_run_returns_output_and_timeout() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes/run",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "echo hi"],
|
||||
"timeoutMs": 1000
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let parsed = parse_json(&body);
|
||||
assert_eq!(parsed["timedOut"], false);
|
||||
assert_eq!(parsed["exitCode"], 0);
|
||||
assert!(parsed["stdout"].as_str().unwrap_or_default().contains("hi"));
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes/run",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "sleep 2"],
|
||||
"timeoutMs": 50
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
assert_eq!(parse_json(&body)["timedOut"], true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_run_reports_truncation() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes/run",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "printf 'abcdefghijklmnopqrstuvwxyz'"],
|
||||
"maxOutputBytes": 5
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let parsed = parse_json(&body);
|
||||
assert_eq!(parsed["stdoutTruncated"], true);
|
||||
assert_eq!(parsed["stderrTruncated"], false);
|
||||
assert_eq!(parsed["stdout"].as_str().unwrap_or_default().len(), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_tty_input_and_logs() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "cat",
|
||||
"tty": true,
|
||||
"interactive": true
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let (status, _, _body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
&format!("/v1/processes/{process_id}/input"),
|
||||
Some(json!({
|
||||
"data": "aGVsbG8K",
|
||||
"encoding": "base64"
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
&format!("/v1/processes/{process_id}/logs?stream=pty&tail=20"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let entries = parse_json(&body)["entries"]
|
||||
.as_array()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
assert!(!entries.is_empty());
|
||||
|
||||
let (status, _, _body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
&format!("/v1/processes/{process_id}/kill"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
|
||||
wait_for_exited(&test_app, &process_id).await;
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::DELETE,
|
||||
&format!("/v1/processes/{process_id}"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_not_found_returns_404() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
"/v1/processes/does-not-exist",
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::NOT_FOUND);
|
||||
assert_eq!(parse_json(&body)["status"], 404);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_input_limit_returns_413() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes/config",
|
||||
Some(json!({
|
||||
"maxConcurrentProcesses": 8,
|
||||
"defaultRunTimeoutMs": 1000,
|
||||
"maxRunTimeoutMs": 5000,
|
||||
"maxOutputBytes": 4096,
|
||||
"maxLogBytesPerProcess": 32768,
|
||||
"maxInputBytesPerRequest": 4
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "cat",
|
||||
"tty": true,
|
||||
"interactive": true
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
&format!("/v1/processes/{process_id}/input"),
|
||||
Some(json!({
|
||||
"data": "aGVsbG8=",
|
||||
"encoding": "base64"
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE);
|
||||
assert_eq!(parse_json(&body)["status"], 413);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_tty_process_is_real_terminal() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "tty"],
|
||||
"tty": true,
|
||||
"interactive": false
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
wait_for_exited(&test_app, &process_id).await;
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
&format!("/v1/processes/{process_id}/logs?stream=pty"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let entries = parse_json(&body)["entries"]
|
||||
.as_array()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let joined = decode_log_entries(&entries);
|
||||
assert!(!joined.to_lowercase().contains("not a tty"));
|
||||
assert!(joined.contains("/dev/"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_logs_follow_sse_streams_entries() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "echo first; sleep 0.3; echo second"],
|
||||
"tty": false,
|
||||
"interactive": false
|
||||
})),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri(format!(
|
||||
"/v1/processes/{process_id}/logs?stream=stdout&follow=true"
|
||||
))
|
||||
.body(Body::empty())
|
||||
.expect("build request");
|
||||
let response = test_app
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(request)
|
||||
.await
|
||||
.expect("sse response");
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
let mut stream = response.into_body().into_data_stream();
|
||||
let chunk = tokio::time::timeout(Duration::from_secs(5), async move {
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let bytes = chunk.expect("stream chunk");
|
||||
let text = String::from_utf8_lossy(&bytes).to_string();
|
||||
if text.contains("data:") {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
panic!("SSE stream ended before log chunk");
|
||||
})
|
||||
.await
|
||||
.expect("timed out reading process log sse");
|
||||
|
||||
let payload = parse_sse_data(&chunk);
|
||||
assert!(payload["sequence"].as_u64().is_some());
|
||||
assert_eq!(payload["stream"], "stdout");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_access_token_query_only_allows_terminal_ws() {
|
||||
let test_app = TestApp::new(AuthConfig::with_token("secret-token".to_string()));
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
"/v1/health?access_token=secret-token",
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
||||
|
||||
let (status, _, body) = send_request(
|
||||
&test_app.app,
|
||||
Method::POST,
|
||||
"/v1/processes",
|
||||
Some(json!({
|
||||
"command": "cat",
|
||||
"tty": true,
|
||||
"interactive": true
|
||||
})),
|
||||
&[("authorization", "Bearer secret-token")],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let process_id = parse_json(&body)["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
&format!("/v1/processes/{process_id}/terminal/ws"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
||||
|
||||
let (status, _, _) = send_request(
|
||||
&test_app.app,
|
||||
Method::GET,
|
||||
&format!("/v1/processes/{process_id}/terminal/ws?access_token=secret-token"),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_terminal_ws_e2e_is_deterministic() {
|
||||
let test_app = TestApp::new(AuthConfig::disabled());
|
||||
let live_server = LiveServer::spawn(test_app.app.clone()).await;
|
||||
let http = reqwest::Client::new();
|
||||
|
||||
let create_response = http
|
||||
.post(live_server.http_url("/v1/processes"))
|
||||
.json(&json!({
|
||||
"command": "sh",
|
||||
"args": ["-lc", "stty -echo; IFS= read -r line; printf 'got:%s\\n' \"$line\""],
|
||||
"tty": true,
|
||||
"interactive": true
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.expect("create process response");
|
||||
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
|
||||
let create_body: Value = create_response.json().await.expect("create process json");
|
||||
let process_id = create_body["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
|
||||
let (mut ws, _) = connect_async(&ws_url)
|
||||
.await
|
||||
.expect("connect websocket");
|
||||
|
||||
let ready = recv_ws_message(&mut ws).await;
|
||||
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
|
||||
.expect("ready json");
|
||||
assert_eq!(ready_payload["type"], "ready");
|
||||
assert_eq!(ready_payload["processId"], process_id);
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "input",
|
||||
"data": "hello from ws\n"
|
||||
})
|
||||
.to_string(),
|
||||
))
|
||||
.await
|
||||
.expect("send input frame");
|
||||
|
||||
let mut saw_binary_output = false;
|
||||
let mut saw_exit = false;
|
||||
for _ in 0..10 {
|
||||
let frame = recv_ws_message(&mut ws).await;
|
||||
match frame {
|
||||
Message::Binary(bytes) => {
|
||||
let text = String::from_utf8_lossy(&bytes);
|
||||
if text.contains("got:hello from ws") {
|
||||
saw_binary_output = true;
|
||||
}
|
||||
}
|
||||
Message::Text(text) => {
|
||||
let payload: Value = serde_json::from_str(&text).expect("ws json");
|
||||
if payload["type"] == "exit" {
|
||||
saw_exit = true;
|
||||
break;
|
||||
}
|
||||
assert_ne!(payload["type"], "error");
|
||||
}
|
||||
Message::Close(_) => break,
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_binary_output, "expected pty binary output over websocket");
|
||||
assert!(saw_exit, "expected exit control frame over websocket");
|
||||
|
||||
let _ = ws.close(None).await;
|
||||
|
||||
let delete_response = http
|
||||
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
|
||||
.send()
|
||||
.await
|
||||
.expect("delete process response");
|
||||
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
|
||||
|
||||
live_server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v1_process_terminal_ws_auth_e2e() {
|
||||
let token = "secret-token";
|
||||
let test_app = TestApp::new(AuthConfig::with_token(token.to_string()));
|
||||
let live_server = LiveServer::spawn(test_app.app.clone()).await;
|
||||
let http = reqwest::Client::new();
|
||||
|
||||
let create_response = http
|
||||
.post(live_server.http_url("/v1/processes"))
|
||||
.bearer_auth(token)
|
||||
.json(&json!({
|
||||
"command": "cat",
|
||||
"tty": true,
|
||||
"interactive": true
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.expect("create process response");
|
||||
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
|
||||
let create_body: Value = create_response.json().await.expect("create process json");
|
||||
let process_id = create_body["id"]
|
||||
.as_str()
|
||||
.expect("process id")
|
||||
.to_string();
|
||||
|
||||
let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
|
||||
let unauth_err = connect_async(&unauth_ws_url)
|
||||
.await
|
||||
.expect_err("unauthenticated websocket handshake should fail");
|
||||
match unauth_err {
|
||||
tokio_tungstenite::tungstenite::Error::Http(response) => {
|
||||
assert_eq!(response.status().as_u16(), 401);
|
||||
}
|
||||
other => panic!("unexpected websocket auth error: {other:?}"),
|
||||
}
|
||||
|
||||
let auth_ws_url = live_server.ws_url(&format!(
|
||||
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
|
||||
));
|
||||
let (mut ws, _) = connect_async(&auth_ws_url)
|
||||
.await
|
||||
.expect("authenticated websocket handshake");
|
||||
|
||||
let ready = recv_ws_message(&mut ws).await;
|
||||
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
|
||||
.expect("ready json");
|
||||
assert_eq!(ready_payload["type"], "ready");
|
||||
assert_eq!(ready_payload["processId"], process_id);
|
||||
|
||||
let _ = ws
|
||||
.send(Message::Text(json!({ "type": "close" }).to_string()))
|
||||
.await;
|
||||
let _ = ws.close(None).await;
|
||||
|
||||
let kill_response = http
|
||||
.post(live_server.http_url(&format!(
|
||||
"/v1/processes/{process_id}/kill?waitMs=1000"
|
||||
)))
|
||||
.bearer_auth(token)
|
||||
.send()
|
||||
.await
|
||||
.expect("kill process response");
|
||||
assert_eq!(kill_response.status(), reqwest::StatusCode::OK);
|
||||
|
||||
let delete_response = http
|
||||
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
|
||||
.bearer_auth(token)
|
||||
.send()
|
||||
.await
|
||||
.expect("delete process response");
|
||||
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
|
||||
|
||||
live_server.shutdown().await;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue