chore: recover wellington workspace state

This commit is contained in:
Nathan Flurry 2026-03-09 19:59:55 -07:00
parent 5d65013aa5
commit c294ca85be
366 changed files with 1265 additions and 53395 deletions

View file

@ -1,17 +1,17 @@
# Server Instructions
## Architecture
## ACP v2 Architecture
- Public API routes are defined in `server/packages/sandbox-agent/src/router.rs`.
- ACP proxy runtime is in `server/packages/sandbox-agent/src/acp_proxy_runtime.rs`.
- All API endpoints are under `/v1`.
- ACP runtime/process bridge is in `server/packages/sandbox-agent/src/acp_runtime.rs`.
- `/v2` is the only active API surface for sessions/prompts (`/v2/rpc`).
- Keep binary filesystem transfer endpoints as dedicated HTTP APIs:
- `GET /v1/fs/file`
- `PUT /v1/fs/file`
- `POST /v1/fs/upload-batch`
- `GET /v2/fs/file`
- `PUT /v2/fs/file`
- `POST /v2/fs/upload-batch`
- Rationale: host-owned cross-agent-consistent behavior and large binary transfer needs that ACP JSON-RPC is not suited to stream efficiently.
- Maintain ACP variants in parallel only when they share the same underlying filesystem implementation; SDK defaults should still prefer HTTP for large/binary transfers.
- `/opencode/*` stays disabled (`503`) until Phase 7.
- `/v1/*` must remain hard-removed (`410`) and `/opencode/*` stays disabled (`503`) until Phase 7.
- Agent install logic (native + ACP agent process + lazy install) is handled by `server/packages/agent-management/`.
## API Contract Rules
@ -23,14 +23,14 @@
## Tests
Primary v1 integration coverage:
- `server/packages/sandbox-agent/tests/v1_api.rs`
- `server/packages/sandbox-agent/tests/v1_agent_process_matrix.rs`
Primary v2 integration coverage:
- `server/packages/sandbox-agent/tests/v2_api.rs`
- `server/packages/sandbox-agent/tests/v2_agent_process_matrix.rs`
Run:
```bash
cargo test -p sandbox-agent --test v1_api
cargo test -p sandbox-agent --test v1_agent_process_matrix
cargo test -p sandbox-agent --test v2_api
cargo test -p sandbox-agent --test v2_agent_process_matrix
```
## Migration Docs Sync

View file

@ -93,20 +93,6 @@ fn map_error(err: AdapterError) -> Response {
"timeout",
"timed out waiting for agent response",
),
AdapterError::Exited { exit_code, stderr } => {
let detail = if let Some(stderr) = stderr {
format!(
"agent process exited before responding (exit_code: {:?}, stderr: {})",
exit_code, stderr
)
} else {
format!(
"agent process exited before responding (exit_code: {:?})",
exit_code
)
};
problem(StatusCode::BAD_GATEWAY, "agent_exited", &detail)
}
AdapterError::Write(write) => problem(
StatusCode::BAD_GATEWAY,
"write_failed",

View file

@ -32,7 +32,6 @@ async fn main() {
}
async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let started = std::time::Instant::now();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
@ -42,12 +41,6 @@ async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.init();
let cli = Cli::parse();
tracing::info!(
host = %cli.host,
port = cli.port,
startup_ms = started.elapsed().as_millis() as u64,
"acp-http-adapter.run: starting server"
);
run_server(ServerConfig {
host: cli.host,
port: cli.port,

View file

@ -16,7 +16,6 @@ use tokio_stream::wrappers::BroadcastStream;
use crate::registry::LaunchSpec;
const RING_BUFFER_SIZE: usize = 1024;
const STDERR_TAIL_SIZE: usize = 16;
#[derive(Debug, Error)]
pub enum AdapterError {
@ -34,11 +33,6 @@ pub enum AdapterError {
Serialize(serde_json::Error),
#[error("failed to write subprocess stdin: {0}")]
Write(std::io::Error),
#[error("agent process exited before responding")]
Exited {
exit_code: Option<i32>,
stderr: Option<String>,
},
#[error("timeout waiting for response")]
Timeout,
}
@ -67,7 +61,6 @@ pub struct AdapterRuntime {
shutting_down: AtomicBool,
spawned_at: Instant,
first_stdout: Arc<AtomicBool>,
stderr_tail: Arc<Mutex<VecDeque<String>>>,
}
impl AdapterRuntime {
@ -127,7 +120,6 @@ impl AdapterRuntime {
shutting_down: AtomicBool::new(false),
spawned_at: spawn_start,
first_stdout: Arc::new(AtomicBool::new(false)),
stderr_tail: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_SIZE))),
};
runtime.spawn_stdout_loop(stdout);
@ -206,16 +198,6 @@ impl AdapterRuntime {
"post: response channel dropped (agent process may have exited)"
);
self.pending.lock().await.remove(&key);
if let Some((exit_code, stderr)) = self.try_process_exit_info().await {
tracing::error!(
method = %method,
id = %key,
exit_code = ?exit_code,
stderr = ?stderr,
"post: agent process exited before response channel completed"
);
return Err(AdapterError::Exited { exit_code, stderr });
}
Err(AdapterError::Timeout)
}
Err(_) => {
@ -231,16 +213,6 @@ impl AdapterRuntime {
"post: TIMEOUT waiting for agent response"
);
self.pending.lock().await.remove(&key);
if let Some((exit_code, stderr)) = self.try_process_exit_info().await {
tracing::error!(
method = %method,
id = %key,
exit_code = ?exit_code,
stderr = ?stderr,
"post: agent process exited before timeout completed"
);
return Err(AdapterError::Exited { exit_code, stderr });
}
Err(AdapterError::Timeout)
}
}
@ -473,7 +445,6 @@ impl AdapterRuntime {
fn spawn_stderr_loop(&self, stderr: tokio::process::ChildStderr) {
let spawned_at = self.spawned_at;
let stderr_tail = self.stderr_tail.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
@ -481,13 +452,6 @@ impl AdapterRuntime {
while let Ok(Some(line)) = lines.next_line().await {
line_count += 1;
{
let mut tail = stderr_tail.lock().await;
tail.push_back(line.clone());
while tail.len() > STDERR_TAIL_SIZE {
tail.pop_front();
}
}
tracing::info!(
line_number = line_count,
age_ms = spawned_at.elapsed().as_millis() as u64,
@ -596,28 +560,6 @@ impl AdapterRuntime {
tracing::debug!(method = method, id = %id, "stdin: write+flush complete");
Ok(())
}
/// Non-blocking check of whether the agent subprocess has already exited.
///
/// Returns `Some((exit_code, stderr_tail))` when the child has terminated;
/// `None` when it is still running or its status could not be queried.
async fn try_process_exit_info(&self) -> Option<(Option<i32>, Option<String>)> {
    let status = {
        let mut child = self.child.lock().await;
        // `try_wait` errors and the still-running case both collapse to None.
        child.try_wait().ok().flatten()?
    };
    // The child lock is released at the end of the block above, before we
    // await the stderr-tail lock, matching the original explicit drop order.
    let stderr = self.stderr_tail_summary().await;
    Some((status.code(), stderr))
}
/// Join the buffered stderr tail lines into a single newline-separated
/// string, or return `None` when nothing has been captured yet.
async fn stderr_tail_summary(&self) -> Option<String> {
    let tail = self.stderr_tail.lock().await;
    let lines: Vec<String> = tail.iter().cloned().collect();
    (!lines.is_empty()).then(|| lines.join("\n"))
}
}
fn id_key(value: &Value) -> String {

View file

@ -20,4 +20,3 @@ url.workspace = true
dirs.workspace = true
tempfile.workspace = true
time.workspace = true
tracing.workspace = true

View file

@ -4,7 +4,6 @@ use std::fs;
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Instant;
use flate2::read::GzDecoder;
use reqwest::blocking::Client;
@ -79,7 +78,7 @@ impl AgentId {
fn agent_process_registry_id(self) -> Option<&'static str> {
match self {
AgentId::Claude => Some("claude-acp"),
AgentId::Claude => Some("claude-code-acp"),
AgentId::Codex => Some("codex-acp"),
AgentId::Opencode => Some("opencode"),
AgentId::Amp => Some("amp-acp"),
@ -91,7 +90,7 @@ impl AgentId {
fn agent_process_binary_hint(self) -> Option<&'static str> {
match self {
AgentId::Claude => Some("claude-agent-acp"),
AgentId::Claude => Some("claude-code-acp"),
AgentId::Codex => Some("codex-acp"),
AgentId::Opencode => Some("opencode"),
AgentId::Amp => Some("amp-acp"),
@ -322,14 +321,6 @@ impl AgentManager {
agent: AgentId,
options: InstallOptions,
) -> Result<InstallResult, AgentError> {
let install_started = Instant::now();
tracing::info!(
agent = agent.as_str(),
reinstall = options.reinstall,
native_version = ?options.version,
agent_process_version = ?options.agent_process_version,
"agent_manager.install: starting"
);
fs::create_dir_all(&self.install_dir)?;
fs::create_dir_all(self.install_dir.join("agent_processes"))?;
@ -354,20 +345,10 @@ impl AgentManager {
artifacts.push(artifact);
}
let result = InstallResult {
Ok(InstallResult {
artifacts,
already_installed,
};
tracing::info!(
agent = agent.as_str(),
already_installed = result.already_installed,
artifact_count = result.artifacts.len(),
total_ms = elapsed_ms(install_started),
"agent_manager.install: completed"
);
Ok(result)
})
}
pub fn is_installed(&self, agent: AgentId) -> bool {
@ -411,41 +392,25 @@ impl AgentManager {
&self,
agent: AgentId,
) -> Result<AgentProcessLaunchSpec, AgentError> {
let started = Instant::now();
if agent == AgentId::Mock {
let spec = AgentProcessLaunchSpec {
return Ok(AgentProcessLaunchSpec {
program: self.agent_process_path(agent),
args: Vec::new(),
env: HashMap::new(),
source: InstallSource::Builtin,
version: Some("builtin".to_string()),
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved builtin"
);
return Ok(spec);
});
}
let launcher = self.agent_process_path(agent);
if launcher.exists() {
let spec = AgentProcessLaunchSpec {
return Ok(AgentProcessLaunchSpec {
program: launcher,
args: Vec::new(),
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved local launcher"
);
return Ok(spec);
});
}
if let Some(bin) = agent.agent_process_binary_hint().and_then(find_in_path) {
@ -454,47 +419,29 @@ impl AgentManager {
} else {
Vec::new()
};
let spec = AgentProcessLaunchSpec {
return Ok(AgentProcessLaunchSpec {
program: bin,
args,
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
args = ?spec.args,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved PATH binary hint"
);
return Ok(spec);
});
}
if agent == AgentId::Opencode {
let native = self.resolve_binary(agent)?;
let spec = AgentProcessLaunchSpec {
return Ok(AgentProcessLaunchSpec {
program: native,
args: vec!["acp".to_string()],
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
args = ?spec.args,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved opencode native"
);
return Ok(spec);
});
}
Err(AgentError::AgentProcessNotFound {
agent,
hint: Some(format!("run step 3: `sandbox-agent install-agent {agent}`")),
hint: Some("run install to provision ACP agent process".to_string()),
})
}
@ -507,23 +454,11 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
if !options.reinstall && self.native_installed(agent) {
tracing::info!(
agent = agent.as_str(),
total_ms = elapsed_ms(started),
"agent_manager.install_native: already installed"
);
return Ok(None);
}
let path = self.binary_path(agent);
tracing::info!(
agent = agent.as_str(),
path = %path.display(),
version_override = ?options.version,
"agent_manager.install_native: installing"
);
match agent {
AgentId::Claude => install_claude(&path, self.platform, options.version.as_deref())?,
AgentId::Codex => install_codex(&path, self.platform, options.version.as_deref())?,
@ -539,22 +474,12 @@ impl AgentManager {
}
}
let artifact = InstalledArtifact {
Ok(Some(InstalledArtifact {
kind: InstalledArtifactKind::NativeAgent,
path,
version: self.version(agent).ok().flatten(),
source: InstallSource::Fallback,
};
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_native: completed"
);
Ok(Some(artifact))
}))
}
fn install_agent_process(
@ -562,14 +487,8 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
if !options.reinstall {
if self.agent_process_status(agent).is_some() {
tracing::info!(
agent = agent.as_str(),
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: already installed"
);
return Ok(None);
}
}
@ -577,104 +496,22 @@ impl AgentManager {
if agent == AgentId::Mock {
let path = self.agent_process_path(agent);
write_mock_agent_process_launcher(&path)?;
let artifact = InstalledArtifact {
return Ok(Some(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path,
version: Some("builtin".to_string()),
source: InstallSource::Builtin,
};
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed builtin launcher"
);
return Ok(Some(artifact));
}));
}
if let Some(artifact) = self.install_agent_process_from_registry(agent, options)? {
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed from registry"
);
return Ok(Some(artifact));
}
let artifact = self.install_agent_process_fallback(agent, options)?;
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed from fallback"
);
Ok(Some(artifact))
}
fn install_npm_agent_process_package(
&self,
agent: AgentId,
package: &str,
args: &[String],
env: &HashMap<String, String>,
source: InstallSource,
version: Option<String>,
) -> Result<InstalledArtifact, AgentError> {
let started = Instant::now();
let root = self.agent_process_storage_dir(agent);
if root.exists() {
fs::remove_dir_all(&root)?;
}
fs::create_dir_all(&root)?;
let npm_install_started = Instant::now();
install_npm_package(&root, package, agent)?;
let npm_install_ms = elapsed_ms(npm_install_started);
let bin_name = agent.agent_process_binary_hint().ok_or_else(|| {
AgentError::ExtractFailed(format!(
"missing executable hint for agent process package: {agent}"
))
})?;
let cmd_path = npm_bin_path(&root, bin_name);
if !cmd_path.exists() {
return Err(AgentError::ExtractFailed(format!(
"installed package missing executable: {}",
cmd_path.display()
)));
}
let launcher = self.agent_process_path(agent);
let write_started = Instant::now();
write_exec_agent_process_launcher(&launcher, &cmd_path, args, env)?;
let write_ms = elapsed_ms(write_started);
let verify_started = Instant::now();
verify_command(&launcher, &[])?;
let verify_ms = elapsed_ms(verify_started);
tracing::info!(
agent = agent.as_str(),
package = %package,
cmd = %cmd_path.display(),
npm_install_ms = npm_install_ms,
write_ms = write_ms,
verify_ms = verify_ms,
total_ms = elapsed_ms(started),
"agent_manager.install_npm_agent_process_package: completed"
);
Ok(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version,
source,
})
}
fn agent_process_status(&self, agent: AgentId) -> Option<AgentProcessStatus> {
if agent == AgentId::Mock {
return Some(AgentProcessStatus {
@ -703,111 +540,59 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
let Some(registry_id) = agent.agent_process_registry_id() else {
return Ok(None);
};
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
url = %self.registry_url,
"agent_manager.install_agent_process_from_registry: fetching registry"
);
let fetch_started = Instant::now();
let registry = fetch_registry(&self.registry_url)?;
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
fetch_ms = elapsed_ms(fetch_started),
"agent_manager.install_agent_process_from_registry: registry fetched"
);
let Some(entry) = registry.agents.into_iter().find(|a| a.id == registry_id) else {
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: missing entry"
);
return Ok(None);
};
if let Some(npx) = entry.distribution.npx {
let package =
apply_npx_version_override(&npx.package, options.agent_process_version.as_deref());
let version = options
.agent_process_version
.clone()
.or(entry.version)
.or(extract_npx_version(&package));
let artifact = self.install_npm_agent_process_package(
agent,
&package,
&npx.args,
&npx.env,
InstallSource::Registry,
version,
)?;
tracing::info!(
agent = agent.as_str(),
package = %package,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: npm package installed"
);
return Ok(Some(artifact));
let launcher = self.agent_process_path(agent);
write_npx_agent_process_launcher(&launcher, &package, &npx.args, &npx.env)?;
verify_command(&launcher, &[])?;
return Ok(Some(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options
.agent_process_version
.clone()
.or(entry.version)
.or(extract_npx_version(&package)),
source: InstallSource::Registry,
}));
}
if let Some(binary) = entry.distribution.binary {
let key = self.platform.registry_key();
if let Some(target) = binary.get(key) {
let archive_url = Url::parse(&target.archive)?;
let download_started = Instant::now();
let payload = download_bytes(&archive_url)?;
let download_ms = elapsed_ms(download_started);
let root = self.agent_process_storage_dir(agent);
if root.exists() {
fs::remove_dir_all(&root)?;
}
fs::create_dir_all(&root)?;
let unpack_started = Instant::now();
unpack_archive(&payload, &archive_url, &root)?;
let unpack_ms = elapsed_ms(unpack_started);
let cmd_path = resolve_extracted_command(&root, &target.cmd)?;
let launcher = self.agent_process_path(agent);
let write_started = Instant::now();
write_exec_agent_process_launcher(&launcher, &cmd_path, &target.args, &target.env)?;
let write_ms = elapsed_ms(write_started);
let verify_started = Instant::now();
verify_command(&launcher, &[])?;
let verify_ms = elapsed_ms(verify_started);
let artifact = InstalledArtifact {
return Ok(Some(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone().or(entry.version),
source: InstallSource::Registry,
};
tracing::info!(
agent = agent.as_str(),
archive_url = %archive_url,
download_ms = download_ms,
unpack_ms = unpack_ms,
write_ms = write_ms,
verify_ms = verify_ms,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: binary launcher installed"
);
return Ok(Some(artifact));
}));
}
}
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: no compatible distribution"
);
Ok(None)
}
@ -816,44 +601,24 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<InstalledArtifact, AgentError> {
let started = Instant::now();
let artifact = match agent {
let launcher = self.agent_process_path(agent);
match agent {
AgentId::Claude => {
let package = fallback_npx_package(
"@zed-industries/claude-agent-acp",
"@zed-industries/claude-code-acp",
options.agent_process_version.as_deref(),
);
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
}
AgentId::Codex => {
let package = fallback_npx_package(
"@zed-industries/codex-acp",
options.agent_process_version.as_deref(),
);
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
}
AgentId::Opencode => {
let launcher = self.agent_process_path(agent);
let native = self.resolve_binary(agent)?;
write_exec_agent_process_launcher(
&launcher,
@ -861,82 +626,37 @@ impl AgentManager {
&["acp".to_string()],
&HashMap::new(),
)?;
verify_command(&launcher, &[])?;
InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
}
}
AgentId::Amp => {
let package =
fallback_npx_package("amp-acp", options.agent_process_version.as_deref());
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
}
AgentId::Pi => {
let package =
fallback_npx_package("pi-acp", options.agent_process_version.as_deref());
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
}
AgentId::Cursor => {
let package = fallback_npx_package(
"@blowmage/cursor-agent-acp",
options.agent_process_version.as_deref(),
);
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
}
AgentId::Mock => {
let launcher = self.agent_process_path(agent);
write_mock_agent_process_launcher(&launcher)?;
InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
}
}
};
}
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_fallback: launcher installed"
);
verify_command(&launcher, &[])?;
Ok(artifact)
Ok(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
})
}
}
@ -1012,10 +732,6 @@ pub enum AgentError {
RegistryParse(String),
#[error("command verification failed: {0}")]
VerifyFailed(String),
#[error(
"npm is required to install {agent}. install npm, then run step 3: `sandbox-agent install-agent {agent}`"
)]
MissingNpm { agent: AgentId },
}
fn fallback_npx_package(base: &str, version: Option<&str>) -> String {
@ -1063,36 +779,15 @@ fn split_package_version(package: &str) -> Option<(&str, &str)> {
}
}
/// Run `npm install <package>` rooted at `root`, discarding npm's output.
///
/// Errors:
/// - `AgentError::MissingNpm` when the `npm` binary is not on PATH.
/// - `AgentError::VerifyFailed` for a non-zero exit status or any other
///   spawn failure.
fn install_npm_package(root: &Path, package: &str, agent: AgentId) -> Result<(), AgentError> {
    let outcome = Command::new("npm")
        .args(["install", "--no-audit", "--no-fund"])
        .arg("--prefix")
        .arg(root)
        .arg(package)
        // npm's own output is intentionally suppressed; failures are
        // surfaced via the exit status instead.
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    match outcome {
        Ok(status) if status.success() => Ok(()),
        Ok(status) => Err(AgentError::VerifyFailed(format!(
            "npm install failed for {agent} with status {status}. run step 3: `sandbox-agent install-agent {agent}`"
        ))),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Err(AgentError::MissingNpm { agent }),
        Err(err) => Err(AgentError::VerifyFailed(format!(
            "failed to execute npm for {agent}: {err}"
        ))),
    }
}
fn npm_bin_path(root: &Path, bin_name: &str) -> PathBuf {
let mut path = root.join("node_modules").join(".bin").join(bin_name);
if cfg!(windows) {
path.set_extension("cmd");
}
path
/// Write a launcher script that executes `npx -y <package> [args…]` with
/// the given environment variables, delegating the actual file write to
/// `write_launcher`.
fn write_npx_agent_process_launcher(
    path: &Path,
    package: &str,
    args: &[String],
    env: &HashMap<String, String>,
) -> Result<(), AgentError> {
    let mut argv = vec!["npx".to_string(), "-y".to_string(), package.to_owned()];
    argv.extend_from_slice(args);
    write_launcher(path, &argv, env)
}
fn write_exec_agent_process_launcher(
@ -1303,15 +998,6 @@ fn install_claude(
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
let started = Instant::now();
tracing::info!(
path = %path.display(),
platform = ?platform,
version_override = ?version,
"agent_manager.install_claude: starting"
);
let version_started = Instant::now();
let version = match version {
Some(version) => version.to_string(),
None => {
@ -1323,7 +1009,6 @@ fn install_claude(
text.trim().to_string()
}
};
let version_ms = elapsed_ms(version_started);
let platform_segment = match platform {
Platform::LinuxX64 => "linux-x64",
@ -1338,26 +1023,12 @@ fn install_claude(
let url = Url::parse(&format!(
"https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/{version}/{platform_segment}/claude"
))?;
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let write_started = Instant::now();
write_executable(path, &bytes)?;
tracing::info!(
version = %version,
url = %url,
bytes = bytes.len(),
version_ms = version_ms,
download_ms = download_ms,
write_ms = elapsed_ms(write_started),
total_ms = elapsed_ms(started),
"agent_manager.install_claude: completed"
);
Ok(())
}
fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let started = Instant::now();
let version = match version {
Some(version) => version.to_string(),
None => {
@ -1382,25 +1053,12 @@ fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result
let url = Url::parse(&format!(
"https://storage.googleapis.com/amp-public-assets-prod-0/cli/{version}/amp-{platform_segment}"
))?;
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let write_started = Instant::now();
write_executable(path, &bytes)?;
tracing::info!(
version = %version,
url = %url,
bytes = bytes.len(),
download_ms = download_ms,
write_ms = elapsed_ms(write_started),
total_ms = elapsed_ms(started),
"agent_manager.install_amp: completed"
);
Ok(())
}
fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let started = Instant::now();
let target = match platform {
Platform::LinuxX64 | Platform::LinuxX64Musl => "x86_64-unknown-linux-musl",
Platform::LinuxArm64 => "aarch64-unknown-linux-musl",
@ -1419,15 +1077,11 @@ fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Resu
))?,
};
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let temp_dir = tempfile::tempdir()?;
let unpack_started = Instant::now();
let cursor = io::Cursor::new(bytes);
let mut archive = tar::Archive::new(GzDecoder::new(cursor));
archive.unpack(temp_dir.path())?;
let unpack_ms = elapsed_ms(unpack_started);
let expected = if cfg!(windows) {
format!("codex-{target}.exe")
@ -1437,17 +1091,7 @@ fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Resu
let binary = find_file_recursive(temp_dir.path(), &expected)?
.ok_or_else(|| AgentError::ExtractFailed(format!("missing {expected}")))?;
let move_started = Instant::now();
move_executable(&binary, path)?;
tracing::info!(
url = %url,
target = target,
download_ms = download_ms,
unpack_ms = unpack_ms,
move_ms = elapsed_ms(move_started),
total_ms = elapsed_ms(started),
"agent_manager.install_codex: completed"
);
Ok(())
}
@ -1456,15 +1100,7 @@ fn install_opencode(
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
let started = Instant::now();
tracing::info!(
path = %path.display(),
platform = ?platform,
version_override = ?version,
"agent_manager.install_opencode: starting"
);
let result = match platform {
match platform {
Platform::MacosArm64 => {
let url = match version {
Some(version) => Url::parse(&format!(
@ -1505,46 +1141,22 @@ fn install_opencode(
))?,
};
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let temp_dir = tempfile::tempdir()?;
let unpack_started = Instant::now();
let cursor = io::Cursor::new(bytes);
let mut archive = tar::Archive::new(GzDecoder::new(cursor));
archive.unpack(temp_dir.path())?;
let unpack_ms = elapsed_ms(unpack_started);
let binary = find_file_recursive(temp_dir.path(), "opencode")
.or_else(|_| find_file_recursive(temp_dir.path(), "opencode.exe"))?
.ok_or_else(|| AgentError::ExtractFailed("missing opencode".to_string()))?;
let move_started = Instant::now();
move_executable(&binary, path)?;
tracing::info!(
url = %url,
download_ms = download_ms,
unpack_ms = unpack_ms,
move_ms = elapsed_ms(move_started),
"agent_manager.install_opencode: tarball extraction complete"
);
Ok(())
}
};
if result.is_ok() {
tracing::info!(
total_ms = elapsed_ms(started),
"agent_manager.install_opencode: completed"
);
}
result
}
fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), AgentError> {
let started = Instant::now();
let download_started = Instant::now();
let bytes = download_bytes(url)?;
let download_ms = elapsed_ms(download_started);
let reader = io::Cursor::new(bytes);
let mut archive =
zip::ZipArchive::new(reader).map_err(|err| AgentError::ExtractFailed(err.to_string()))?;
@ -1561,16 +1173,7 @@ fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), A
let out_path = temp_dir.path().join(binary_name);
let mut out_file = fs::File::create(&out_path)?;
io::copy(&mut file, &mut out_file)?;
let move_started = Instant::now();
move_executable(&out_path, path)?;
tracing::info!(
url = %url,
binary_name = binary_name,
download_ms = download_ms,
move_ms = elapsed_ms(move_started),
total_ms = elapsed_ms(started),
"agent_manager.install_zip_binary: completed"
);
return Ok(());
}
Err(AgentError::ExtractFailed(format!("missing {binary_name}")))
@ -1628,10 +1231,6 @@ fn find_file_recursive(dir: &Path, filename: &str) -> Result<Option<PathBuf>, Ag
Ok(None)
}
/// Whole milliseconds elapsed since `start`, narrowed into a `u64`.
///
/// The `as` cast truncates the `u128` millisecond count, but a duration
/// large enough to overflow `u64` milliseconds is unreachable in practice.
fn elapsed_ms(start: Instant) -> u64 {
    let millis = start.elapsed().as_millis();
    millis as u64
}
fn parse_version_output(output: &std::process::Output) -> Option<String> {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
@ -1666,38 +1265,6 @@ mod tests {
}
}
/// Test helper: install a fake `npm` shim script at `path`.
///
/// The shim mimics `npm install --no-audit --no-fund --prefix <dir> <pkg>`
/// just enough for the manager's install flow: it extracts the `--prefix`
/// argument, then creates stub executables under
/// `<prefix>/node_modules/.bin` for each ACP agent-process binary name the
/// fallback installers look up. It exits 1 when no prefix was supplied.
/// `write_exec` (defined elsewhere in this module) writes the script and
/// marks it executable.
fn write_fake_npm(path: &Path) {
write_exec(
path,
r#"#!/usr/bin/env sh
set -e
prefix=""
while [ "$#" -gt 0 ]; do
case "$1" in
install|--no-audit|--no-fund)
shift
;;
--prefix)
prefix="$2"
shift 2
;;
*)
shift
;;
esac
done
[ -n "$prefix" ] || exit 1
mkdir -p "$prefix/node_modules/.bin"
for bin in claude-code-acp codex-acp amp-acp pi-acp cursor-agent-acp; do
echo '#!/usr/bin/env sh' > "$prefix/node_modules/.bin/$bin"
echo 'exit 0' >> "$prefix/node_modules/.bin/$bin"
chmod +x "$prefix/node_modules/.bin/$bin"
done
exit 0
"#,
);
}
fn env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
@ -1840,7 +1407,7 @@ exit 0
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_fake_npm(&bin_dir.join("npm"));
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1888,8 +1455,8 @@ exit 0
let launcher =
fs::read_to_string(manager.agent_process_path(AgentId::Codex)).expect("launcher");
assert!(
launcher.contains("node_modules/.bin/codex-acp"),
"launcher should invoke installed codex executable"
launcher.contains("@example/codex-acp@9.9.9"),
"launcher should include overridden package version"
);
}
@ -1907,7 +1474,7 @@ exit 0
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_fake_npm(&bin_dir.join("npm"));
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1929,39 +1496,6 @@ exit 0
assert_eq!(agent_process_artifact.source, InstallSource::Fallback);
}
// Verifies that installing an npm-backed agent with no `npm` binary on PATH
// surfaces AgentError::MissingNpm rather than a generic failure.
#[test]
fn install_returns_missing_npm_error_for_npm_backed_agents() {
// Serialize environment-variable mutation across tests.
let _env_lock = env_lock().lock().expect("env lock");
let temp_dir = tempfile::tempdir().expect("create tempdir");
let mut manager = AgentManager::with_platform(temp_dir.path(), Platform::LinuxX64);
// Pretend the native codex binary is already installed so only the
// agent-process (npm-backed) install step has to run.
write_exec(
&manager.binary_path(AgentId::Codex),
"#!/usr/bin/env sh\nexit 0\n",
);
// An empty bin dir becomes the entire PATH, so `npm` cannot be resolved.
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
let original_path = std::env::var_os("PATH").unwrap_or_default();
let combined_path = std::env::join_paths([bin_dir]).expect("join PATH");
let _path_guard = EnvVarGuard::set("PATH", &combined_path);
// Registry with no agents forces the npm fallback install path.
manager.registry_url = serve_registry_once(serde_json::json!({ "agents": [] }));
let error = manager
.install(AgentId::Codex, InstallOptions::default())
.expect_err("install should fail without npm");
match error {
AgentError::MissingNpm { agent } => assert_eq!(agent, AgentId::Codex),
other => panic!("expected MissingNpm, got {other:?}"),
}
// NOTE(review): `original_path` is captured but never used to restore PATH
// here — presumably `EnvVarGuard` performs restoration on drop; confirm.
// This explicit drop is effectively a no-op.
drop(original_path);
}
#[test]
fn reinstall_mock_returns_agent_process_artifact() {
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -1988,7 +1522,7 @@ exit 0
}
#[test]
fn install_pi_skips_native_and_installs_fallback_npm_launcher() {
fn install_pi_skips_native_and_writes_fallback_npx_launcher() {
let _env_lock = env_lock().lock().expect("env lock");
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -1996,7 +1530,7 @@ exit 0
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_fake_npm(&bin_dir.join("npm"));
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -2030,8 +1564,8 @@ exit 0
let launcher =
fs::read_to_string(manager.agent_process_path(AgentId::Pi)).expect("read pi launcher");
assert!(
launcher.contains("node_modules/.bin/pi-acp"),
"pi launcher should use installed pi executable"
launcher.contains("pi-acp"),
"pi launcher should reference pi-acp package"
);
// resolve_agent_process should now find it.
@ -2056,7 +1590,7 @@ exit 0
}
#[test]
fn install_cursor_skips_native_and_installs_fallback_npm_launcher() {
fn install_cursor_skips_native_and_writes_fallback_npx_launcher() {
let _env_lock = env_lock().lock().expect("env lock");
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -2064,7 +1598,7 @@ exit 0
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_fake_npm(&bin_dir.join("npm"));
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -2096,8 +1630,8 @@ exit 0
let launcher = fs::read_to_string(manager.agent_process_path(AgentId::Cursor))
.expect("read cursor launcher");
assert!(
launcher.contains("node_modules/.bin/cursor-agent-acp"),
"cursor launcher should use installed cursor executable"
launcher.contains("@blowmage/cursor-agent-acp"),
"cursor launcher should reference @blowmage/cursor-agent-acp package"
);
let spec = manager

View file

@ -17,7 +17,6 @@ pub enum ErrorType {
PermissionDenied,
NotAcceptable,
UnsupportedMediaType,
NotFound,
SessionNotFound,
SessionAlreadyExists,
ModeNotSupported,
@ -38,7 +37,6 @@ impl ErrorType {
Self::PermissionDenied => "urn:sandbox-agent:error:permission_denied",
Self::NotAcceptable => "urn:sandbox-agent:error:not_acceptable",
Self::UnsupportedMediaType => "urn:sandbox-agent:error:unsupported_media_type",
Self::NotFound => "urn:sandbox-agent:error:not_found",
Self::SessionNotFound => "urn:sandbox-agent:error:session_not_found",
Self::SessionAlreadyExists => "urn:sandbox-agent:error:session_already_exists",
Self::ModeNotSupported => "urn:sandbox-agent:error:mode_not_supported",
@ -59,7 +57,6 @@ impl ErrorType {
Self::PermissionDenied => "Permission Denied",
Self::NotAcceptable => "Not Acceptable",
Self::UnsupportedMediaType => "Unsupported Media Type",
Self::NotFound => "Not Found",
Self::SessionNotFound => "Session Not Found",
Self::SessionAlreadyExists => "Session Already Exists",
Self::ModeNotSupported => "Mode Not Supported",
@ -80,7 +77,6 @@ impl ErrorType {
Self::PermissionDenied => 403,
Self::NotAcceptable => 406,
Self::UnsupportedMediaType => 415,
Self::NotFound => 404,
Self::SessionNotFound => 404,
Self::SessionAlreadyExists => 409,
Self::ModeNotSupported => 400,
@ -159,8 +155,6 @@ pub enum SandboxError {
NotAcceptable { message: String },
#[error("unsupported media type: {message}")]
UnsupportedMediaType { message: String },
#[error("not found: {resource} {id}")]
NotFound { resource: String, id: String },
#[error("session not found: {session_id}")]
SessionNotFound { session_id: String },
#[error("session already exists: {session_id}")]
@ -186,7 +180,6 @@ impl SandboxError {
Self::PermissionDenied { .. } => ErrorType::PermissionDenied,
Self::NotAcceptable { .. } => ErrorType::NotAcceptable,
Self::UnsupportedMediaType { .. } => ErrorType::UnsupportedMediaType,
Self::NotFound { .. } => ErrorType::NotFound,
Self::SessionNotFound { .. } => ErrorType::SessionNotFound,
Self::SessionAlreadyExists { .. } => ErrorType::SessionAlreadyExists,
Self::ModeNotSupported { .. } => ErrorType::ModeNotSupported,
@ -271,12 +264,6 @@ impl SandboxError {
map.insert("message".to_string(), Value::String(message.clone()));
(None, None, Some(Value::Object(map)))
}
Self::NotFound { resource, id } => {
let mut map = Map::new();
map.insert("resource".to_string(), Value::String(resource.clone()));
map.insert("id".to_string(), Value::String(id.clone()));
(None, None, Some(Value::Object(map)))
}
Self::SessionNotFound { session_id } => (None, Some(session_id.clone()), None),
Self::SessionAlreadyExists { session_id } => (None, Some(session_id.clone()), None),
Self::ModeNotSupported { agent, mode } => {

View file

@ -55,7 +55,6 @@ insta.workspace = true
tower.workspace = true
tempfile.workspace = true
serial_test = "3.2"
tokio-tungstenite = "0.24"
[features]
test-utils = ["tempfile"]

View file

@ -165,7 +165,7 @@ impl AcpProxyRuntime {
error = %err,
"acp_proxy: POST → error"
);
Err(map_adapter_error(err, Some(instance.agent)))
Err(map_adapter_error(err))
}
}
}
@ -277,28 +277,27 @@ impl AcpProxyRuntime {
server_id: &str,
agent: AgentId,
) -> Result<Arc<ProxyInstance>, SandboxError> {
let total_started = std::time::Instant::now();
let start = std::time::Instant::now();
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
"create_instance: starting"
);
let install_started = std::time::Instant::now();
self.ensure_installed(agent).await?;
let install_elapsed = start.elapsed();
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
install_ms = install_started.elapsed().as_millis() as u64,
install_ms = install_elapsed.as_millis() as u64,
"create_instance: agent installed/verified"
);
let resolve_started = std::time::Instant::now();
let manager = self.inner.agent_manager.clone();
let launch = tokio::task::spawn_blocking(move || manager.resolve_agent_process(agent))
.await
.map_err(|err| SandboxError::StreamError {
message: format!("failed to resolve agent process launch spec: {err}"),
message: format!("failed to resolve ACP agent process launch spec: {err}"),
})?
.map_err(|err| SandboxError::StreamError {
message: err.to_string(),
@ -309,11 +308,10 @@ impl AcpProxyRuntime {
agent = agent.as_str(),
program = ?launch.program,
args = ?launch.args,
resolve_ms = resolve_started.elapsed().as_millis() as u64,
resolve_ms = start.elapsed().as_millis() as u64,
"create_instance: launch spec resolved, spawning"
);
let spawn_started = std::time::Instant::now();
let runtime = AdapterRuntime::start(
LaunchSpec {
program: launch.program,
@ -323,13 +321,12 @@ impl AcpProxyRuntime {
self.inner.request_timeout,
)
.await
.map_err(|err| map_adapter_error(err, Some(agent)))?;
.map_err(map_adapter_error)?;
let total_ms = total_started.elapsed().as_millis() as u64;
let total_ms = start.elapsed().as_millis() as u64;
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
spawn_ms = spawn_started.elapsed().as_millis() as u64,
total_ms = total_ms,
"create_instance: ready"
);
@ -343,27 +340,16 @@ impl AcpProxyRuntime {
}
async fn ensure_installed(&self, agent: AgentId) -> Result<(), SandboxError> {
let started = std::time::Instant::now();
if self.inner.require_preinstall {
if !self.is_ready(agent).await {
return Err(SandboxError::AgentNotInstalled {
agent: agent.as_str().to_string(),
});
}
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: preinstall requirement satisfied"
);
return Ok(());
}
if self.is_ready(agent).await {
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: already ready"
);
return Ok(());
}
@ -377,19 +363,9 @@ impl AcpProxyRuntime {
let _guard = lock.lock().await;
if self.is_ready(agent).await {
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: became ready while waiting for lock"
);
return Ok(());
}
tracing::info!(
agent = agent.as_str(),
"ensure_installed: installing missing artifacts"
);
let install_started = std::time::Instant::now();
let manager = self.inner.agent_manager.clone();
tokio::task::spawn_blocking(move || manager.install(agent, InstallOptions::default()))
.await
@ -402,12 +378,6 @@ impl AcpProxyRuntime {
stderr: Some(err.to_string()),
})?;
tracing::info!(
agent = agent.as_str(),
install_ms = install_started.elapsed().as_millis() as u64,
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: install complete"
);
Ok(())
}
@ -462,7 +432,7 @@ impl AcpDispatch for AcpProxyRuntime {
}
}
fn map_adapter_error(err: AdapterError, agent: Option<AgentId>) -> SandboxError {
fn map_adapter_error(err: AdapterError) -> SandboxError {
match err {
AdapterError::InvalidEnvelope => SandboxError::InvalidRequest {
message: "request body must be a JSON-RPC object".to_string(),
@ -476,29 +446,6 @@ fn map_adapter_error(err: AdapterError, agent: Option<AgentId>) -> SandboxError
AdapterError::Write(error) => SandboxError::StreamError {
message: format!("failed writing to agent stdin: {error}"),
},
AdapterError::Exited { exit_code, stderr } => {
if let Some(agent) = agent {
SandboxError::AgentProcessExited {
agent: agent.as_str().to_string(),
exit_code,
stderr,
}
} else {
SandboxError::StreamError {
message: if let Some(stderr) = stderr {
format!(
"agent process exited before responding (exit_code: {:?}, stderr: {})",
exit_code, stderr
)
} else {
format!(
"agent process exited before responding (exit_code: {:?})",
exit_code
)
},
}
}
}
AdapterError::Spawn(error) => SandboxError::StreamError {
message: format!("failed to start agent process: {error}"),
},

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::process::Command as ProcessCommand;
@ -24,7 +24,7 @@ use sandbox_agent_agent_credentials::{
ProviderCredentials,
};
use sandbox_agent_agent_management::agents::{AgentId, AgentManager, InstallOptions};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use serde_json::{json, Value};
use thiserror::Error;
use tower_http::cors::{Any, CorsLayer};
@ -220,8 +220,6 @@ pub struct AgentsArgs {
pub enum AgentsCommand {
/// List all agents and install status.
List(ClientArgs),
/// Emit JSON report of model/mode/thought options for all agents.
Report(ClientArgs),
/// Install or reinstall an agent.
Install(ApiInstallAgentArgs),
}
@ -477,7 +475,6 @@ fn run_agents(command: &AgentsCommand, cli: &CliConfig) -> Result<(), CliError>
let result = call_acp_extension(&ctx, ACP_EXTENSION_AGENT_LIST_METHOD, json!({}))?;
write_stdout_line(&serde_json::to_string_pretty(&result)?)
}
AgentsCommand::Report(args) => run_agents_report(args, cli),
AgentsCommand::Install(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let mut params = serde_json::Map::new();
@ -501,223 +498,6 @@ fn run_agents(command: &AgentsCommand, cli: &CliConfig) -> Result<(), CliError>
}
}
/// Wire shape of the agent-list API response (`GET /agents?config=true`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AgentListApiResponse {
    agents: Vec<AgentListApiAgent>,
}
/// One agent entry as returned by the server's agent-list endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AgentListApiAgent {
    /// Agent identifier (e.g. "codex").
    id: String,
    /// Whether the agent's artifacts are currently installed.
    installed: bool,
    /// Set when the server failed to collect config options for this agent.
    #[serde(default)]
    config_error: Option<String>,
    /// Raw config-option objects; parsed leniently into `RawConfigOption`.
    #[serde(default)]
    config_options: Option<Vec<Value>>,
}
/// Leniently-parsed config option: every field is optional/defaulted so a
/// partially-shaped object from any agent still deserializes.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawConfigOption {
    #[serde(default)]
    id: Option<String>,
    /// Category label; falls back to `id` when classifying (see
    /// `classify_report_category` call sites).
    #[serde(default)]
    category: Option<String>,
    #[serde(default)]
    current_value: Option<Value>,
    #[serde(default)]
    options: Vec<RawConfigOptionChoice>,
}
/// One selectable value inside a raw config option.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawConfigOptionChoice {
    /// The machine value; may be any JSON type and is stringified later.
    #[serde(default)]
    value: Value,
    /// Optional human-readable display name.
    #[serde(default)]
    name: Option<String>,
}
/// Top-level JSON report emitted by `agents report`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentConfigReport {
    /// Generation timestamp, milliseconds since the Unix epoch (0 on clock error).
    generated_at_ms: u128,
    /// The server endpoint the data was fetched from.
    endpoint: String,
    agents: Vec<AgentConfigReportEntry>,
}
/// Per-agent section of the config report.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentConfigReportEntry {
    id: String,
    installed: bool,
    /// Present only when the server reported a config-collection error.
    // NOTE: dropped the former `serde(default)` here — it only affects
    // `Deserialize`, and this struct derives `Serialize` only.
    #[serde(skip_serializing_if = "Option::is_none")]
    config_error: Option<String>,
    models: AgentConfigCategoryReport,
    modes: AgentConfigCategoryReport,
    thought_levels: AgentConfigCategoryReport,
}
/// Report for one config category (models, modes, or thought levels).
#[derive(Debug, Serialize, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct AgentConfigCategoryReport {
    /// The agent's currently selected value, if one was reported.
    // NOTE: dropped the former `serde(default)` — deserialize-only attribute
    // on a Serialize-only derive.
    #[serde(skip_serializing_if = "Option::is_none")]
    current_value: Option<String>,
    values: Vec<AgentConfigValueReport>,
}
/// A single selectable value with its optional display name.
#[derive(Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct AgentConfigValueReport {
    value: String,
    // NOTE: dropped the former `serde(default)` — deserialize-only attribute
    // on a Serialize-only derive.
    #[serde(skip_serializing_if = "Option::is_none")]
    name: Option<String>,
}
/// The three config categories the report extracts; everything else is ignored.
#[derive(Clone, Copy)]
enum ConfigReportCategory {
    Model,
    Mode,
    ThoughtLevel,
}
/// Accumulates values for one category across possibly multiple raw options.
#[derive(Default)]
struct CategoryAccumulator {
    /// First current value encountered; later ones are ignored.
    current_value: Option<String>,
    /// value -> optional display name; BTreeMap keeps report output sorted
    /// and deterministic.
    values: BTreeMap<String, Option<String>>,
}
impl CategoryAccumulator {
    /// Merge one raw option into the accumulator. The first current value
    /// wins; for duplicate choice values, the first non-empty display name
    /// wins.
    fn absorb(&mut self, option: &RawConfigOption) {
        if self.current_value.is_none() {
            self.current_value = config_value_to_string(option.current_value.as_ref());
        }
        for choice in &option.options {
            let value = match config_value_to_string(Some(&choice.value)) {
                Some(value) => value,
                None => continue,
            };
            // Trimmed, non-empty display name (if any) for this choice.
            let display = choice
                .name
                .as_deref()
                .map(str::trim)
                .filter(|trimmed| !trimmed.is_empty())
                .map(str::to_string);
            let slot = self.values.entry(value).or_insert(None);
            if slot.is_none() {
                // Writing `None` into an empty slot is a no-op, so no
                // separate `display.is_some()` check is needed.
                *slot = display;
            }
        }
    }

    /// Finalize into the serializable report shape, making sure the current
    /// value always appears in the value list.
    fn into_report(mut self) -> AgentConfigCategoryReport {
        if let Some(current) = self.current_value.as_ref() {
            self.values.entry(current.clone()).or_insert(None);
        }
        let values = self
            .values
            .into_iter()
            .map(|(value, name)| AgentConfigValueReport { value, name })
            .collect();
        AgentConfigCategoryReport {
            current_value: self.current_value,
            values,
        }
    }
}
/// Fetch the agent list (with config options) from the server and print a
/// condensed model/mode/thought report as pretty JSON.
fn run_agents_report(args: &ClientArgs, cli: &CliConfig) -> Result<(), CliError> {
    let ctx = ClientContext::new(cli, args)?;
    let url = format!("{API_PREFIX}/agents?config=true");
    let response = ctx.get(&url)?;
    let status = response.status();
    let body = response.text()?;
    if !status.is_success() {
        // Surface the server's problem body before failing with the status.
        print_error_body(&body)?;
        return Err(CliError::HttpStatus(status));
    }
    let listing: AgentListApiResponse = serde_json::from_str(&body)?;
    let report = build_agent_config_report(listing, &ctx.endpoint);
    write_stdout_line(&serde_json::to_string_pretty(&report)?)
}
/// Collapse the raw agent listing into per-category (model/mode/thought)
/// reports. Unparseable or unclassifiable options are silently skipped.
fn build_agent_config_report(input: AgentListApiResponse, endpoint: &str) -> AgentConfigReport {
    // Clock errors degrade to 0 rather than failing the report.
    let generated_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_millis());
    let agents = input
        .agents
        .into_iter()
        .map(|agent| {
            let mut models = CategoryAccumulator::default();
            let mut modes = CategoryAccumulator::default();
            let mut thoughts = CategoryAccumulator::default();
            for raw in agent.config_options.unwrap_or_default() {
                let option = match serde_json::from_value::<RawConfigOption>(raw) {
                    Ok(option) => option,
                    Err(_) => continue,
                };
                // Prefer the explicit category label, fall back to the id.
                let label = option.category.as_deref().or(option.id.as_deref());
                match label.and_then(classify_report_category) {
                    Some(ConfigReportCategory::Model) => models.absorb(&option),
                    Some(ConfigReportCategory::Mode) => modes.absorb(&option),
                    Some(ConfigReportCategory::ThoughtLevel) => thoughts.absorb(&option),
                    None => {}
                }
            }
            AgentConfigReportEntry {
                id: agent.id,
                installed: agent.installed,
                config_error: agent.config_error,
                models: models.into_report(),
                modes: modes.into_report(),
                thought_levels: thoughts.into_report(),
            }
        })
        .collect();
    AgentConfigReport {
        generated_at_ms,
        endpoint: endpoint.to_string(),
        agents,
    }
}
/// Map a free-form category/id label onto one of the three report
/// categories. Labels are trimmed, lowercased, and have `-`/` ` normalized
/// to `_` before matching; anything unrecognized returns `None`.
fn classify_report_category(raw: &str) -> Option<ConfigReportCategory> {
    let normalized: String = raw
        .trim()
        .chars()
        .map(|c| match c {
            '-' | ' ' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect();
    match normalized.as_str() {
        "model" | "model_id" => Some(ConfigReportCategory::Model),
        "mode" | "agent_mode" => Some(ConfigReportCategory::Mode),
        "thought" | "thoughtlevel" | "thought_level" | "thinking" | "thinking_level"
        | "reasoning" | "reasoning_effort" => Some(ConfigReportCategory::ThoughtLevel),
        _ => None,
    }
}
/// Stringify a JSON config value: strings pass through as-is, `Null`/absent
/// become `None`, and any other JSON type is rendered with `to_string()`.
fn config_value_to_string(value: Option<&Value>) -> Option<String> {
    value.and_then(|inner| match inner {
        Value::Null => None,
        Value::String(text) => Some(text.clone()),
        other => Some(other.to_string()),
    })
}
fn call_acp_extension(ctx: &ClientContext, method: &str, params: Value) -> Result<Value, CliError> {
let server_id = unique_cli_server_id("cli-ext");
let initialize_path = build_acp_server_path(&server_id, Some("mock"))?;
@ -1439,96 +1219,4 @@ mod tests {
.expect("build request");
assert!(request.headers().get("last-event-id").is_none());
}
#[test]
fn classify_report_category_supports_common_aliases() {
    // `matches!` is used throughout because `ConfigReportCategory` derives
    // neither `PartialEq` nor `Debug`, so `assert_eq!` is unavailable.
    assert!(matches!(
        classify_report_category("model"),
        Some(ConfigReportCategory::Model)
    ));
    assert!(matches!(
        classify_report_category("mode"),
        Some(ConfigReportCategory::Mode)
    ));
    assert!(matches!(
        classify_report_category("thought_level"),
        Some(ConfigReportCategory::ThoughtLevel)
    ));
    // "reasoning_effort" is an alias some agents use for thought level.
    assert!(matches!(
        classify_report_category("reasoning_effort"),
        Some(ConfigReportCategory::ThoughtLevel)
    ));
    // Unknown labels must be ignored, not forced into a category.
    assert!(classify_report_category("arbitrary").is_none());
}
#[test]
fn build_agent_config_report_extracts_model_mode_and_thought() {
    // Fixture mirrors the wire shape of `GET /agents?config=true`.
    let listing = AgentListApiResponse {
        agents: vec![AgentListApiAgent {
            id: "codex".to_string(),
            installed: true,
            config_error: None,
            config_options: Some(vec![
                json!({
                    "id": "model",
                    "category": "model",
                    "currentValue": "gpt-5",
                    "options": [
                        {"value": "gpt-5", "name": "GPT-5"},
                        {"value": "gpt-5-mini", "name": "GPT-5 mini"}
                    ]
                }),
                json!({
                    "id": "mode",
                    "category": "mode",
                    "currentValue": "default",
                    "options": [
                        {"value": "default", "name": "Default"},
                        {"value": "plan", "name": "Plan"}
                    ]
                }),
                json!({
                    "id": "thought",
                    "category": "thought_level",
                    "currentValue": "medium",
                    "options": [
                        {"value": "low", "name": "Low"},
                        {"value": "medium", "name": "Medium"},
                        {"value": "high", "name": "High"}
                    ]
                }),
            ]),
        }],
    };

    let report = build_agent_config_report(listing, "http://127.0.0.1:2468");
    let agent = report.agents.first().expect("agent report");

    // Current values land in their respective categories.
    assert_eq!(agent.id, "codex");
    assert_eq!(agent.models.current_value.as_deref(), Some("gpt-5"));
    assert_eq!(agent.modes.current_value.as_deref(), Some("default"));
    assert_eq!(
        agent.thought_levels.current_value.as_deref(),
        Some("medium")
    );

    // Every advertised choice is present in the value lists.
    let has_model =
        |value: &str| agent.models.values.iter().any(|item| item.value == value);
    assert!(has_model("gpt-5"));
    assert!(has_model("gpt-5-mini"));

    let has_thought = |value: &str| {
        agent
            .thought_levels
            .values
            .iter()
            .any(|item| item.value == value)
    };
    assert!(has_thought("low"));
    assert!(has_thought("medium"));
    assert!(has_thought("high"));
}
}

View file

@ -3,7 +3,6 @@
mod acp_proxy_runtime;
pub mod cli;
pub mod daemon;
mod process_runtime;
pub mod router;
pub mod server_logs;
pub mod telemetry;

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::convert::Infallible;
use std::fs;
use std::io::Cursor;
use std::path::{Path as StdPath, PathBuf};
@ -7,7 +6,6 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use axum::body::Bytes;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::{Path, Query, State};
use axum::http::{header, HeaderMap, Request, StatusCode};
use axum::middleware::Next;
@ -15,8 +13,6 @@ use axum::response::sse::KeepAlive;
use axum::response::{IntoResponse, Response, Sse};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use futures::stream;
use futures::StreamExt;
use sandbox_agent_agent_management::agents::{
AgentId, AgentManager, InstallOptions, InstallResult, InstallSource, InstalledArtifactKind,
};
@ -31,16 +27,11 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tar::Archive;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer;
use tracing::Span;
use utoipa::{Modify, OpenApi, ToSchema};
use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome};
use crate::process_runtime::{
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime,
ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec,
};
use crate::ui;
mod support;
@ -86,7 +77,6 @@ pub struct AppState {
agent_manager: Arc<AgentManager>,
acp_proxy: Arc<AcpProxyRuntime>,
opencode_server_manager: Arc<OpenCodeServerManager>,
process_runtime: Arc<ProcessRuntime>,
pub(crate) branding: BrandingMode,
version_cache: Mutex<HashMap<AgentId, CachedAgentVersion>>,
}
@ -110,13 +100,11 @@ impl AppState {
auto_restart: true,
},
));
let process_runtime = Arc::new(ProcessRuntime::new());
Self {
auth,
agent_manager,
acp_proxy,
opencode_server_manager,
process_runtime,
branding,
version_cache: Mutex::new(HashMap::new()),
}
@ -134,10 +122,6 @@ impl AppState {
self.opencode_server_manager.clone()
}
pub(crate) fn process_runtime(&self) -> Arc<ProcessRuntime> {
self.process_runtime.clone()
}
pub(crate) fn purge_version_cache(&self, agent: AgentId) {
self.version_cache.lock().unwrap().remove(&agent);
}
@ -182,28 +166,6 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/fs/move", post(post_v1_fs_move))
.route("/fs/stat", get(get_v1_fs_stat))
.route("/fs/upload-batch", post(post_v1_fs_upload_batch))
.route(
"/processes/config",
get(get_v1_processes_config).post(post_v1_processes_config),
)
.route("/processes", get(get_v1_processes).post(post_v1_processes))
.route("/processes/run", post(post_v1_processes_run))
.route(
"/processes/:id",
get(get_v1_process).delete(delete_v1_process),
)
.route("/processes/:id/stop", post(post_v1_process_stop))
.route("/processes/:id/kill", post(post_v1_process_kill))
.route("/processes/:id/logs", get(get_v1_process_logs))
.route("/processes/:id/input", post(post_v1_process_input))
.route(
"/processes/:id/terminal/resize",
post(post_v1_process_terminal_resize),
)
.route(
"/processes/:id/terminal/ws",
get(get_v1_process_terminal_ws),
)
.route(
"/config/mcp",
get(get_v1_config_mcp)
@ -333,19 +295,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
post_v1_fs_move,
get_v1_fs_stat,
post_v1_fs_upload_batch,
get_v1_processes_config,
post_v1_processes_config,
post_v1_processes,
post_v1_processes_run,
get_v1_processes,
get_v1_process,
post_v1_process_stop,
post_v1_process_kill,
delete_v1_process,
get_v1_process_logs,
post_v1_process_input,
post_v1_process_terminal_resize,
get_v1_process_terminal_ws,
get_v1_config_mcp,
put_v1_config_mcp,
delete_v1_config_mcp,
@ -380,22 +329,6 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
FsMoveResponse,
FsActionResponse,
FsUploadBatchResponse,
ProcessConfig,
ProcessCreateRequest,
ProcessRunRequest,
ProcessRunResponse,
ProcessState,
ProcessInfo,
ProcessListResponse,
ProcessLogsStream,
ProcessLogsQuery,
ProcessLogEntry,
ProcessLogsResponse,
ProcessInputRequest,
ProcessInputResponse,
ProcessSignalQuery,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
AcpPostQuery,
AcpServerInfo,
AcpServerListResponse,
@ -428,21 +361,12 @@ impl Modify for ServerAddon {
pub enum ApiError {
#[error(transparent)]
Sandbox(#[from] SandboxError),
#[error("problem: {0:?}")]
Problem(ProblemDetails),
}
impl From<ProblemDetails> for ApiError {
fn from(value: ProblemDetails) -> Self {
Self::Problem(value)
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let problem = match &self {
ApiError::Sandbox(error) => problem_from_sandbox_error(error),
ApiError::Problem(problem) => problem.clone(),
};
let status =
StatusCode::from_u16(problem.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
@ -1151,678 +1075,6 @@ async fn post_v1_fs_upload_batch(
}))
}
/// Get process runtime configuration.
///
/// Returns the current runtime configuration for the process management API,
/// including limits for concurrency, timeouts, and buffer sizes.
#[utoipa::path(
    get,
    path = "/v1/processes/config",
    tag = "v1",
    responses(
        (status = 200, description = "Current runtime process config", body = ProcessConfig),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn get_v1_processes_config(
    State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessConfig>, ApiError> {
    if process_api_supported() {
        let runtime = state.process_runtime();
        Ok(Json(map_process_config(runtime.get_config().await)))
    } else {
        Err(process_api_not_supported().into())
    }
}
/// Update process runtime configuration.
///
/// Replaces the runtime configuration for the process management API.
/// Validates that all values are non-zero and clamps default timeout to max.
#[utoipa::path(
    post,
    path = "/v1/processes/config",
    tag = "v1",
    request_body = ProcessConfig,
    responses(
        (status = 200, description = "Updated runtime process config", body = ProcessConfig),
        (status = 400, description = "Invalid config", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_processes_config(
    State(state): State<Arc<AppState>>,
    Json(body): Json<ProcessConfig>,
) -> Result<Json<ProcessConfig>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    // Translate the API shape into the runtime's config type, apply it, and
    // echo the (possibly clamped) result back.
    let next = into_runtime_process_config(body);
    let applied = state.process_runtime().set_config(next).await?;
    Ok(Json(map_process_config(applied)))
}
/// Create a long-lived managed process.
///
/// Spawns a new process with the given command and arguments. Supports both
/// pipe-based and PTY (tty) modes. Returns the process descriptor on success.
#[utoipa::path(
    post,
    path = "/v1/processes",
    tag = "v1",
    request_body = ProcessCreateRequest,
    responses(
        (status = 200, description = "Started process", body = ProcessInfo),
        (status = 400, description = "Invalid request", body = ProblemDetails),
        (status = 409, description = "Process limit or state conflict", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_processes(
    State(state): State<Arc<AppState>>,
    Json(body): Json<ProcessCreateRequest>,
) -> Result<Json<ProcessInfo>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let spec = ProcessStartSpec {
        command: body.command,
        args: body.args,
        cwd: body.cwd,
        env: body.env.into_iter().collect(),
        tty: body.tty,
        interactive: body.interactive,
    };
    let started = state.process_runtime().start_process(spec).await?;
    Ok(Json(map_process_snapshot(started)))
}
/// Run a one-shot command.
///
/// Executes a command to completion and returns its stdout, stderr, exit code,
/// and duration. Supports configurable timeout and output size limits.
#[utoipa::path(
    post,
    path = "/v1/processes/run",
    tag = "v1",
    request_body = ProcessRunRequest,
    responses(
        (status = 200, description = "One-off command result", body = ProcessRunResponse),
        (status = 400, description = "Invalid request", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_processes_run(
    State(state): State<Arc<AppState>>,
    Json(body): Json<ProcessRunRequest>,
) -> Result<Json<ProcessRunResponse>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let spec = RunSpec {
        command: body.command,
        args: body.args,
        cwd: body.cwd,
        env: body.env.into_iter().collect(),
        timeout_ms: body.timeout_ms,
        max_output_bytes: body.max_output_bytes,
    };
    let result = state.process_runtime().run_once(spec).await?;
    Ok(Json(ProcessRunResponse {
        exit_code: result.exit_code,
        timed_out: result.timed_out,
        stdout: result.stdout,
        stderr: result.stderr,
        stdout_truncated: result.stdout_truncated,
        stderr_truncated: result.stderr_truncated,
        duration_ms: result.duration_ms,
    }))
}
/// List all managed processes.
///
/// Returns a list of all processes (running and exited) currently tracked
/// by the runtime, sorted by process ID.
#[utoipa::path(
    get,
    path = "/v1/processes",
    tag = "v1",
    responses(
        (status = 200, description = "List processes", body = ProcessListResponse),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn get_v1_processes(
    State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessListResponse>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let processes = state
        .process_runtime()
        .list_processes()
        .await
        .into_iter()
        .map(map_process_snapshot)
        .collect();
    Ok(Json(ProcessListResponse { processes }))
}
/// Get a single process by ID.
///
/// Returns the current state of a managed process including its status,
/// PID, exit code, and creation/exit timestamps.
#[utoipa::path(
    get,
    path = "/v1/processes/{id}",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID")
    ),
    responses(
        (status = 200, description = "Process details", body = ProcessInfo),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn get_v1_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<Json<ProcessInfo>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    // `snapshot` surfaces unknown IDs as a 404-mapped error via `?`.
    let current = state.process_runtime().snapshot(&id).await?;
    Ok(Json(map_process_snapshot(current)))
}
/// Send SIGTERM to a process.
///
/// Sends SIGTERM to the process and optionally waits up to `waitMs`
/// milliseconds for the process to exit before returning.
#[utoipa::path(
    post,
    path = "/v1/processes/{id}/stop",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID"),
        ("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
    ),
    responses(
        (status = 200, description = "Stop signal sent", body = ProcessInfo),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_process_stop(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let runtime = state.process_runtime();
    // `wait_ms` (if given) bounds how long we block for the exit.
    let after_signal = runtime.stop_process(&id, query.wait_ms).await?;
    Ok(Json(map_process_snapshot(after_signal)))
}
/// Send SIGKILL to a process.
///
/// Sends SIGKILL to the process and optionally waits up to `waitMs`
/// milliseconds for the process to exit before returning.
#[utoipa::path(
    post,
    path = "/v1/processes/{id}/kill",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID"),
        ("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
    ),
    responses(
        (status = 200, description = "Kill signal sent", body = ProcessInfo),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_process_kill(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let runtime = state.process_runtime();
    // `wait_ms` (if given) bounds how long we block for the exit.
    let after_signal = runtime.kill_process(&id, query.wait_ms).await?;
    Ok(Json(map_process_snapshot(after_signal)))
}
/// Delete a process record.
///
/// Removes a stopped process from the runtime. Returns 409 if the process
/// is still running; stop or kill it first.
#[utoipa::path(
    delete,
    path = "/v1/processes/{id}",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID")
    ),
    responses(
        (status = 204, description = "Process deleted"),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 409, description = "Process is still running", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn delete_v1_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let runtime = state.process_runtime();
    runtime.delete_process(&id).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Fetch process logs.
///
/// Returns buffered log entries for a process. Supports filtering by stream
/// type, tail count, and sequence-based resumption. When `follow=true`,
/// returns an SSE stream that replays buffered entries then streams live output.
#[utoipa::path(
    get,
    path = "/v1/processes/{id}/logs",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID"),
        ("stream" = Option<ProcessLogsStream>, Query, description = "stdout|stderr|combined|pty"),
        ("tail" = Option<usize>, Query, description = "Tail N entries"),
        ("follow" = Option<bool>, Query, description = "Follow via SSE"),
        ("since" = Option<u64>, Query, description = "Only entries with sequence greater than this")
    ),
    responses(
        (status = 200, description = "Process logs", body = ProcessLogsResponse),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn get_v1_process_logs(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    headers: HeaderMap,
    Query(query): Query<ProcessLogsQuery>,
) -> Result<Response, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let runtime = state.process_runtime();
    // PTY processes default to the pty stream; pipe processes default to
    // combined stdout+stderr.
    let default_stream = if runtime.is_tty(&id).await? {
        ProcessLogsStream::Pty
    } else {
        ProcessLogsStream::Combined
    };
    let requested_stream = query.stream.unwrap_or(default_stream);
    // Resume point: take the larger of the `since` query param and the SSE
    // `Last-Event-ID` header so a reconnecting client never sees duplicates.
    let since = match (query.since, parse_last_event_id(&headers)?) {
        (Some(query_since), Some(last_event_id)) => Some(query_since.max(last_event_id)),
        (Some(query_since), None) => Some(query_since),
        (None, Some(last_event_id)) => Some(last_event_id),
        (None, None) => None,
    };
    let filter = ProcessLogFilter {
        stream: into_runtime_log_stream(requested_stream),
        tail: query.tail,
        since,
    };
    let entries = runtime.logs(&id, filter).await?;
    let response_entries: Vec<ProcessLogEntry> =
        entries.iter().cloned().map(map_process_log_line).collect();
    if query.follow.unwrap_or(false) {
        // Subscribe BEFORE building the replay stream is not guaranteed here:
        // subscription happens after the buffered fetch above, so entries
        // emitted in between may appear in both replay and live portions —
        // clients dedupe via the `id` (sequence) field. TODO confirm intent.
        let rx = runtime.subscribe_logs(&id).await?;
        // Replay: buffered entries first, each tagged with its sequence as the
        // SSE event id (feeds Last-Event-ID resumption).
        let replay_stream = stream::iter(response_entries.into_iter().map(|entry| {
            Ok::<axum::response::sse::Event, Infallible>(
                axum::response::sse::Event::default()
                    .event("log")
                    .id(entry.sequence.to_string())
                    .data(serde_json::to_string(&entry).unwrap_or_else(|_| "{}".to_string())),
            )
        }));
        let requested_stream_copy = requested_stream;
        // Live tail: broadcast receiver filtered to the requested stream kind.
        // Broadcast errors (lag/close) silently end or skip — best effort.
        let follow_stream = BroadcastStream::new(rx).filter_map(move |item| {
            let requested_stream_copy = requested_stream_copy;
            async move {
                match item {
                    Ok(line) => {
                        let entry = map_process_log_line(line);
                        if process_log_matches(&entry, requested_stream_copy) {
                            Some(Ok(axum::response::sse::Event::default()
                                .event("log")
                                .id(entry.sequence.to_string())
                                .data(
                                    serde_json::to_string(&entry)
                                        .unwrap_or_else(|_| "{}".to_string()),
                                )))
                        } else {
                            None
                        }
                    }
                    Err(_) => None,
                }
            }
        });
        let stream = replay_stream.chain(follow_stream);
        // Keep-alive pings every 15s so idle proxies don't drop the connection.
        let response =
            Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)));
        return Ok(response.into_response());
    }
    // Non-follow mode: plain JSON body with the buffered entries.
    Ok(Json(ProcessLogsResponse {
        process_id: id,
        stream: requested_stream,
        entries: response_entries,
    })
    .into_response())
}
/// Write input to a process.
///
/// Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
/// Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
/// payload exceeds the configured `maxInputBytesPerRequest` limit.
#[utoipa::path(
    post,
    path = "/v1/processes/{id}/input",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID")
    ),
    request_body = ProcessInputRequest,
    responses(
        (status = 200, description = "Input accepted", body = ProcessInputResponse),
        (status = 400, description = "Invalid request", body = ProblemDetails),
        (status = 413, description = "Input exceeds configured limit", body = ProblemDetails),
        (status = 409, description = "Process not writable", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn post_v1_process_input(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    Json(body): Json<ProcessInputRequest>,
) -> Result<Json<ProcessInputResponse>, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    // HTTP input defaults to base64 so arbitrary binary survives JSON transport.
    let encoding = body.encoding.as_deref().unwrap_or("base64");
    let input = decode_input_bytes(&body.data, encoding)?;
    let runtime = state.process_runtime();
    // Enforce the configured per-request size cap on the DECODED bytes.
    let max_input = runtime.max_input_bytes().await;
    if input.len() > max_input {
        let too_large = SandboxError::InvalidRequest {
            message: format!("input payload exceeds maxInputBytesPerRequest ({max_input})"),
        };
        return Err(too_large.into());
    }
    let bytes_written = runtime.write_input(&id, &input).await?;
    Ok(Json(ProcessInputResponse { bytes_written }))
}
/// Resize a process terminal.
///
/// Sets the PTY window size (columns and rows) for a tty-mode process and
/// sends SIGWINCH so the child process can adapt.
#[utoipa::path(
post,
path = "/v1/processes/{id}/terminal/resize",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessTerminalResizeRequest,
responses(
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_terminal_resize(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessTerminalResizeRequest>,
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state
.process_runtime()
.resize_terminal(&id, body.cols, body.rows)
.await?;
Ok(Json(ProcessTerminalResizeResponse {
cols: body.cols,
rows: body.rows,
}))
}
/// Open an interactive WebSocket terminal session.
///
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
/// `access_token` query param for browser-based auth (WebSocket API cannot
/// send custom headers). Streams raw PTY output as binary frames and accepts
/// JSON control frames for input, resize, and close.
#[utoipa::path(
    get,
    path = "/v1/processes/{id}/terminal/ws",
    tag = "v1",
    params(
        ("id" = String, Path, description = "Process ID"),
        ("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
    ),
    responses(
        (status = 101, description = "WebSocket upgraded"),
        (status = 400, description = "Invalid websocket frame or upgrade request", body = ProblemDetails),
        (status = 404, description = "Unknown process", body = ProblemDetails),
        (status = 409, description = "Not a terminal process", body = ProblemDetails),
        (status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
    )
)]
async fn get_v1_process_terminal_ws(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    Query(_query): Query<ProcessWsQuery>,
    ws: WebSocketUpgrade,
) -> Result<Response, ApiError> {
    if !process_api_supported() {
        return Err(process_api_not_supported().into());
    }
    let runtime = state.process_runtime();
    // Only PTY-mode processes can host an interactive terminal.
    if !runtime.is_tty(&id).await? {
        let conflict = SandboxError::Conflict {
            message: "process is not running in tty mode".to_string(),
        };
        return Err(conflict.into());
    }
    let upgrade = ws.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id));
    Ok(upgrade.into_response())
}
/// Control frames accepted from the client side of a terminal WebSocket.
///
/// Wire format is internally tagged JSON with a camelCase `type` field, e.g.
/// `{"type": "input", "data": "...", "encoding": "utf8"}`.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum TerminalClientFrame {
    /// Write bytes to the process; `encoding` defaults to "utf8" at the call site.
    Input {
        data: String,
        #[serde(default)]
        encoding: Option<String>,
    },
    /// Set the PTY window size.
    Resize {
        cols: u16,
        rows: u16,
    },
    /// Ask the server to close the socket.
    Close,
}
/// Drive one interactive terminal WebSocket session for process `id`.
///
/// Protocol: send a `ready` JSON frame, then multiplex three event sources
/// with `select!` until the socket or process ends:
/// - client frames (JSON `input`/`resize`/`close`; binary frames rejected),
/// - the process log broadcast (PTY entries forwarded as binary frames),
/// - a 150ms poll that detects process exit and sends an `exit` frame.
///
/// All send errors are deliberately ignored (`let _ =`): the session is
/// best-effort and ends via the loop's break conditions.
async fn process_terminal_ws_session(
    mut socket: WebSocket,
    runtime: Arc<ProcessRuntime>,
    id: String,
) {
    // Handshake: tell the client which process it is attached to.
    let _ = send_ws_json(
        &mut socket,
        json!({
            "type": "ready",
            "processId": &id,
        }),
    )
    .await;
    // Subscribe to log output up front; bail out if the process is unknown.
    let mut log_rx = match runtime.subscribe_logs(&id).await {
        Ok(rx) => rx,
        Err(err) => {
            let _ = send_ws_error(&mut socket, &err.to_string()).await;
            let _ = socket.close().await;
            return;
        }
    };
    let mut exit_poll = tokio::time::interval(Duration::from_millis(150));
    loop {
        tokio::select! {
            // Inbound client frames.
            ws_in = socket.recv() => {
                match ws_in {
                    Some(Ok(Message::Binary(_))) => {
                        let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
                    }
                    Some(Ok(Message::Text(text))) => {
                        let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
                        match parsed {
                            Ok(TerminalClientFrame::Input { data, encoding }) => {
                                // WS input defaults to utf8 (the HTTP input endpoint defaults to base64).
                                let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
                                    Ok(input) => input,
                                    Err(err) => {
                                        let _ = send_ws_error(&mut socket, &err.to_string()).await;
                                        continue;
                                    }
                                };
                                // Same per-request size cap as the HTTP input endpoint.
                                let max_input = runtime.max_input_bytes().await;
                                if input.len() > max_input {
                                    let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
                                    continue;
                                }
                                if let Err(err) = runtime.write_input(&id, &input).await {
                                    let _ = send_ws_error(&mut socket, &err.to_string()).await;
                                }
                            }
                            Ok(TerminalClientFrame::Resize { cols, rows }) => {
                                if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
                                    let _ = send_ws_error(&mut socket, &err.to_string()).await;
                                }
                            }
                            Ok(TerminalClientFrame::Close) => {
                                let _ = socket.close().await;
                                break;
                            }
                            Err(err) => {
                                let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
                            }
                        }
                    }
                    Some(Ok(Message::Ping(payload))) => {
                        let _ = socket.send(Message::Pong(payload)).await;
                    }
                    Some(Ok(Message::Close(_))) | None => break,
                    Some(Ok(Message::Pong(_))) => {}
                    Some(Err(_)) => break,
                }
            }
            // Outbound process output.
            log_in = log_rx.recv() => {
                match log_in {
                    Ok(line) => {
                        // Only PTY output belongs to a terminal session; drop pipe streams.
                        if line.stream != ProcessStream::Pty {
                            continue;
                        }
                        // Log lines carry base64 payloads; the client receives raw bytes.
                        let bytes = {
                            use base64::engine::general_purpose::STANDARD as BASE64_ENGINE;
                            use base64::Engine;
                            BASE64_ENGINE.decode(&line.data).unwrap_or_default()
                        };
                        if socket.send(Message::Binary(bytes)).await.is_err() {
                            break;
                        }
                    }
                    // Lagged: the broadcast ring buffer overflowed — skip the gap.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                }
            }
            // Exit detection: poll the snapshot and notify the client once.
            _ = exit_poll.tick() => {
                if let Ok(snapshot) = runtime.snapshot(&id).await {
                    if snapshot.status == ProcessStatus::Exited {
                        let _ = send_ws_json(
                            &mut socket,
                            json!({
                                "type": "exit",
                                "exitCode": snapshot.exit_code,
                            }),
                        )
                        .await;
                        let _ = socket.close().await;
                        break;
                    }
                }
            }
        }
    }
}
/// Serialize `payload` and send it as a WebSocket text frame.
///
/// Errors (serialization or socket) collapse into `Err(())` — callers treat
/// delivery as best-effort.
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
    let text = serde_json::to_string(&payload).map_err(|_| ())?;
    socket.send(Message::Text(text)).await.map_err(|_| ())
}
/// Send a `{"type": "error", "message": …}` frame to the client.
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
    let frame = json!({
        "type": "error",
        "message": message,
    });
    send_ws_json(socket, frame).await
}
#[utoipa::path(
get,
path = "/v1/config/mcp",
@ -2134,96 +1386,6 @@ async fn delete_v1_acp(
Ok(StatusCode::NO_CONTENT)
}
/// The process API is available on every platform except Windows.
fn process_api_supported() -> bool {
    cfg!(not(windows))
}
/// Build the RFC 9457 problem body returned for every process endpoint on
/// platforms where the API is unavailable (HTTP 501).
fn process_api_not_supported() -> ProblemDetails {
    let detail = "process API is not implemented on Windows".to_string();
    ProblemDetails {
        type_: ErrorType::InvalidRequest.as_urn().to_string(),
        title: String::from("Not Implemented"),
        status: 501,
        detail: Some(detail),
        instance: None,
        extensions: serde_json::Map::new(),
    }
}
fn map_process_config(config: ProcessRuntimeConfig) -> ProcessConfig {
ProcessConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig {
ProcessRuntimeConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
/// Translate a runtime `ProcessSnapshot` into the public `ProcessInfo` DTO.
fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
    // Lifecycle status is the only field that needs an explicit mapping.
    let status = match snapshot.status {
        ProcessStatus::Running => ProcessState::Running,
        ProcessStatus::Exited => ProcessState::Exited,
    };
    ProcessInfo {
        id: snapshot.id,
        command: snapshot.command,
        args: snapshot.args,
        cwd: snapshot.cwd,
        tty: snapshot.tty,
        interactive: snapshot.interactive,
        status,
        pid: snapshot.pid,
        exit_code: snapshot.exit_code,
        created_at_ms: snapshot.created_at_ms,
        exited_at_ms: snapshot.exited_at_ms,
    }
}
/// Map the API-level log stream selector onto the runtime filter enum.
fn into_runtime_log_stream(stream: ProcessLogsStream) -> ProcessLogFilterStream {
    type Target = ProcessLogFilterStream;
    match stream {
        ProcessLogsStream::Stdout => Target::Stdout,
        ProcessLogsStream::Stderr => Target::Stderr,
        ProcessLogsStream::Combined => Target::Combined,
        ProcessLogsStream::Pty => Target::Pty,
    }
}
/// Translate a runtime log line into the public `ProcessLogEntry` DTO.
fn map_process_log_line(line: crate::process_runtime::ProcessLogLine) -> ProcessLogEntry {
    // Runtime lines never carry `Combined`; only concrete streams appear here.
    let stream = match line.stream {
        ProcessStream::Stdout => ProcessLogsStream::Stdout,
        ProcessStream::Stderr => ProcessLogsStream::Stderr,
        ProcessStream::Pty => ProcessLogsStream::Pty,
    };
    ProcessLogEntry {
        sequence: line.sequence,
        stream,
        timestamp_ms: line.timestamp_ms,
        data: line.data,
        encoding: line.encoding.to_string(),
    }
}
/// Decide whether a log entry belongs to the requested stream filter.
/// `Combined` accepts both stdout and stderr; every other selector matches exactly.
fn process_log_matches(entry: &ProcessLogEntry, stream: ProcessLogsStream) -> bool {
    match stream {
        ProcessLogsStream::Combined => matches!(
            entry.stream,
            ProcessLogsStream::Stdout | ProcessLogsStream::Stderr
        ),
        exact => entry.stream == exact,
    }
}
fn validate_named_query(value: &str, field_name: &str) -> Result<(), SandboxError> {
if value.trim().is_empty() {
return Err(SandboxError::InvalidRequest {

View file

@ -33,17 +33,7 @@ pub(super) async fn require_token(
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "));
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
let query_token = if allow_query_token {
request
.uri()
.query()
.and_then(|query| query_param(query, "access_token"))
} else {
None
};
if bearer == Some(expected.as_str()) || query_token.as_deref() == Some(expected.as_str()) {
if bearer == Some(expected.as_str()) {
return Ok(next.run(request).await);
}
@ -52,50 +42,6 @@ pub(super) async fn require_token(
}))
}
/// Return the percent-decoded value of the FIRST `key=value` pair in a raw
/// query string, or `None` when the key is absent.
fn query_param(query: &str, key: &str) -> Option<String> {
    for pair in query.split('&') {
        if let Some((k, v)) = pair.split_once('=') {
            if k == key {
                return Some(percent_decode(v));
            }
        }
    }
    None
}
/// Minimal application/x-www-form-urlencoded decoder: `%XX` hex escapes become
/// bytes, `+` becomes space, and malformed escapes pass through literally.
/// If the decoded bytes are not valid UTF-8, the original input is returned.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' if i + 2 < bytes.len() => {
                // Valid two-digit escape consumes three bytes; otherwise the
                // '%' falls through as a literal below.
                if let (Some(hi), Some(lo)) = (hex_nibble(bytes[i + 1]), hex_nibble(bytes[i + 2])) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8(decoded).unwrap_or_else(|_| input.to_string())
}
/// Decode one ASCII hex digit (either case) to its 0..=15 value.
fn hex_nibble(b: u8) -> Option<u8> {
    // char::to_digit(16) accepts exactly 0-9, a-f, A-F.
    (b as char).to_digit(16).map(|d| d as u8)
}
pub(super) type PinBoxSseStream = crate::acp_proxy_runtime::PinBoxSseStream;
pub(super) fn credentials_available_for(
@ -144,9 +90,6 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
AgentId::Codex => CODEX.clone(),
AgentId::Opencode => OPENCODE.clone(),
AgentId::Cursor => CURSOR.clone(),
// Amp returns empty configOptions from session/new but exposes modes via
// the `modes` field. The model is hardcoded. Modes discovered from ACP
// session/new response (amp-acp v0.7.0).
AgentId::Amp => vec![
json!({
"id": "model",
@ -163,10 +106,12 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": "default",
"currentValue": "smart",
"options": [
{ "value": "default", "name": "Default" },
{ "value": "bypass", "name": "Bypass" }
{ "value": "smart", "name": "Smart" },
{ "value": "deep", "name": "Deep" },
{ "value": "free", "name": "Free" },
{ "value": "rush", "name": "Rush" }
]
}),
],
@ -180,76 +125,41 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
{ "value": "default", "name": "Default" }
]
})],
AgentId::Mock => vec![
json!({
"id": "model",
"name": "Model",
"category": "model",
"type": "select",
"currentValue": "mock",
"options": [
{ "value": "mock", "name": "Mock" },
{ "value": "mock-fast", "name": "Mock Fast" }
]
}),
json!({
"id": "mode",
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": "normal",
"options": [
{ "value": "normal", "name": "Normal" },
{ "value": "plan", "name": "Plan" }
]
}),
json!({
"id": "thought_level",
"name": "Thought Level",
"category": "thought_level",
"type": "select",
"currentValue": "low",
"options": [
{ "value": "low", "name": "Low" },
{ "value": "medium", "name": "Medium" },
{ "value": "high", "name": "High" }
]
}),
],
AgentId::Mock => vec![json!({
"id": "model",
"name": "Model",
"category": "model",
"type": "select",
"currentValue": "mock",
"options": [
{ "value": "mock", "name": "Mock" }
]
})],
}
}
/// Parse an agent config JSON file (from `scripts/agent-configs/resources/`) into
/// ACP `SessionConfigOption` values. The JSON format is:
/// ```json
/// {
/// "defaultModel": "...", "models": [{id, name}],
/// "defaultMode?": "...", "modes?": [{id, name}],
/// "defaultThoughtLevel?": "...", "thoughtLevels?": [{id, name}]
/// }
/// { "defaultModel": "...", "models": [{id, name}], "defaultMode?": "...", "modes?": [{id, name}] }
/// ```
///
/// Note: Claude and Codex don't report configOptions from `session/new`, so these
/// JSON resource files are the source of truth for the capabilities report.
/// Claude modes (plan, default) were discovered via manual ACP probing —
/// `session/set_mode` works but `session/set_config_option` is not implemented.
/// Codex modes/thought levels were discovered from its `session/new` response.
fn parse_agent_config(json_str: &str) -> Vec<Value> {
#[derive(serde::Deserialize)]
struct AgentConfig {
#[serde(rename = "defaultModel")]
default_model: String,
models: Vec<ConfigEntry>,
models: Vec<ModelEntry>,
#[serde(rename = "defaultMode")]
default_mode: Option<String>,
modes: Option<Vec<ConfigEntry>>,
#[serde(rename = "defaultThoughtLevel")]
default_thought_level: Option<String>,
#[serde(rename = "thoughtLevels")]
thought_levels: Option<Vec<ConfigEntry>>,
modes: Option<Vec<ModeEntry>>,
}
#[derive(serde::Deserialize)]
struct ConfigEntry {
struct ModelEntry {
id: String,
name: String,
}
#[derive(serde::Deserialize)]
struct ModeEntry {
id: String,
name: String,
}
@ -275,7 +185,7 @@ fn parse_agent_config(json_str: &str) -> Vec<Value> {
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": config.default_mode.or_else(|| modes.first().map(|m| m.id.clone())).unwrap_or_default(),
"currentValue": config.default_mode.unwrap_or_else(|| modes[0].id.clone()),
"options": modes.iter().map(|m| json!({
"value": m.id,
"name": m.name,
@ -283,20 +193,6 @@ fn parse_agent_config(json_str: &str) -> Vec<Value> {
}));
}
if let Some(thought_levels) = config.thought_levels {
options.push(json!({
"id": "thought_level",
"name": "Thought Level",
"category": "thought_level",
"type": "select",
"currentValue": config.default_thought_level.or_else(|| thought_levels.first().map(|t| t.id.clone())).unwrap_or_default(),
"options": thought_levels.iter().map(|t| json!({
"value": t.id,
"name": t.name,
})).collect::<Vec<_>>(),
}));
}
options
}
@ -601,17 +497,8 @@ pub(super) fn problem_from_sandbox_error(error: &SandboxError) -> ProblemDetails
let mut problem = error.to_problem_details();
match error {
SandboxError::InvalidRequest { message } => {
if message.starts_with("input payload exceeds maxInputBytesPerRequest") {
problem.status = 413;
problem.title = "Payload Too Large".to_string();
} else {
problem.status = 400;
}
}
SandboxError::NotFound { .. } => {
problem.status = 404;
problem.title = "Not Found".to_string();
SandboxError::InvalidRequest { .. } => {
problem.status = 400;
}
SandboxError::Timeout { .. } => {
problem.status = 504;

View file

@ -362,173 +362,3 @@ pub struct AcpEnvelope {
#[serde(default)]
pub error: Option<Value>,
}
/// Runtime limits for the process API as exposed over HTTP (camelCase JSON).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessConfig {
    pub max_concurrent_processes: usize,
    // Applied when a run request omits `timeoutMs` — TODO confirm against runtime.
    pub default_run_timeout_ms: u64,
    pub max_run_timeout_ms: u64,
    pub max_output_bytes: usize,
    pub max_log_bytes_per_process: usize,
    pub max_input_bytes_per_request: usize,
}
/// Request body for spawning a long-lived (non-run) process.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessCreateRequest {
    pub command: String,
    #[serde(default)]
    pub args: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    // BTreeMap keeps env serialization deterministic.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub env: BTreeMap<String, String>,
    /// Allocate a PTY instead of pipes.
    #[serde(default)]
    pub tty: bool,
    #[serde(default)]
    pub interactive: bool,
}
/// Request body for the synchronous run-to-completion endpoint.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunRequest {
    pub command: String,
    #[serde(default)]
    pub args: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub env: BTreeMap<String, String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_output_bytes: Option<usize>,
}
/// Result of a synchronous run: exit status plus captured, possibly
/// truncated, stdout/stderr.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunResponse {
    // None presumably when the process was killed by a signal or timed out —
    // TODO confirm against the runtime.
    pub exit_code: Option<i32>,
    pub timed_out: bool,
    pub stdout: String,
    pub stderr: String,
    pub stdout_truncated: bool,
    pub stderr_truncated: bool,
    pub duration_ms: u64,
}
/// Coarse process lifecycle state ("running" | "exited" on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessState {
    Running,
    Exited,
}
/// Public view of one managed process (see `map_process_snapshot`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
    pub id: String,
    pub command: String,
    pub args: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    pub tty: bool,
    pub interactive: bool,
    pub status: ProcessState,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exit_code: Option<i32>,
    pub created_at_ms: i64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exited_at_ms: Option<i64>,
}
/// Wrapper for process listings.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListResponse {
    pub processes: Vec<ProcessInfo>,
}
/// Log stream selector for the logs endpoint. `Combined` merges stdout and
/// stderr; `Pty` is the terminal byte stream of tty-mode processes.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessLogsStream {
    Stdout,
    Stderr,
    Combined,
    Pty,
}
/// Query parameters accepted by `GET /processes/{id}/logs`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsQuery {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stream: Option<ProcessLogsStream>,
    /// Return only the last N entries.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tail: Option<usize>,
    /// When true, respond with an SSE stream instead of JSON.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub follow: Option<bool>,
    /// Only entries with sequence strictly greater than this value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub since: Option<u64>,
}
/// One buffered log record; `data` is encoded per `encoding`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogEntry {
    /// Monotonic sequence number; doubles as the SSE event id for resumption.
    pub sequence: u64,
    pub stream: ProcessLogsStream,
    pub timestamp_ms: i64,
    pub data: String,
    pub encoding: String,
}
/// JSON body of the non-follow logs response.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsResponse {
    pub process_id: String,
    pub stream: ProcessLogsStream,
    pub entries: Vec<ProcessLogEntry>,
}
/// Request body for writing to a process's stdin/PTY. `encoding` defaults to
/// base64 at the HTTP handler.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputRequest {
    pub data: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encoding: Option<String>,
}
/// Number of decoded bytes actually written.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputResponse {
    pub bytes_written: usize,
}
/// `waitMs` query parameter shared by the stop and kill endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessSignalQuery {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait_ms: Option<u64>,
}
/// Requested PTY geometry in character cells.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeRequest {
    pub cols: u16,
    pub rows: u16,
}
/// Echo of the geometry the resize endpoint applied.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeResponse {
    pub cols: u16,
    pub rows: u16,
}
/// Query parameters for the terminal WebSocket upgrade; `access_token` is the
/// browser-friendly auth alternative (consumed by middleware, not the handler).
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessWsQuery {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub access_token: Option<String>,
}

View file

@ -1,6 +1,6 @@
use std::fs;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
@ -14,8 +14,6 @@ use sandbox_agent_agent_management::agents::AgentManager;
use serde_json::{json, Value};
use serial_test::serial;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tower::util::ServiceExt;
struct TestApp {
@ -50,56 +48,6 @@ struct EnvVarGuard {
previous: Option<std::ffi::OsString>,
}
/// A real axum server bound to an ephemeral localhost port, for tests that
/// need genuine TCP/WebSocket connections instead of tower `oneshot` calls.
struct LiveServer {
    address: SocketAddr,
    // Dropping without firing this would leave the server running until the
    // runtime shuts down, so `shutdown()` sends it explicitly.
    shutdown_tx: Option<oneshot::Sender<()>>,
    task: JoinHandle<()>,
}
impl LiveServer {
    /// Bind 127.0.0.1:0, spawn `axum::serve` with graceful shutdown, and
    /// return the handle with the OS-assigned address.
    async fn spawn(app: Router) -> Self {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind live server");
        let address = listener.local_addr().expect("live server address");
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let task = tokio::spawn(async move {
            let server =
                axum::serve(listener, app.into_make_service()).with_graceful_shutdown(async {
                    let _ = shutdown_rx.await;
                });
            let _ = server.await;
        });
        Self {
            address,
            shutdown_tx: Some(shutdown_tx),
            task,
        }
    }
    /// Absolute http:// URL for `path` on this server.
    fn http_url(&self, path: &str) -> String {
        format!("http://{}{}", self.address, path)
    }
    /// Absolute ws:// URL for `path` on this server.
    fn ws_url(&self, path: &str) -> String {
        format!("ws://{}{}", self.address, path)
    }
    /// Signal graceful shutdown and wait up to 3s for the serve task to end.
    async fn shutdown(mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let _ = self.task.await;
        })
        .await;
    }
}
impl EnvVarGuard {
fn set(key: &'static str, value: &str) -> Self {
let previous = std::env::var_os(key);
@ -135,38 +83,6 @@ fn write_executable(path: &Path, script: &str) {
}
}
/// Install a fake `npm` shim for agent-install tests.
///
/// The shim parses the `npm install --prefix <dir> …` invocation, requires a
/// `--prefix`, creates `<prefix>/node_modules/.bin`, and drops no-op
/// executables for each ACP agent binary so lazy install "succeeds" offline.
fn write_fake_npm(path: &Path) {
    write_executable(
        path,
        r#"#!/usr/bin/env sh
set -e
prefix=""
while [ "$#" -gt 0 ]; do
case "$1" in
install|--no-audit|--no-fund)
shift
;;
--prefix)
prefix="$2"
shift 2
;;
*)
shift
;;
esac
done
[ -n "$prefix" ] || exit 1
mkdir -p "$prefix/node_modules/.bin"
for bin in claude-code-acp codex-acp amp-acp pi-acp cursor-agent-acp; do
echo '#!/usr/bin/env sh' > "$prefix/node_modules/.bin/$bin"
echo 'exit 0' >> "$prefix/node_modules/.bin/$bin"
chmod +x "$prefix/node_modules/.bin/$bin"
done
exit 0
"#,
    );
}
fn serve_registry_once(document: Value) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind registry server");
let address = listener.local_addr().expect("registry address");
@ -375,5 +291,3 @@ mod acp_transport;
mod config_endpoints;
#[path = "v1_api/control_plane.rs"]
mod control_plane;
#[path = "v1_api/processes.rs"]
mod processes;

View file

@ -182,7 +182,10 @@ async fn lazy_install_runs_on_first_bootstrap() {
.expect("create agent processes dir");
write_executable(&install_path.join("codex"), "#!/usr/bin/env sh\nexit 0\n");
fs::create_dir_all(install_path.join("bin")).expect("create bin dir");
write_fake_npm(&install_path.join("bin").join("npm"));
write_executable(
&install_path.join("bin").join("npx"),
"#!/usr/bin/env sh\nwhile IFS= read -r _line; do :; done\n",
);
});
let original_path = std::env::var_os("PATH").unwrap_or_default();

View file

@ -1,654 +0,0 @@
use super::*;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
/// Poll `GET /v1/processes/{id}` until the process reports "exited".
///
/// Polls up to 30 times with 100ms sleeps (~3s budget) and panics on timeout
/// so the owning test fails with a clear message.
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
    for _ in 0..30 {
        let (status, _, body) = send_request(
            &test_app.app,
            Method::GET,
            &format!("/v1/processes/{process_id}"),
            None,
            &[],
        )
        .await;
        assert_eq!(status, StatusCode::OK);
        let parsed = parse_json(&body);
        if parsed["status"] == "exited" {
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    panic!("process did not exit in time");
}
/// Concatenate the base64-decoded `data` fields of log-entry JSON objects
/// into one lossy-UTF-8 string; entries without decodable data are skipped.
fn decode_log_entries(entries: &[Value]) -> String {
    let mut combined = String::new();
    for entry in entries {
        let Some(encoded) = entry.get("data").and_then(Value::as_str) else {
            continue;
        };
        if let Ok(bytes) = BASE64.decode(encoded) {
            combined.push_str(&String::from_utf8_lossy(&bytes));
        }
    }
    combined
}
/// Receive the next WebSocket frame with a 3s timeout.
///
/// Panics (failing the test) on timeout, on a closed stream, or on a
/// transport error — tests only ever expect a well-formed next frame.
async fn recv_ws_message(
    ws: &mut tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >,
) -> Message {
    tokio::time::timeout(Duration::from_secs(3), ws.next())
        .await
        .expect("timed out waiting for websocket frame")
        .expect("websocket stream ended")
        .expect("websocket frame")
}
/// GET then POST `/v1/processes/config`: defaults are served and updated
/// values are echoed back.
#[tokio::test]
async fn v1_processes_config_round_trip() {
    let test_app = TestApp::new(AuthConfig::disabled());
    let (status, _, body) = send_request(
        &test_app.app,
        Method::GET,
        "/v1/processes/config",
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    // Default runtime limit expected out of the box.
    assert_eq!(parse_json(&body)["maxConcurrentProcesses"], 64);
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v1/processes/config",
        Some(json!({
            "maxConcurrentProcesses": 8,
            "defaultRunTimeoutMs": 1000,
            "maxRunTimeoutMs": 5000,
            "maxOutputBytes": 4096,
            "maxLogBytesPerProcess": 32768,
            "maxInputBytesPerRequest": 1024
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    // The update response reflects the newly applied limits.
    let parsed = parse_json(&body);
    assert_eq!(parsed["maxConcurrentProcesses"], 8);
    assert_eq!(parsed["defaultRunTimeoutMs"], 1000);
}
/// DELETE on a running process must 409; after stop + exit it must 204.
#[tokio::test]
async fn v1_process_lifecycle_requires_stop_before_delete() {
    let test_app = TestApp::new(AuthConfig::disabled());
    // Long-running child so it is still alive at the first DELETE.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v1/processes",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "sleep 30"],
            "tty": false,
            "interactive": false
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    // Deleting while running is rejected with a 409 problem body.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::DELETE,
        &format!("/v1/processes/{process_id}"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::CONFLICT);
    assert_eq!(parse_json(&body)["status"], 409);
    let (status, _, _body) = send_request(
        &test_app.app,
        Method::POST,
        &format!("/v1/processes/{process_id}/stop"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    // Stop is asynchronous; wait for the exited state before deleting.
    wait_for_exited(&test_app, &process_id).await;
    let (status, _, _) = send_request(
        &test_app.app,
        Method::DELETE,
        &format!("/v1/processes/{process_id}"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::NO_CONTENT);
}
/// `POST /v2/processes/run` returns captured stdout and exit code; a command
/// exceeding its `timeoutMs` is reported via `timedOut: true`.
#[tokio::test]
async fn v2_process_run_returns_output_and_timeout() {
    let test_app = TestApp::new(AuthConfig::disabled());
    // Fast command completes within the timeout and captures stdout.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes/run",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "echo hi"],
            "timeoutMs": 1000
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let parsed = parse_json(&body);
    assert_eq!(parsed["timedOut"], false);
    assert_eq!(parsed["exitCode"], 0);
    assert!(parsed["stdout"].as_str().unwrap_or_default().contains("hi"));
    // Slow command is cut off by the 50ms timeout and flagged.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes/run",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "sleep 2"],
            "timeoutMs": 50
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(parse_json(&body)["timedOut"], true);
}
/// `POST /v2/processes/run` with a small `maxOutputBytes` truncates stdout to
/// exactly that many bytes and sets the per-stream truncation flags.
#[tokio::test]
async fn v2_process_run_reports_truncation() {
    let test_app = TestApp::new(AuthConfig::disabled());
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes/run",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "printf 'abcdefghijklmnopqrstuvwxyz'"],
            "maxOutputBytes": 5
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let parsed = parse_json(&body);
    // Only stdout was written to, so only stdout is truncated.
    assert_eq!(parsed["stdoutTruncated"], true);
    assert_eq!(parsed["stderrTruncated"], false);
    assert_eq!(parsed["stdout"].as_str().unwrap_or_default().len(), 5);
}
/// Input written to an interactive TTY process appears in the `pty` log
/// stream; kill + delete tear the process down cleanly.
#[tokio::test]
async fn v2_process_tty_input_and_logs() {
    let test_app = TestApp::new(AuthConfig::disabled());
    // `cat` in a pty echoes its input, which lands in the pty log stream.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes",
        Some(json!({
            "command": "cat",
            "tty": true,
            "interactive": true
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    // "aGVsbG8K" is base64 for "hello\n".
    let (status, _, _body) = send_request(
        &test_app.app,
        Method::POST,
        &format!("/v2/processes/{process_id}/input"),
        Some(json!({
            "data": "aGVsbG8K",
            "encoding": "base64"
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    // Give the pty a moment to echo the input into the log buffer.
    tokio::time::sleep(Duration::from_millis(150)).await;
    let (status, _, body) = send_request(
        &test_app.app,
        Method::GET,
        &format!("/v2/processes/{process_id}/logs?stream=pty&tail=20"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let entries = parse_json(&body)["entries"]
        .as_array()
        .cloned()
        .unwrap_or_default();
    assert!(!entries.is_empty());
    let (status, _, _body) = send_request(
        &test_app.app,
        Method::POST,
        &format!("/v2/processes/{process_id}/kill"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    wait_for_exited(&test_app, &process_id).await;
    let (status, _, _) = send_request(
        &test_app.app,
        Method::DELETE,
        &format!("/v2/processes/{process_id}"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::NO_CONTENT);
}
/// Looking up an unknown process id returns a structured 404 error body.
#[tokio::test]
async fn v2_process_not_found_returns_404() {
    let test_app = TestApp::new(AuthConfig::disabled());
    let (status, _, body) = send_request(
        &test_app.app,
        Method::GET,
        "/v2/processes/does-not-exist",
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::NOT_FOUND);
    assert_eq!(parse_json(&body)["status"], 404);
}
/// Input payloads larger than the configured `maxInputBytesPerRequest` are
/// rejected with a structured 413 error body.
#[tokio::test]
async fn v2_process_input_limit_returns_413() {
    let test_app = TestApp::new(AuthConfig::disabled());
    // Shrink the per-request input limit to 4 bytes.
    let (status, _, _) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes/config",
        Some(json!({
            "maxConcurrentProcesses": 8,
            "defaultRunTimeoutMs": 1000,
            "maxRunTimeoutMs": 5000,
            "maxOutputBytes": 4096,
            "maxLogBytesPerProcess": 32768,
            "maxInputBytesPerRequest": 4
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes",
        Some(json!({
            "command": "cat",
            "tty": true,
            "interactive": true
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    // "aGVsbG8=" decodes to "hello" (5 bytes), exceeding the 4-byte limit.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        &format!("/v2/processes/{process_id}/input"),
        Some(json!({
            "data": "aGVsbG8=",
            "encoding": "base64"
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE);
    assert_eq!(parse_json(&body)["status"], 413);
}
/// A process created with `tty: true` must run on a real pty: `tty` prints a
/// `/dev/...` device path rather than "not a tty".
#[tokio::test]
async fn v2_tty_process_is_real_terminal() {
    let test_app = TestApp::new(AuthConfig::disabled());
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "tty"],
            "tty": true,
            "interactive": false
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    wait_for_exited(&test_app, &process_id).await;
    let (status, _, body) = send_request(
        &test_app.app,
        Method::GET,
        &format!("/v2/processes/{process_id}/logs?stream=pty"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let entries = parse_json(&body)["entries"]
        .as_array()
        .cloned()
        .unwrap_or_default();
    let joined = decode_log_entries(&entries);
    // `tty` prints "not a tty" when stdin is not a terminal.
    assert!(!joined.to_lowercase().contains("not a tty"));
    assert!(joined.contains("/dev/"));
}
/// `GET /v2/processes/{id}/logs?follow=true` streams log entries over SSE;
/// each event carries a sequence number and the originating stream name.
#[tokio::test]
async fn v2_process_logs_follow_sse_streams_entries() {
    let test_app = TestApp::new(AuthConfig::disabled());
    // Two echoes with a gap ensure output arrives while we are following.
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes",
        Some(json!({
            "command": "sh",
            "args": ["-lc", "echo first; sleep 0.3; echo second"],
            "tty": false,
            "interactive": false
        })),
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    let request = Request::builder()
        .method(Method::GET)
        .uri(format!(
            "/v2/processes/{process_id}/logs?stream=stdout&follow=true"
        ))
        .body(Body::empty())
        .expect("build request");
    let response = test_app
        .app
        .clone()
        .oneshot(request)
        .await
        .expect("sse response");
    assert_eq!(response.status(), StatusCode::OK);
    let mut stream = response.into_body().into_data_stream();
    // Read body chunks until the first SSE `data:` payload appears.
    let chunk = tokio::time::timeout(Duration::from_secs(5), async move {
        while let Some(chunk) = stream.next().await {
            let bytes = chunk.expect("stream chunk");
            let text = String::from_utf8_lossy(&bytes).to_string();
            if text.contains("data:") {
                return text;
            }
        }
        panic!("SSE stream ended before log chunk");
    })
    .await
    .expect("timed out reading process log sse");
    let payload = parse_sse_data(&chunk);
    assert!(payload["sequence"].as_u64().is_some());
    assert_eq!(payload["stream"], "stdout");
}
/// `access_token` query auth is accepted only on the terminal websocket
/// route; every other endpoint must use the Authorization header.
#[tokio::test]
async fn v2_access_token_query_only_allows_terminal_ws() {
    let test_app = TestApp::new(AuthConfig::with_token("secret-token".to_string()));
    // Query-token auth on a regular endpoint is rejected.
    let (status, _, _) = send_request(
        &test_app.app,
        Method::GET,
        "/v2/health?access_token=secret-token",
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::UNAUTHORIZED);
    let (status, _, body) = send_request(
        &test_app.app,
        Method::POST,
        "/v2/processes",
        Some(json!({
            "command": "cat",
            "tty": true,
            "interactive": true
        })),
        &[("authorization", "Bearer secret-token")],
    )
    .await;
    assert_eq!(status, StatusCode::OK);
    let process_id = parse_json(&body)["id"]
        .as_str()
        .expect("process id")
        .to_string();
    // No credentials at all: unauthorized.
    let (status, _, _) = send_request(
        &test_app.app,
        Method::GET,
        &format!("/v2/processes/{process_id}/terminal/ws"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::UNAUTHORIZED);
    // Query token passes auth; 400 follows because this in-process request
    // is not a real websocket upgrade.
    let (status, _, _) = send_request(
        &test_app.app,
        Method::GET,
        &format!("/v2/processes/{process_id}/terminal/ws?access_token=secret-token"),
        None,
        &[],
    )
    .await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// End-to-end terminal websocket flow against a live server: `ready` control
/// frame, text input in, pty bytes out as binary frames, then an `exit`
/// control frame.
#[tokio::test]
async fn v2_process_terminal_ws_e2e_is_deterministic() {
    let test_app = TestApp::new(AuthConfig::disabled());
    let live_server = LiveServer::spawn(test_app.app.clone()).await;
    let http = reqwest::Client::new();
    // `stty -echo` keeps the pty from echoing input, so the only output is
    // the deterministic `got:<line>` printf.
    let create_response = http
        .post(live_server.http_url("/v2/processes"))
        .json(&json!({
            "command": "sh",
            "args": ["-lc", "stty -echo; IFS= read -r line; printf 'got:%s\\n' \"$line\""],
            "tty": true,
            "interactive": true
        }))
        .send()
        .await
        .expect("create process response");
    assert_eq!(create_response.status(), reqwest::StatusCode::OK);
    let create_body: Value = create_response.json().await.expect("create process json");
    let process_id = create_body["id"].as_str().expect("process id").to_string();
    let ws_url = live_server.ws_url(&format!("/v2/processes/{process_id}/terminal/ws"));
    let (mut ws, _) = connect_async(&ws_url).await.expect("connect websocket");
    // First frame is always the `ready` control message.
    let ready = recv_ws_message(&mut ws).await;
    let ready_payload: Value =
        serde_json::from_str(ready.to_text().expect("ready text frame")).expect("ready json");
    assert_eq!(ready_payload["type"], "ready");
    assert_eq!(ready_payload["processId"], process_id);
    ws.send(Message::Text(
        json!({
            "type": "input",
            "data": "hello from ws\n"
        })
        .to_string(),
    ))
    .await
    .expect("send input frame");
    let mut saw_binary_output = false;
    let mut saw_exit = false;
    // Drain frames until the process exits; binary frames carry pty bytes,
    // text frames carry JSON control messages.
    for _ in 0..10 {
        let frame = recv_ws_message(&mut ws).await;
        match frame {
            Message::Binary(bytes) => {
                let text = String::from_utf8_lossy(&bytes);
                if text.contains("got:hello from ws") {
                    saw_binary_output = true;
                }
            }
            Message::Text(text) => {
                let payload: Value = serde_json::from_str(&text).expect("ws json");
                if payload["type"] == "exit" {
                    saw_exit = true;
                    break;
                }
                assert_ne!(payload["type"], "error");
            }
            Message::Close(_) => break,
            Message::Ping(_) | Message::Pong(_) => {}
            _ => {}
        }
    }
    assert!(
        saw_binary_output,
        "expected pty binary output over websocket"
    );
    assert!(saw_exit, "expected exit control frame over websocket");
    let _ = ws.close(None).await;
    let delete_response = http
        .delete(live_server.http_url(&format!("/v2/processes/{process_id}")))
        .send()
        .await
        .expect("delete process response");
    assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
    live_server.shutdown().await;
}
/// Terminal websocket auth against a live server: the handshake is rejected
/// with HTTP 401 without credentials and succeeds with the `access_token`
/// query parameter.
#[tokio::test]
async fn v2_process_terminal_ws_auth_e2e() {
    let token = "secret-token";
    let test_app = TestApp::new(AuthConfig::with_token(token.to_string()));
    let live_server = LiveServer::spawn(test_app.app.clone()).await;
    let http = reqwest::Client::new();
    let create_response = http
        .post(live_server.http_url("/v2/processes"))
        .bearer_auth(token)
        .json(&json!({
            "command": "cat",
            "tty": true,
            "interactive": true
        }))
        .send()
        .await
        .expect("create process response");
    assert_eq!(create_response.status(), reqwest::StatusCode::OK);
    let create_body: Value = create_response.json().await.expect("create process json");
    let process_id = create_body["id"].as_str().expect("process id").to_string();
    // Handshake without credentials must fail at the HTTP layer with 401.
    let unauth_ws_url = live_server.ws_url(&format!("/v2/processes/{process_id}/terminal/ws"));
    let unauth_err = connect_async(&unauth_ws_url)
        .await
        .expect_err("unauthenticated websocket handshake should fail");
    match unauth_err {
        tokio_tungstenite::tungstenite::Error::Http(response) => {
            assert_eq!(response.status().as_u16(), 401);
        }
        other => panic!("unexpected websocket auth error: {other:?}"),
    }
    // The query-parameter token is accepted for the websocket route.
    let auth_ws_url = live_server.ws_url(&format!(
        "/v2/processes/{process_id}/terminal/ws?access_token={token}"
    ));
    let (mut ws, _) = connect_async(&auth_ws_url)
        .await
        .expect("authenticated websocket handshake");
    let ready = recv_ws_message(&mut ws).await;
    let ready_payload: Value =
        serde_json::from_str(ready.to_text().expect("ready text frame")).expect("ready json");
    assert_eq!(ready_payload["type"], "ready");
    assert_eq!(ready_payload["processId"], process_id);
    let _ = ws
        .send(Message::Text(json!({ "type": "close" }).to_string()))
        .await;
    let _ = ws.close(None).await;
    // Tear down: kill (waiting up to 1s), then delete.
    let kill_response = http
        .post(live_server.http_url(&format!("/v2/processes/{process_id}/kill?waitMs=1000")))
        .bearer_auth(token)
        .send()
        .await
        .expect("kill process response");
    assert_eq!(kill_response.status(), reqwest::StatusCode::OK);
    let delete_response = http
        .delete(live_server.http_url(&format!("/v2/processes/{process_id}")))
        .bearer_auth(token)
        .send()
        .await
        .expect("delete process response");
    assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
    live_server.shutdown().await;
}