feat: add process management API

Introduces a complete Process Management API for Sandbox Agent with process lifecycle management (start, stop, kill, delete), one-shot command execution, log streaming via SSE and WebSocket, stdin input, and PTY/terminal support. Includes new process_runtime module for managing process state, HTTP route handlers, OpenAPI documentation, and integration tests.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-05 15:44:39 -08:00
parent c3a95c3611
commit 38efa316a2
10 changed files with 3947 additions and 5 deletions

View file

@ -32,7 +32,7 @@ schemars = "0.8"
utoipa = { version = "4.2", features = ["axum_extras"] }
# Web framework
axum = "0.7"
axum = { version = "0.7", features = ["ws"] }
tower = { version = "0.5", features = ["util"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }

File diff suppressed because it is too large Load diff

View file

@ -55,6 +55,7 @@ insta.workspace = true
tower.workspace = true
tempfile.workspace = true
serial_test = "3.2"
tokio-tungstenite = "0.24"
[features]
test-utils = ["tempfile"]

View file

@ -3,6 +3,7 @@
mod acp_proxy_runtime;
pub mod cli;
pub mod daemon;
mod process_runtime;
pub mod router;
pub mod server_logs;
pub mod telemetry;

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,5 @@
use std::collections::{BTreeMap, HashMap};
use std::convert::Infallible;
use std::fs;
use std::io::Cursor;
use std::path::{Path as StdPath, PathBuf};
@ -6,6 +7,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use axum::body::Bytes;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::{Path, Query, State};
use axum::http::{header, HeaderMap, Request, StatusCode};
use axum::middleware::Next;
@ -13,6 +15,8 @@ use axum::response::sse::KeepAlive;
use axum::response::{IntoResponse, Response, Sse};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use futures::stream;
use futures::StreamExt;
use sandbox_agent_agent_management::agents::{
AgentId, AgentManager, InstallOptions, InstallResult, InstallSource, InstalledArtifactKind,
};
@ -27,11 +31,16 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tar::Archive;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer;
use tracing::Span;
use utoipa::{Modify, OpenApi, ToSchema};
use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome};
use crate::process_runtime::{
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime,
ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec,
};
use crate::ui;
mod support;
@ -77,6 +86,7 @@ pub struct AppState {
agent_manager: Arc<AgentManager>,
acp_proxy: Arc<AcpProxyRuntime>,
opencode_server_manager: Arc<OpenCodeServerManager>,
process_runtime: Arc<ProcessRuntime>,
pub(crate) branding: BrandingMode,
version_cache: Mutex<HashMap<AgentId, CachedAgentVersion>>,
}
@ -100,11 +110,13 @@ impl AppState {
auto_restart: true,
},
));
let process_runtime = Arc::new(ProcessRuntime::new());
Self {
auth,
agent_manager,
acp_proxy,
opencode_server_manager,
process_runtime,
branding,
version_cache: Mutex::new(HashMap::new()),
}
@ -122,6 +134,10 @@ impl AppState {
self.opencode_server_manager.clone()
}
pub(crate) fn process_runtime(&self) -> Arc<ProcessRuntime> {
self.process_runtime.clone()
}
pub(crate) fn purge_version_cache(&self, agent: AgentId) {
self.version_cache.lock().unwrap().remove(&agent);
}
@ -166,6 +182,28 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/fs/move", post(post_v1_fs_move))
.route("/fs/stat", get(get_v1_fs_stat))
.route("/fs/upload-batch", post(post_v1_fs_upload_batch))
.route(
"/processes/config",
get(get_v1_processes_config).post(post_v1_processes_config),
)
.route("/processes", get(get_v1_processes).post(post_v1_processes))
.route("/processes/run", post(post_v1_processes_run))
.route(
"/processes/:id",
get(get_v1_process).delete(delete_v1_process),
)
.route("/processes/:id/stop", post(post_v1_process_stop))
.route("/processes/:id/kill", post(post_v1_process_kill))
.route("/processes/:id/logs", get(get_v1_process_logs))
.route("/processes/:id/input", post(post_v1_process_input))
.route(
"/processes/:id/terminal/resize",
post(post_v1_process_terminal_resize),
)
.route(
"/processes/:id/terminal/ws",
get(get_v1_process_terminal_ws),
)
.route(
"/config/mcp",
get(get_v1_config_mcp)
@ -295,6 +333,19 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
post_v1_fs_move,
get_v1_fs_stat,
post_v1_fs_upload_batch,
get_v1_processes_config,
post_v1_processes_config,
post_v1_processes,
post_v1_processes_run,
get_v1_processes,
get_v1_process,
post_v1_process_stop,
post_v1_process_kill,
delete_v1_process,
get_v1_process_logs,
post_v1_process_input,
post_v1_process_terminal_resize,
get_v1_process_terminal_ws,
get_v1_config_mcp,
put_v1_config_mcp,
delete_v1_config_mcp,
@ -329,6 +380,22 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
FsMoveResponse,
FsActionResponse,
FsUploadBatchResponse,
ProcessConfig,
ProcessCreateRequest,
ProcessRunRequest,
ProcessRunResponse,
ProcessState,
ProcessInfo,
ProcessListResponse,
ProcessLogsStream,
ProcessLogsQuery,
ProcessLogEntry,
ProcessLogsResponse,
ProcessInputRequest,
ProcessInputResponse,
ProcessSignalQuery,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
AcpPostQuery,
AcpServerInfo,
AcpServerListResponse,
@ -361,12 +428,21 @@ impl Modify for ServerAddon {
pub enum ApiError {
#[error(transparent)]
Sandbox(#[from] SandboxError),
#[error("problem: {0:?}")]
Problem(ProblemDetails),
}
impl From<ProblemDetails> for ApiError {
fn from(value: ProblemDetails) -> Self {
Self::Problem(value)
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let problem = match &self {
ApiError::Sandbox(error) => problem_from_sandbox_error(error),
ApiError::Problem(problem) => problem.clone(),
};
let status =
StatusCode::from_u16(problem.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
@ -1075,6 +1151,617 @@ async fn post_v1_fs_upload_batch(
}))
}
#[utoipa::path(
get,
path = "/v1/processes/config",
tag = "v1",
responses(
(status = 200, description = "Current runtime process config", body = ProcessConfig),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_processes_config(
State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessConfig>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let config = state.process_runtime().get_config().await;
Ok(Json(map_process_config(config)))
}
#[utoipa::path(
post,
path = "/v1/processes/config",
tag = "v1",
request_body = ProcessConfig,
responses(
(status = 200, description = "Updated runtime process config", body = ProcessConfig),
(status = 400, description = "Invalid config", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes_config(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessConfig>,
) -> Result<Json<ProcessConfig>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let updated = runtime
.set_config(into_runtime_process_config(body))
.await?;
Ok(Json(map_process_config(updated)))
}
#[utoipa::path(
post,
path = "/v1/processes",
tag = "v1",
request_body = ProcessCreateRequest,
responses(
(status = 200, description = "Started process", body = ProcessInfo),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 409, description = "Process limit or state conflict", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessCreateRequest>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let snapshot = runtime
.start_process(ProcessStartSpec {
command: body.command,
args: body.args,
cwd: body.cwd,
env: body.env.into_iter().collect(),
tty: body.tty,
interactive: body.interactive,
})
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
#[utoipa::path(
post,
path = "/v1/processes/run",
tag = "v1",
request_body = ProcessRunRequest,
responses(
(status = 200, description = "One-off command result", body = ProcessRunResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes_run(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessRunRequest>,
) -> Result<Json<ProcessRunResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let output = runtime
.run_once(RunSpec {
command: body.command,
args: body.args,
cwd: body.cwd,
env: body.env.into_iter().collect(),
timeout_ms: body.timeout_ms,
max_output_bytes: body.max_output_bytes,
})
.await?;
Ok(Json(ProcessRunResponse {
exit_code: output.exit_code,
timed_out: output.timed_out,
stdout: output.stdout,
stderr: output.stderr,
stdout_truncated: output.stdout_truncated,
stderr_truncated: output.stderr_truncated,
duration_ms: output.duration_ms,
}))
}
#[utoipa::path(
get,
path = "/v1/processes",
tag = "v1",
responses(
(status = 200, description = "List processes", body = ProcessListResponse),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_processes(
State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessListResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshots = state.process_runtime().list_processes().await;
Ok(Json(ProcessListResponse {
processes: snapshots.into_iter().map(map_process_snapshot).collect(),
}))
}
#[utoipa::path(
get,
path = "/v1/processes/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
responses(
(status = 200, description = "Process details", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state.process_runtime().snapshot(&id).await?;
Ok(Json(map_process_snapshot(snapshot)))
}
#[utoipa::path(
post,
path = "/v1/processes/{id}/stop",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
),
responses(
(status = 200, description = "Stop signal sent", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_stop(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state
.process_runtime()
.stop_process(&id, query.wait_ms)
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
#[utoipa::path(
post,
path = "/v1/processes/{id}/kill",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
),
responses(
(status = 200, description = "Kill signal sent", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_kill(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state
.process_runtime()
.kill_process(&id, query.wait_ms)
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
#[utoipa::path(
delete,
path = "/v1/processes/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
responses(
(status = 204, description = "Process deleted"),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Process is still running", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn delete_v1_process(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state.process_runtime().delete_process(&id).await?;
Ok(StatusCode::NO_CONTENT)
}
#[utoipa::path(
get,
path = "/v1/processes/{id}/logs",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("stream" = Option<ProcessLogsStream>, Query, description = "stdout|stderr|combined|pty"),
("tail" = Option<usize>, Query, description = "Tail N entries"),
("follow" = Option<bool>, Query, description = "Follow via SSE"),
("since" = Option<u64>, Query, description = "Only entries with sequence greater than this")
),
responses(
(status = 200, description = "Process logs", body = ProcessLogsResponse),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process_logs(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
headers: HeaderMap,
Query(query): Query<ProcessLogsQuery>,
) -> Result<Response, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let default_stream = if runtime.is_tty(&id).await? {
ProcessLogsStream::Pty
} else {
ProcessLogsStream::Combined
};
let requested_stream = query.stream.unwrap_or(default_stream);
let since = match (query.since, parse_last_event_id(&headers)?) {
(Some(query_since), Some(last_event_id)) => Some(query_since.max(last_event_id)),
(Some(query_since), None) => Some(query_since),
(None, Some(last_event_id)) => Some(last_event_id),
(None, None) => None,
};
let filter = ProcessLogFilter {
stream: into_runtime_log_stream(requested_stream),
tail: query.tail,
since,
};
let entries = runtime.logs(&id, filter).await?;
let response_entries: Vec<ProcessLogEntry> =
entries.iter().cloned().map(map_process_log_line).collect();
if query.follow.unwrap_or(false) {
let rx = runtime.subscribe_logs(&id).await?;
let replay_stream = stream::iter(response_entries.into_iter().map(|entry| {
Ok::<axum::response::sse::Event, Infallible>(
axum::response::sse::Event::default()
.event("log")
.id(entry.sequence.to_string())
.data(serde_json::to_string(&entry).unwrap_or_else(|_| "{}".to_string())),
)
}));
let requested_stream_copy = requested_stream;
let follow_stream = BroadcastStream::new(rx).filter_map(move |item| {
let requested_stream_copy = requested_stream_copy;
async move {
match item {
Ok(line) => {
let entry = map_process_log_line(line);
if process_log_matches(&entry, requested_stream_copy) {
Some(Ok(axum::response::sse::Event::default()
.event("log")
.id(entry.sequence.to_string())
.data(
serde_json::to_string(&entry)
.unwrap_or_else(|_| "{}".to_string()),
)))
} else {
None
}
}
Err(_) => None,
}
}
});
let stream = replay_stream.chain(follow_stream);
let response =
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)));
return Ok(response.into_response());
}
Ok(Json(ProcessLogsResponse {
process_id: id,
stream: requested_stream,
entries: response_entries,
})
.into_response())
}
#[utoipa::path(
post,
path = "/v1/processes/{id}/input",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessInputRequest,
responses(
(status = 200, description = "Input accepted", body = ProcessInputResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 413, description = "Input exceeds configured limit", body = ProblemDetails),
(status = 409, description = "Process not writable", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_input(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessInputRequest>,
) -> Result<Json<ProcessInputResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let encoding = body.encoding.unwrap_or_else(|| "base64".to_string());
let input = decode_input_bytes(&body.data, &encoding)?;
let runtime = state.process_runtime();
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
return Err(SandboxError::InvalidRequest {
message: format!("input payload exceeds maxInputBytesPerRequest ({max_input})"),
}
.into());
}
let bytes_written = runtime.write_input(&id, &input).await?;
Ok(Json(ProcessInputResponse { bytes_written }))
}
#[utoipa::path(
post,
path = "/v1/processes/{id}/terminal/resize",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessTerminalResizeRequest,
responses(
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_terminal_resize(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessTerminalResizeRequest>,
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state
.process_runtime()
.resize_terminal(&id, body.cols, body.rows)
.await?;
Ok(Json(ProcessTerminalResizeResponse {
cols: body.cols,
rows: body.rows,
}))
}
#[utoipa::path(
get,
path = "/v1/processes/{id}/terminal/ws",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
),
responses(
(status = 101, description = "WebSocket upgraded"),
(status = 400, description = "Invalid websocket frame or upgrade request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process_terminal_ws(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(_query): Query<ProcessWsQuery>,
ws: WebSocketUpgrade,
) -> Result<Response, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
if !runtime.is_tty(&id).await? {
return Err(SandboxError::Conflict {
message: "process is not running in tty mode".to_string(),
}
.into());
}
Ok(ws
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
.into_response())
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum TerminalClientFrame {
Input {
data: String,
#[serde(default)]
encoding: Option<String>,
},
Resize {
cols: u16,
rows: u16,
},
Close,
}
async fn process_terminal_ws_session(
mut socket: WebSocket,
runtime: Arc<ProcessRuntime>,
id: String,
) {
let _ = send_ws_json(
&mut socket,
json!({
"type": "ready",
"processId": &id,
}),
)
.await;
let mut log_rx = match runtime.subscribe_logs(&id).await {
Ok(rx) => rx,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
let _ = socket.close().await;
return;
}
};
let mut exit_poll = tokio::time::interval(Duration::from_millis(150));
loop {
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
Some(Ok(Message::Binary(_))) => {
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
}
Some(Ok(Message::Text(text))) => {
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
match parsed {
Ok(TerminalClientFrame::Input { data, encoding }) => {
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
Ok(input) => input,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
continue;
}
};
if let Err(err) = runtime.write_input(&id, &input).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Resize { cols, rows }) => {
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Close) => {
let _ = socket.close().await;
break;
}
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
}
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Pong(_))) => {}
Some(Err(_)) => break,
}
}
log_in = log_rx.recv() => {
match log_in {
Ok(line) => {
if line.stream != ProcessStream::Pty {
continue;
}
let bytes = {
use base64::engine::general_purpose::STANDARD as BASE64_ENGINE;
use base64::Engine;
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
};
if socket.send(Message::Binary(bytes)).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
_ = exit_poll.tick() => {
if let Ok(snapshot) = runtime.snapshot(&id).await {
if snapshot.status == ProcessStatus::Exited {
let _ = send_ws_json(
&mut socket,
json!({
"type": "exit",
"exitCode": snapshot.exit_code,
}),
)
.await;
let _ = socket.close().await;
break;
}
}
}
}
}
}
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
socket
.send(Message::Text(
serde_json::to_string(&payload).map_err(|_| ())?,
))
.await
.map_err(|_| ())
}
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_ws_json(
socket,
json!({
"type": "error",
"message": message,
}),
)
.await
}
#[utoipa::path(
get,
path = "/v1/config/mcp",
@ -1386,6 +2073,96 @@ async fn delete_v1_acp(
Ok(StatusCode::NO_CONTENT)
}
fn process_api_supported() -> bool {
!cfg!(windows)
}
fn process_api_not_supported() -> ProblemDetails {
ProblemDetails {
type_: ErrorType::InvalidRequest.as_urn().to_string(),
title: "Not Implemented".to_string(),
status: 501,
detail: Some("process API is not implemented on Windows".to_string()),
instance: None,
extensions: serde_json::Map::new(),
}
}
fn map_process_config(config: ProcessRuntimeConfig) -> ProcessConfig {
ProcessConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig {
ProcessRuntimeConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
ProcessInfo {
id: snapshot.id,
command: snapshot.command,
args: snapshot.args,
cwd: snapshot.cwd,
tty: snapshot.tty,
interactive: snapshot.interactive,
status: match snapshot.status {
ProcessStatus::Running => ProcessState::Running,
ProcessStatus::Exited => ProcessState::Exited,
},
pid: snapshot.pid,
exit_code: snapshot.exit_code,
created_at_ms: snapshot.created_at_ms,
exited_at_ms: snapshot.exited_at_ms,
}
}
fn into_runtime_log_stream(stream: ProcessLogsStream) -> ProcessLogFilterStream {
match stream {
ProcessLogsStream::Stdout => ProcessLogFilterStream::Stdout,
ProcessLogsStream::Stderr => ProcessLogFilterStream::Stderr,
ProcessLogsStream::Combined => ProcessLogFilterStream::Combined,
ProcessLogsStream::Pty => ProcessLogFilterStream::Pty,
}
}
fn map_process_log_line(line: crate::process_runtime::ProcessLogLine) -> ProcessLogEntry {
ProcessLogEntry {
sequence: line.sequence,
stream: match line.stream {
ProcessStream::Stdout => ProcessLogsStream::Stdout,
ProcessStream::Stderr => ProcessLogsStream::Stderr,
ProcessStream::Pty => ProcessLogsStream::Pty,
},
timestamp_ms: line.timestamp_ms,
data: line.data,
encoding: line.encoding.to_string(),
}
}
fn process_log_matches(entry: &ProcessLogEntry, stream: ProcessLogsStream) -> bool {
match stream {
ProcessLogsStream::Stdout => entry.stream == ProcessLogsStream::Stdout,
ProcessLogsStream::Stderr => entry.stream == ProcessLogsStream::Stderr,
ProcessLogsStream::Combined => {
entry.stream == ProcessLogsStream::Stdout || entry.stream == ProcessLogsStream::Stderr
}
ProcessLogsStream::Pty => entry.stream == ProcessLogsStream::Pty,
}
}
fn validate_named_query(value: &str, field_name: &str) -> Result<(), SandboxError> {
if value.trim().is_empty() {
return Err(SandboxError::InvalidRequest {

View file

@ -33,7 +33,17 @@ pub(super) async fn require_token(
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "));
if bearer == Some(expected.as_str()) {
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
let query_token = if allow_query_token {
request
.uri()
.query()
.and_then(|query| query_param(query, "access_token"))
} else {
None
};
if bearer == Some(expected.as_str()) || query_token.as_deref() == Some(expected.as_str()) {
return Ok(next.run(request).await);
}
@ -42,6 +52,13 @@ pub(super) async fn require_token(
}))
}
fn query_param(query: &str, key: &str) -> Option<String> {
query
.split('&')
.filter_map(|part| part.split_once('='))
.find_map(|(k, v)| if k == key { Some(v.to_string()) } else { None })
}
pub(super) type PinBoxSseStream = crate::acp_proxy_runtime::PinBoxSseStream;
pub(super) fn credentials_available_for(
@ -497,8 +514,16 @@ pub(super) fn problem_from_sandbox_error(error: &SandboxError) -> ProblemDetails
let mut problem = error.to_problem_details();
match error {
SandboxError::InvalidRequest { .. } => {
problem.status = 400;
SandboxError::InvalidRequest { message } => {
if message.starts_with("process not found:") {
problem.status = 404;
problem.title = "Not Found".to_string();
} else if message.starts_with("input payload exceeds maxInputBytesPerRequest") {
problem.status = 413;
problem.title = "Payload Too Large".to_string();
} else {
problem.status = 400;
}
}
SandboxError::Timeout { .. } => {
problem.status = 504;

View file

@ -362,3 +362,173 @@ pub struct AcpEnvelope {
#[serde(default)]
pub error: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessConfig {
pub max_concurrent_processes: usize,
pub default_run_timeout_ms: u64,
pub max_run_timeout_ms: u64,
pub max_output_bytes: usize,
pub max_log_bytes_per_process: usize,
pub max_input_bytes_per_request: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessCreateRequest {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub env: BTreeMap<String, String>,
#[serde(default)]
pub tty: bool,
#[serde(default)]
pub interactive: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunRequest {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub env: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_output_bytes: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunResponse {
pub exit_code: Option<i32>,
pub timed_out: bool,
pub stdout: String,
pub stderr: String,
pub stdout_truncated: bool,
pub stderr_truncated: bool,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessState {
Running,
Exited,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
pub id: String,
pub command: String,
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub status: ProcessState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
pub created_at_ms: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exited_at_ms: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListResponse {
pub processes: Vec<ProcessInfo>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessLogsStream {
Stdout,
Stderr,
Combined,
Pty,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub stream: Option<ProcessLogsStream>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tail: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub follow: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub since: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogEntry {
pub sequence: u64,
pub stream: ProcessLogsStream,
pub timestamp_ms: i64,
pub data: String,
pub encoding: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsResponse {
pub process_id: String,
pub stream: ProcessLogsStream,
pub entries: Vec<ProcessLogEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputRequest {
pub data: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub encoding: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputResponse {
pub bytes_written: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessSignalQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wait_ms: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeRequest {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeResponse {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessWsQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub access_token: Option<String>,
}

View file

@ -1,6 +1,6 @@
use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
@ -14,6 +14,8 @@ use sandbox_agent_agent_management::agents::AgentManager;
use serde_json::{json, Value};
use serial_test::serial;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tower::util::ServiceExt;
struct TestApp {
@ -48,6 +50,56 @@ struct EnvVarGuard {
previous: Option<std::ffi::OsString>,
}
struct LiveServer {
address: SocketAddr,
shutdown_tx: Option<oneshot::Sender<()>>,
task: JoinHandle<()>,
}
impl LiveServer {
async fn spawn(app: Router) -> Self {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind live server");
let address = listener.local_addr().expect("live server address");
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let task = tokio::spawn(async move {
let server = axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async {
let _ = shutdown_rx.await;
});
let _ = server.await;
});
Self {
address,
shutdown_tx: Some(shutdown_tx),
task,
}
}
fn http_url(&self, path: &str) -> String {
format!("http://{}{}", self.address, path)
}
fn ws_url(&self, path: &str) -> String {
format!("ws://{}{}", self.address, path)
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
let _ = tokio::time::timeout(Duration::from_secs(3), async {
let _ = self.task.await;
})
.await;
}
}
impl EnvVarGuard {
fn set(key: &'static str, value: &str) -> Self {
let previous = std::env::var_os(key);
@ -291,3 +343,5 @@ mod acp_transport;
mod config_endpoints;
#[path = "v1_api/control_plane.rs"]
mod control_plane;
#[path = "v1_api/processes.rs"]
mod processes;

View file

@ -0,0 +1,661 @@
use super::*;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
for _ in 0..30 {
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
if parsed["status"] == "exited" {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
panic!("process did not exit in time");
}
fn decode_log_entries(entries: &[Value]) -> String {
entries
.iter()
.filter_map(|entry| entry.get("data").and_then(Value::as_str))
.filter_map(|encoded| BASE64.decode(encoded).ok())
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.collect::<Vec<_>>()
.join("")
}
async fn recv_ws_message(
ws: &mut tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
) -> Message {
tokio::time::timeout(Duration::from_secs(3), ws.next())
.await
.expect("timed out waiting for websocket frame")
.expect("websocket stream ended")
.expect("websocket frame")
}
#[tokio::test]
async fn v1_processes_config_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes/config",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["maxConcurrentProcesses"], 64);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/config",
Some(json!({
"maxConcurrentProcesses": 8,
"defaultRunTimeoutMs": 1000,
"maxRunTimeoutMs": 5000,
"maxOutputBytes": 4096,
"maxLogBytesPerProcess": 32768,
"maxInputBytesPerRequest": 1024
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["maxConcurrentProcesses"], 8);
assert_eq!(parsed["defaultRunTimeoutMs"], 1000);
}
#[tokio::test]
async fn v1_process_lifecycle_requires_stop_before_delete() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 30"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, body) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
assert_eq!(parse_json(&body)["status"], 409);
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/stop"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
wait_for_exited(&test_app, &process_id).await;
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn v1_process_run_returns_output_and_timeout() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "echo hi"],
"timeoutMs": 1000
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["timedOut"], false);
assert_eq!(parsed["exitCode"], 0);
assert!(parsed["stdout"].as_str().unwrap_or_default().contains("hi"));
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 2"],
"timeoutMs": 50
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["timedOut"], true);
}
#[tokio::test]
async fn v1_process_run_reports_truncation() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "printf 'abcdefghijklmnopqrstuvwxyz'"],
"maxOutputBytes": 5
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["stdoutTruncated"], true);
assert_eq!(parsed["stderrTruncated"], false);
assert_eq!(parsed["stdout"].as_str().unwrap_or_default().len(), 5);
}
#[tokio::test]
async fn v1_process_tty_input_and_logs() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/input"),
Some(json!({
"data": "aGVsbG8K",
"encoding": "base64"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
tokio::time::sleep(Duration::from_millis(150)).await;
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/logs?stream=pty&tail=20"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let entries = parse_json(&body)["entries"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(!entries.is_empty());
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/kill"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
wait_for_exited(&test_app, &process_id).await;
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn v1_process_not_found_returns_404() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes/does-not-exist",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(parse_json(&body)["status"], 404);
}
#[tokio::test]
async fn v1_process_input_limit_returns_413() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, _) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/config",
Some(json!({
"maxConcurrentProcesses": 8,
"defaultRunTimeoutMs": 1000,
"maxRunTimeoutMs": 5000,
"maxOutputBytes": 4096,
"maxLogBytesPerProcess": 32768,
"maxInputBytesPerRequest": 4
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/input"),
Some(json!({
"data": "aGVsbG8=",
"encoding": "base64"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE);
assert_eq!(parse_json(&body)["status"], 413);
}
#[tokio::test]
async fn v1_tty_process_is_real_terminal() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "tty"],
"tty": true,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
wait_for_exited(&test_app, &process_id).await;
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/logs?stream=pty"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let entries = parse_json(&body)["entries"]
.as_array()
.cloned()
.unwrap_or_default();
let joined = decode_log_entries(&entries);
assert!(!joined.to_lowercase().contains("not a tty"));
assert!(joined.contains("/dev/"));
}
#[tokio::test]
async fn v1_process_logs_follow_sse_streams_entries() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "echo first; sleep 0.3; echo second"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let request = Request::builder()
.method(Method::GET)
.uri(format!(
"/v1/processes/{process_id}/logs?stream=stdout&follow=true"
))
.body(Body::empty())
.expect("build request");
let response = test_app
.app
.clone()
.oneshot(request)
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let chunk = tokio::time::timeout(Duration::from_secs(5), async move {
while let Some(chunk) = stream.next().await {
let bytes = chunk.expect("stream chunk");
let text = String::from_utf8_lossy(&bytes).to_string();
if text.contains("data:") {
return text;
}
}
panic!("SSE stream ended before log chunk");
})
.await
.expect("timed out reading process log sse");
let payload = parse_sse_data(&chunk);
assert!(payload["sequence"].as_u64().is_some());
assert_eq!(payload["stream"], "stdout");
}
#[tokio::test]
async fn v1_access_token_query_only_allows_terminal_ws() {
let test_app = TestApp::new(AuthConfig::with_token("secret-token".to_string()));
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
"/v1/health?access_token=secret-token",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[("authorization", "Bearer secret-token")],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/terminal/ws"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/terminal/ws?access_token=secret-token"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn v1_process_terminal_ws_e2e_is_deterministic() {
let test_app = TestApp::new(AuthConfig::disabled());
let live_server = LiveServer::spawn(test_app.app.clone()).await;
let http = reqwest::Client::new();
let create_response = http
.post(live_server.http_url("/v1/processes"))
.json(&json!({
"command": "sh",
"args": ["-lc", "stty -echo; IFS= read -r line; printf 'got:%s\\n' \"$line\""],
"tty": true,
"interactive": true
}))
.send()
.await
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let (mut ws, _) = connect_async(&ws_url)
.await
.expect("connect websocket");
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
ws.send(Message::Text(
json!({
"type": "input",
"data": "hello from ws\n"
})
.to_string(),
))
.await
.expect("send input frame");
let mut saw_binary_output = false;
let mut saw_exit = false;
for _ in 0..10 {
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => {
let text = String::from_utf8_lossy(&bytes);
if text.contains("got:hello from ws") {
saw_binary_output = true;
}
}
Message::Text(text) => {
let payload: Value = serde_json::from_str(&text).expect("ws json");
if payload["type"] == "exit" {
saw_exit = true;
break;
}
assert_ne!(payload["type"], "error");
}
Message::Close(_) => break,
Message::Ping(_) | Message::Pong(_) => {}
_ => {}
}
}
assert!(saw_binary_output, "expected pty binary output over websocket");
assert!(saw_exit, "expected exit control frame over websocket");
let _ = ws.close(None).await;
let delete_response = http
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
.send()
.await
.expect("delete process response");
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
live_server.shutdown().await;
}
#[tokio::test]
async fn v1_process_terminal_ws_auth_e2e() {
let token = "secret-token";
let test_app = TestApp::new(AuthConfig::with_token(token.to_string()));
let live_server = LiveServer::spawn(test_app.app.clone()).await;
let http = reqwest::Client::new();
let create_response = http
.post(live_server.http_url("/v1/processes"))
.bearer_auth(token)
.json(&json!({
"command": "cat",
"tty": true,
"interactive": true
}))
.send()
.await
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let unauth_err = connect_async(&unauth_ws_url)
.await
.expect_err("unauthenticated websocket handshake should fail");
match unauth_err {
tokio_tungstenite::tungstenite::Error::Http(response) => {
assert_eq!(response.status().as_u16(), 401);
}
other => panic!("unexpected websocket auth error: {other:?}"),
}
let auth_ws_url = live_server.ws_url(&format!(
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
));
let (mut ws, _) = connect_async(&auth_ws_url)
.await
.expect("authenticated websocket handshake");
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
let _ = ws
.send(Message::Text(json!({ "type": "close" }).to_string()))
.await;
let _ = ws.close(None).await;
let kill_response = http
.post(live_server.http_url(&format!(
"/v1/processes/{process_id}/kill?waitMs=1000"
)))
.bearer_auth(token)
.send()
.await
.expect("kill process response");
assert_eq!(kill_response.status(), reqwest::StatusCode::OK);
let delete_response = http
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
.bearer_auth(token)
.send()
.await
.expect("delete process response");
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
live_server.shutdown().await;
}